# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
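
# Editor's note: an illustrative sketch (not from the original source) of
# the split-timestamp format used throughout this module:
#
#   >>> utils.SplitTime(1234567890.123456)
#   (1234567890, 123456)
#
# i.e. whole seconds plus the microsecond remainder, which serializes to
# JSON without floating-point precision loss.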


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj
121 """Serializes this _QueuedOpCode.
124 @return: the dictionary holding the serialized state
128 "input": self.input.__getstate__(),
129 "status": self.status,
130 "result": self.result,
132 "start_timestamp": self.start_timestamp,
133 "end_timestamp": self.end_timestamp,


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar lock_status: In-memory locking information for debugging
  @ivar change: a Condition variable we use for waiting for job changes

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)
194 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
196 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
198 return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj
236 """Serialize the _JobQueue instance.
239 @return: the serialized state
244 "ops": [op.Serialize() for op in self.ops],
245 "start_timestamp": self.start_timestamp,
246 "end_timestamp": self.end_timestamp,
247 "received_timestamp": self.received_timestamp,

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running
        will determine the job status
      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
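
# Editor's note: two worked examples of the rules above. For opcode
# statuses [success, running, queued] the computed job status is
# JOB_STATUS_RUNNING; for [success, error, queued] the loop breaks early
# and the job status is JOB_STATUS_ERROR.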

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
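
# Editor's note: given op.log entries with serials 1 and 2,
# GetLogEntries(None) returns both entries, while GetLogEntries(1) returns
# only the entry with serial 2.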

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: the opcode we're processing

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    self._queue.acquire()
    try:
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                 constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if self._op.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self._op.status = constants.OP_STATUS_RUNNING
    finally:
      self._queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())

    self._queue.acquire()
    try:
      self._job.log_serial += 1
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))

      self._job.change.notifyAll()
    finally:
      self._queue.release()

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Processing job %s", job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              op.status = constants.OP_STATUS_WAITLOCK
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.lock_status = None
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()

    logging.info("Finished job %s, status = %s", job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_lock' attribute.

  @warning: Use this decorator only after utils.LockedMethod!

  Example::
    @utils.LockedMethod
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_lock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue and acquire the queue lock
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our RPC calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
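
# Editor's note: with JOBS_PER_ARCHIVE_DIRECTORY at 10000, job "9999"
# maps to archive directory "0" and job "12345" to directory "1".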

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result
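
# Editor's note: a worked example. With _last_serial == 5 and count == 2,
# the serial file is updated to contain 7 and the method returns the new
# identifiers ["6", "7"].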

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR, path)
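
# Editor's note (illustrative; actual paths depend on constants.QUEUE_DIR
# and constants.JOB_QUEUE_ARCHIVE_DIR): job 12345 is stored as
# ".../queue/job-12345" and, once archived, as
# ".../queue/archive/1/job-12345".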

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    # pylint: disable-msg=W0613
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    data = serializer.LoadJson(raw_data)

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked to be drained.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True
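
# Editor's note: since draining is just a flag file, an operator can get
# the same effect from a shell (sketch; the actual path depends on the
# build's constants.JOB_QUEUE_DRAIN_FILE):
#
#   touch $QUEUE_DIR/drain   # refuse new submissions
#   rm $QUEUE_DIR/drain      # accept submissions again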

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    return self._SubmitJobUnlocked(job_id, ops)
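
# Editor's note: a hypothetical submission from code holding a queue
# reference (assumes a test opcode; not from the original source):
#
#   >>> job_id = queue.SubmitJob([opcodes.OpTestDelay(duration=5)])
#   >>> queue.QueryJobs([job_id], ["status"])
#   [['queued']]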

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        data = self._SubmitJobUnlocked(job_id, ops)
        status = True
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    return results

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return None

    def _CheckForChanges():
      logging.debug("Waiting for changes in job %s", job_id)

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      # Don't even try to wait if the job is no longer running, there will be
      # no changes.
      if (status not in (constants.JOB_STATUS_QUEUED,
                         constants.JOB_STATUS_RUNNING,
                         constants.JOB_STATUS_WAITLOCK) or
          prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        logging.debug("Job %s changed", job_id)
        return (job_info, log_entries)

      raise utils.RetryAgain()

    try:
      # Setting wait function to release the queue lock while waiting
      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
                         wait_fn=job.change.wait)
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
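
# Editor's note: a sketch of the polling loop a client is expected to run
# (hypothetical caller, error handling omitted):
#
#   prev_info = prev_serial = None
#   while True:
#     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
#                                      prev_serial, timeout)
#     if result == constants.JOB_NOTCHANGED:
#       continue  # timed out; poll again
#     (prev_info, log_entries) = result
#     if log_entries:
#       prev_serial = log_entries[-1][0]
#     if prev_info[0] not in (constants.JOB_STATUS_QUEUED,
#                             constants.JOB_STATUS_WAITLOCK,
#                             constants.JOB_STATUS_RUNNING):
#       break  # job reached a final status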

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time to run, in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
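
# Editor's note: AutoArchiveJobs(3600 * 24, 30) would archive finished
# jobs older than one day, stop after roughly 30 seconds of work, and
# return a tuple of (number archived, number not yet inspected).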

  @staticmethod
  def _GetJobInfoUnlocked(job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        is requested

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "lock_status":
        row.append(job.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs
1432 """Stops the job queue.
1434 This shutdowns all the worker threads an closes the queue.
1437 self._wpool.TerminateWorkers()
1439 self._queue_lock.Close()
1440 self._queue_lock = None