# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
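

# Editor's sketch (not part of the original module): TimeStampNow()
# returns whatever utils.SplitTime() produces, i.e. a
# (seconds, microseconds) tuple; the output below is hypothetical:
#
#   >>> TimeStampNow()
#   (1234567890, 123456)
#
# Keeping the two components as separate integers avoids floating-point
# precision loss when the timestamp is later serialized to JSON.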


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj
121 """Serializes this _QueuedOpCode.
124 @return: the dictionary holding the serialized state
128 "input": self.input.__getstate__(),
129 "status": self.status,
130 "result": self.result,
132 "start_timestamp": self.start_timestamp,
133 "end_timestamp": self.end_timestamp,


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar lock_status: In-memory locking information for debugging
  @ivar change: a Condition variable we use for waiting for job changes

  """
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj
228 """Serialize the _JobQueue instance.
231 @return: the serialized state
236 "ops": [op.Serialize() for op in self.ops],
237 "start_timestamp": self.start_timestamp,
238 "end_timestamp": self.end_timestamp,
239 "received_timestamp": self.received_timestamp,

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
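
  # Editor's sketch of the mapping implemented above, for a hypothetical
  # job of three opcodes (opcode statuses listed in order):
  #
  #   (success, success, success) -> JOB_STATUS_SUCCESS
  #   (success, running, queued)  -> JOB_STATUS_RUNNING
  #   (success, error,   queued)  -> JOB_STATUS_ERROR   (first error wins)
  #   (queued,  queued,  queued)  -> JOB_STATUS_QUEUED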

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    self._queue.acquire()
    try:
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                 constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if self._op.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self._op.status = constants.OP_STATUS_RUNNING
    finally:
      self._queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())

    self._queue.acquire()
    try:
      self._job.log_serial += 1
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))

      self._job.change.notifyAll()
    finally:
      self._queue.release()
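
  # Editor's sketch of the two call forms accepted above (`cb` is a
  # hypothetical _OpExecCallbacks instance; message texts are made up):
  #
  #   cb.Feedback("creating disks")                       # message only
  #   cb.Feedback(constants.ELOG_MESSAGE, "attaching")    # (type, message)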

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Worker %s processing job %s",
                 self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.lock_status = None
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()

      logging.info("Worker %s finished job %s, status = %s",
                   self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_lock' attribute.

  @warning: Use this decorator only after utils.LockedMethod!

  Example::
    @utils.LockedMethod
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    assert self._queue_lock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the queue.
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our RPC calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s failed on node %s: %s",
                      result[node].call, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
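
  # Editor's note: with JOBS_PER_ARCHIVE_DIRECTORY = 10000, the integer
  # division above buckets jobs 0..9999 into directory "0", 10000..19999
  # into "1", and so on; e.g. _GetArchiveDirectory("12345") == "1".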

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list
    @return: a list of formatted job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result
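
  # Editor's sketch: if self._last_serial is 41 and count is 3, the serial
  # file is rewritten to contain "44\n" and the method returns
  # ["42", "43", "44"].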

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    data = serializer.LoadJson(raw_data)

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked to be drained.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    return self._SubmitJobUnlocked(job_id, ops)
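
  # Editor's sketch of a submission, assuming `queue` is an open JobQueue
  # and that the opcode class below exists in this Ganeti version:
  #
  #   job_id = queue.SubmitJob([opcodes.OpQueryClusterInfo()])
  #   info = queue.QueryJobs([job_id], ["status"])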

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        data = self._SubmitJobUnlocked(job_id, ops)
        status = True
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    return results

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return None

    def _CheckForChanges():
      logging.debug("Waiting for changes in job %s", job_id)

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      # Don't even try to wait if the job is no longer running, there will be
      # no changes.
      if (status not in (constants.JOB_STATUS_QUEUED,
                         constants.JOB_STATUS_RUNNING,
                         constants.JOB_STATUS_WAITLOCK) or
          prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        logging.debug("Job %s changed", job_id)
        return (job_info, log_entries)

      raise utils.RetryAgain()

    try:
      # Setting wait function to release the queue lock while waiting
      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
                         wait_fn=job.change.wait)
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
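
  # Editor's sketch of how a caller might poll with the method above
  # (hypothetical caller; JOB_NOTCHANGED signals a timeout, not an error):
  #
  #   result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                    prev_serial, 10.0)
  #   if result == constants.JOB_NOTCHANGED:
  #     pass  # retry with the same prev_info/prev_serial values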

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be canceled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  ", ".join(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time to spend archiving, in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched - 1)
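
  # Editor's sketch: archive everything finished more than a week ago,
  # spending at most ~30 seconds (hypothetical caller):
  #
  #   (archived, remaining) = queue.AutoArchiveJobs(7 * 86400, 30)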

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "lock_status":
        row.append(job.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs
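
  # Editor's sketch of a query for all jobs (hypothetical caller; passing
  # None for job_ids selects every job, and entries can be None for jobs
  # that failed to load):
  #
  #   info = queue.QueryJobs(None, ["id", "status"])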
1420 """Stops the job queue.
1422 This shutdowns all the worker threads an closes the queue.
1425 self._wpool.TerminateWorkers()
1427 self._queue_lock.Close()
1428 self._queue_lock = None