# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for

import os
import logging
import errno
import re
import time
import weakref
import threading

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """
62 """Returns the current timestamp.
65 @return: the current time in the (seconds, microseconds) format
68 return utils.SplitTime(time.time())
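
# Illustrative example: utils.SplitTime() splits a float timestamp into
# whole seconds and microseconds, so at Unix time 1234567890.123456 the
# function above would return roughly (1234567890, 123456).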


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               ]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj
124 """Serializes this _QueuedOpCode.
127 @return: the dictionary holding the serialized state
131 "input": self.input.__getstate__(),
132 "status": self.status,
133 "result": self.result,
135 "start_timestamp": self.start_timestamp,
136 "exec_timestamp": self.exec_timestamp,
137 "end_timestamp": self.end_timestamp,


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar lock_status: In-memory locking information for debugging
  @ivar change: a Condition variable we use for waiting for job changes

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               ]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job ID
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)
197 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
199 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
201 return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj
239 """Serialize the _JobQueue instance.
242 @return: the serialized state
247 "ops": [op.Serialize() for op in self.ops],
248 "start_timestamp": self.start_timestamp,
249 "end_timestamp": self.end_timestamp,
250 "received_timestamp": self.received_timestamp,

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
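
  # Illustrative examples of the computation above: a job whose opcodes
  # are all OP_STATUS_SUCCESS yields JOB_STATUS_SUCCESS; one successful
  # opcode followed by an OP_STATUS_RUNNING one yields JOB_STATUS_RUNNING;
  # any OP_STATUS_ERROR opcode makes the whole job JOB_STATUS_ERROR,
  # regardless of the opcodes after it.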

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
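
  # Example: with log serials 1..5 spread over the job's opcodes,
  # GetLogEntries(3) returns only the entries with serials 4 and 5,
  # while GetLogEntries(None) returns all five.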

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        is passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "lock_status":
        row.append(self.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    self._queue.acquire()
    try:
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                 constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if self._op.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self._op.status = constants.OP_STATUS_RUNNING
      self._op.exec_timestamp = TimeStampNow()
    finally:
      self._queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())

    self._queue.acquire()
    try:
      self._job.log_serial += 1
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))

      self._job.change.notifyAll()
    finally:
      self._queue.release()
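
  # Each log entry appended above is a 4-tuple of the form documented in
  # _QueuedOpCode, e.g. with illustrative values:
  #   (12, (1234567890, 123456), constants.ELOG_MESSAGE, "creating disks")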

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This functions processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Processing job %s", job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        job.lock_status = None
        job.end_timestamp = TimeStampNow()
        queue.UpdateJobUnlocked(job)
      finally:
        job_id = job.id
        status = job.CalcStatus()
        queue.release()

    logging.info("Finished job %s, status = %s", job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after utils.LockedMethod!

  Example::
    @utils.LockedMethod
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
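
  # Example: a queue file named "job-4713" matches _RE_JOB_FILE, with
  # group(1) extracting the job ID "4713"; archived or temporary files
  # do not match.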

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    utils.WriteFile(file_name, data=data)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
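
  # Example: with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job "4713" falls
  # into archive directory "0" and job "12345" into directory "1".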

  def _NewSerialsUnlocked(self, count):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of count new job identifiers, formatted as strings

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result
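
  # Illustrative example: with _last_serial == 5, _NewSerialsUnlocked(2)
  # writes "7\n" to the serial file and returns ["6", "7"].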

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
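
  # Example: continuing the sketch above, job "12345" would be archived
  # as <JOB_QUEUE_ARCHIVE_DIR>/1/job-12345.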

  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    job = self._LoadJobFromDisk(job_id)
    if job is None:
      # missing or unparseable jobs can't be stored in the
      # WeakValueDictionary-based cache
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except EnvironmentError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @utils.LockedMethod
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: str
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    return self._SubmitJobUnlocked(job_id, ops)

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        data = self._SubmitJobUnlocked(job_id, ops)
        status = True
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    return results
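
  # The result is a list of (status, data) pairs, one per submitted job.
  # Illustrative example: [(True, "4713"), (False, "Job queue is drained,
  # refusing job")] - data is the new job ID on success and the error
  # message otherwise.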

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return None

    def _CheckForChanges():
      logging.debug("Waiting for changes in job %s", job_id)

      status = job.CalcStatus()
      job_info = job.GetInfo(fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      # Don't even try to wait if the job is no longer running, there will be
      # no changes.
      if (status not in (constants.JOB_STATUS_QUEUED,
                         constants.JOB_STATUS_RUNNING,
                         constants.JOB_STATUS_WAITLOCK) or
          prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        logging.debug("Job %s changed", job_id)
        return (job_info, log_entries)

      raise utils.RetryAgain()

    try:
      # Setting wait function to release the queue lock while waiting
      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
                         wait_fn=job.change.wait)
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
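
  # The return value is (number of archived jobs, number of jobs not
  # inspected before the timeout), e.g. (10, 0) when ten old jobs were
  # archived and the whole queue was scanned in time.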

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(job.GetInfo(fields))

    return jobs
1434 """Stops the job queue.
1436 This shutdowns all the worker threads an closes the queue.
1439 self._wpool.TerminateWorkers()
1441 self._queue_filelock.Close()
1442 self._queue_filelock = None