4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
52 JOBS_PER_ARCHIVE_DIRECTORY = 10000
55 class CancelJob(Exception):
56 """Special exception to cancel a job.
62 """Returns the current timestamp.
65 @return: the current time in the (seconds, microseconds) format
68 return utils.SplitTime(time.time())
71 class _QueuedOpCode(object):
72 """Encapsulates an opcode object.
74 @ivar log: holds the execution log and consists of tuples
75 of the form C{(log_serial, timestamp, level, message)}
76 @ivar input: the OpCode we encapsulate
77 @ivar status: the current status
78 @ivar result: the result of the LU execution
79 @ivar start_timestamp: timestamp for the start of the execution
80 @ivar stop_timestamp: timestamp for the end of the execution
83 __slots__ = ["input", "status", "result", "log",
84 "start_timestamp", "end_timestamp",
87 def __init__(self, op):
88 """Constructor for the _QuededOpCode.
90 @type op: L{opcodes.OpCode}
91 @param op: the opcode we encapsulate
95 self.status = constants.OP_STATUS_QUEUED
98 self.start_timestamp = None
99 self.end_timestamp = None
102 def Restore(cls, state):
103 """Restore the _QueuedOpCode from the serialized form.
106 @param state: the serialized state
107 @rtype: _QueuedOpCode
108 @return: a new _QueuedOpCode instance
111 obj = _QueuedOpCode.__new__(cls)
112 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
113 obj.status = state["status"]
114 obj.result = state["result"]
115 obj.log = state["log"]
116 obj.start_timestamp = state.get("start_timestamp", None)
117 obj.end_timestamp = state.get("end_timestamp", None)
121 """Serializes this _QueuedOpCode.
124 @return: the dictionary holding the serialized state
128 "input": self.input.__getstate__(),
129 "status": self.status,
130 "result": self.result,
132 "start_timestamp": self.start_timestamp,
133 "end_timestamp": self.end_timestamp,
137 class _QueuedJob(object):
138 """In-memory job representation.
140 This is what we use to track the user-submitted jobs. Locking must
141 be taken care of by users of this class.
143 @type queue: L{JobQueue}
144 @ivar queue: the parent queue
147 @ivar ops: the list of _QueuedOpCode that constitute the job
148 @type log_serial: int
149 @ivar log_serial: holds the index for the next log entry
150 @ivar received_timestamp: the timestamp for when the job was received
151 @ivar start_timestmap: the timestamp for start of execution
152 @ivar end_timestamp: the timestamp for end of execution
153 @ivar lock_status: In-memory locking information for debugging
154 @ivar change: a Condition variable we use for waiting for job changes
157 # pylint: disable-msg=W0212
158 __slots__ = ["queue", "id", "ops", "log_serial",
159 "received_timestamp", "start_timestamp", "end_timestamp",
160 "lock_status", "change",
163 def __init__(self, queue, job_id, ops):
164 """Constructor for the _QueuedJob.
166 @type queue: L{JobQueue}
167 @param queue: our parent queue
169 @param job_id: our job id
171 @param ops: the list of opcodes we hold, which will be encapsulated
176 # TODO: use a better exception
177 raise Exception("No opcodes")
181 self.ops = [_QueuedOpCode(op) for op in ops]
183 self.received_timestamp = TimeStampNow()
184 self.start_timestamp = None
185 self.end_timestamp = None
187 # In-memory attributes
188 self.lock_status = None
190 # Condition to wait for changes
191 self.change = threading.Condition(self.queue._lock)
194 def Restore(cls, queue, state):
195 """Restore a _QueuedJob from serialized state:
197 @type queue: L{JobQueue}
198 @param queue: to which queue the restored job belongs
200 @param state: the serialized state
202 @return: the restored _JobQueue instance
205 obj = _QueuedJob.__new__(cls)
208 obj.received_timestamp = state.get("received_timestamp", None)
209 obj.start_timestamp = state.get("start_timestamp", None)
210 obj.end_timestamp = state.get("end_timestamp", None)
212 # In-memory attributes
213 obj.lock_status = None
217 for op_state in state["ops"]:
218 op = _QueuedOpCode.Restore(op_state)
219 for log_entry in op.log:
220 obj.log_serial = max(obj.log_serial, log_entry[0])
223 # Condition to wait for changes
224 obj.change = threading.Condition(obj.queue._lock)
229 """Serialize the _JobQueue instance.
232 @return: the serialized state
237 "ops": [op.Serialize() for op in self.ops],
238 "start_timestamp": self.start_timestamp,
239 "end_timestamp": self.end_timestamp,
240 "received_timestamp": self.received_timestamp,
243 def CalcStatus(self):
244 """Compute the status of this job.
246 This function iterates over all the _QueuedOpCodes in the job and
247 based on their status, computes the job status.
250 - if we find a cancelled, or finished with error, the job
251 status will be the same
252 - otherwise, the last opcode with the status one of:
257 will determine the job status
259 - otherwise, it means either all opcodes are queued, or success,
260 and the job status will be the same
262 @return: the job status
265 status = constants.JOB_STATUS_QUEUED
269 if op.status == constants.OP_STATUS_SUCCESS:
274 if op.status == constants.OP_STATUS_QUEUED:
276 elif op.status == constants.OP_STATUS_WAITLOCK:
277 status = constants.JOB_STATUS_WAITLOCK
278 elif op.status == constants.OP_STATUS_RUNNING:
279 status = constants.JOB_STATUS_RUNNING
280 elif op.status == constants.OP_STATUS_CANCELING:
281 status = constants.JOB_STATUS_CANCELING
283 elif op.status == constants.OP_STATUS_ERROR:
284 status = constants.JOB_STATUS_ERROR
285 # The whole job fails if one opcode failed
287 elif op.status == constants.OP_STATUS_CANCELED:
288 status = constants.OP_STATUS_CANCELED
292 status = constants.JOB_STATUS_SUCCESS
296 def GetLogEntries(self, newer_than):
297 """Selectively returns the log entries.
299 @type newer_than: None or int
300 @param newer_than: if this is None, return all log entries,
301 otherwise return only the log entries with serial higher
304 @return: the list of the log entries selected
307 if newer_than is None:
314 entries.extend(filter(lambda entry: entry[0] > serial, op.log))
318 def MarkUnfinishedOps(self, status, result):
319 """Mark unfinished opcodes with a given status and result.
321 This is an utility function for marking all running or waiting to
322 be run opcodes with a given status. Opcodes which are already
323 finalised are not changed.
325 @param status: a given opcode status
326 @param result: the opcode result
331 if op.status in constants.OPS_FINALIZED:
332 assert not_marked, "Finalized opcodes found after non-finalized ones"
339 class _OpExecCallbacks(mcpu.OpExecCbBase):
340 def __init__(self, queue, job, op):
341 """Initializes this class.
343 @type queue: L{JobQueue}
344 @param queue: Job queue
345 @type job: L{_QueuedJob}
346 @param job: Job object
347 @type op: L{_QueuedOpCode}
351 assert queue, "Queue is missing"
352 assert job, "Job is missing"
353 assert op, "Opcode is missing"
359 def NotifyStart(self):
360 """Mark the opcode as running, not lock-waiting.
362 This is called from the mcpu code as a notifier function, when the LU is
363 finally about to start the Exec() method. Of course, to have end-user
364 visible results, the opcode must be initially (before calling into
365 Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
368 self._queue.acquire()
370 assert self._op.status in (constants.OP_STATUS_WAITLOCK,
371 constants.OP_STATUS_CANCELING)
373 # All locks are acquired by now
374 self._job.lock_status = None
376 # Cancel here if we were asked to
377 if self._op.status == constants.OP_STATUS_CANCELING:
380 self._op.status = constants.OP_STATUS_RUNNING
382 self._queue.release()
384 def Feedback(self, *args):
385 """Append a log entry.
391 log_type = constants.ELOG_MESSAGE
394 (log_type, log_msg) = args
396 # The time is split to make serialization easier and not lose
398 timestamp = utils.SplitTime(time.time())
400 self._queue.acquire()
402 self._job.log_serial += 1
403 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
405 self._job.change.notifyAll()
407 self._queue.release()
409 def ReportLocks(self, msg):
410 """Write locking information to the job.
412 Called whenever the LU processor is waiting for a lock or has acquired one.
415 # Not getting the queue lock because this is a single assignment
416 self._job.lock_status = msg
419 class _JobQueueWorker(workerpool.BaseWorker):
420 """The actual job workers.
423 def RunTask(self, job): # pylint: disable-msg=W0221
426 This functions processes a job. It is closely tied to the _QueuedJob and
427 _QueuedOpCode classes.
429 @type job: L{_QueuedJob}
430 @param job: the job to be processed
433 logging.info("Worker %s processing job %s",
434 self.worker_id, job.id)
435 proc = mcpu.Processor(self.pool.queue.context, job.id)
440 for idx, op in enumerate(job.ops):
441 op_summary = op.input.Summary()
442 if op.status == constants.OP_STATUS_SUCCESS:
443 # this is a job that was partially completed before master
444 # daemon shutdown, so it can be expected that some opcodes
445 # are already completed successfully (if any did error
446 # out, then the whole job should have been aborted and not
447 # resubmitted for processing)
448 logging.info("Op %s/%s: opcode %s already processed, skipping",
449 idx + 1, count, op_summary)
452 logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
457 if op.status == constants.OP_STATUS_CANCELED:
459 assert op.status == constants.OP_STATUS_QUEUED
460 op.status = constants.OP_STATUS_WAITLOCK
462 op.start_timestamp = TimeStampNow()
463 if idx == 0: # first opcode
464 job.start_timestamp = op.start_timestamp
465 queue.UpdateJobUnlocked(job)
467 input_opcode = op.input
471 # Make sure not to hold queue lock while calling ExecOpCode
472 result = proc.ExecOpCode(input_opcode,
473 _OpExecCallbacks(queue, job, op))
477 op.status = constants.OP_STATUS_SUCCESS
479 op.end_timestamp = TimeStampNow()
480 queue.UpdateJobUnlocked(job)
484 logging.info("Op %s/%s: Successfully finished opcode %s",
485 idx + 1, count, op_summary)
487 # Will be handled further up
489 except Exception, err:
493 op.status = constants.OP_STATUS_ERROR
494 if isinstance(err, errors.GenericError):
495 op.result = errors.EncodeException(err)
498 op.end_timestamp = TimeStampNow()
499 logging.info("Op %s/%s: Error in opcode %s: %s",
500 idx + 1, count, op_summary, err)
502 queue.UpdateJobUnlocked(job)
510 queue.CancelJobUnlocked(job)
513 except errors.GenericError, err:
514 logging.exception("Ganeti exception")
516 logging.exception("Unhandled exception")
521 job.lock_status = None
522 job.end_timestamp = TimeStampNow()
523 queue.UpdateJobUnlocked(job)
526 status = job.CalcStatus()
530 logging.info("Worker %s finished job %s, status = %s",
531 self.worker_id, job_id, status)
534 class _JobQueueWorkerPool(workerpool.WorkerPool):
535 """Simple class implementing a job-processing workerpool.
538 def __init__(self, queue):
539 super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
544 def _RequireOpenQueue(fn):
545 """Decorator for "public" functions.
547 This function should be used for all 'public' functions. That is,
548 functions usually called from other classes. Note that this should
549 be applied only to methods (not plain functions), since it expects
550 that the decorated function is called with a first argument that has
551 a '_queue_lock' argument.
553 @warning: Use this decorator only after utils.LockedMethod!
562 def wrapper(self, *args, **kwargs):
563 # pylint: disable-msg=W0212
564 assert self._queue_lock is not None, "Queue should be open"
565 return fn(self, *args, **kwargs)
569 class JobQueue(object):
570 """Queue used to manage the jobs.
572 @cvar _RE_JOB_FILE: regex matching the valid job file names
575 _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
577 def __init__(self, context):
578 """Constructor for JobQueue.
580 The constructor will initialize the job queue object and then
581 start loading the current jobs from disk, either for starting them
582 (if they were queue) or for aborting them (if they were already
585 @type context: GanetiContext
586 @param context: the context object for access to the configuration
587 data and other ganeti objects
590 self.context = context
591 self._memcache = weakref.WeakValueDictionary()
592 self._my_hostname = utils.HostInfo().name
595 self._lock = threading.Lock()
596 self.acquire = self._lock.acquire
597 self.release = self._lock.release
600 self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
603 self._last_serial = jstore.ReadSerial()
604 assert self._last_serial is not None, ("Serial file was modified between"
605 " check in jstore and here")
607 # Get initial list of nodes
608 self._nodes = dict((n.name, n.primary_ip)
609 for n in self.context.cfg.GetAllNodesInfo().values()
610 if n.master_candidate)
614 del self._nodes[self._my_hostname]
618 # TODO: Check consistency across nodes
621 self._wpool = _JobQueueWorkerPool(self)
623 # We need to lock here because WorkerPool.AddTask() may start a job while
624 # we're still doing our work.
627 logging.info("Inspecting job queue")
629 all_job_ids = self._GetJobIDsUnlocked()
630 jobs_count = len(all_job_ids)
631 lastinfo = time.time()
632 for idx, job_id in enumerate(all_job_ids):
633 # Give an update every 1000 jobs or 10 seconds
634 if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
635 idx == (jobs_count - 1)):
636 logging.info("Job queue inspection: %d/%d (%0.1f %%)",
637 idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
638 lastinfo = time.time()
640 job = self._LoadJobUnlocked(job_id)
642 # a failure in loading the job can cause 'None' to be returned
646 status = job.CalcStatus()
648 if status in (constants.JOB_STATUS_QUEUED, ):
649 self._wpool.AddTask(job)
651 elif status in (constants.JOB_STATUS_RUNNING,
652 constants.JOB_STATUS_WAITLOCK,
653 constants.JOB_STATUS_CANCELING):
654 logging.warning("Unfinished job %s found: %s", job.id, job)
656 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
657 "Unclean master daemon shutdown")
659 self.UpdateJobUnlocked(job)
661 logging.info("Job queue inspection finished")
665 self._wpool.TerminateWorkers()
670 def AddNode(self, node):
671 """Register a new node with the queue.
673 @type node: L{objects.Node}
674 @param node: the node object to be added
677 node_name = node.name
678 assert node_name != self._my_hostname
680 # Clean queue directory on added node
681 result = rpc.RpcRunner.call_jobqueue_purge(node_name)
682 msg = result.fail_msg
684 logging.warning("Cannot cleanup queue directory on node %s: %s",
687 if not node.master_candidate:
688 # remove if existing, ignoring errors
689 self._nodes.pop(node_name, None)
690 # and skip the replication of the job ids
693 # Upload the whole queue excluding archived jobs
694 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
696 # Upload current serial file
697 files.append(constants.JOB_QUEUE_SERIAL_FILE)
699 for file_name in files:
701 content = utils.ReadFile(file_name)
703 result = rpc.RpcRunner.call_jobqueue_update([node_name],
706 msg = result[node_name].fail_msg
708 logging.error("Failed to upload file %s to node %s: %s",
709 file_name, node_name, msg)
711 self._nodes[node_name] = node.primary_ip
715 def RemoveNode(self, node_name):
716 """Callback called when removing nodes from the cluster.
719 @param node_name: the name of the node to remove
723 # The queue is removed by the "leave node" RPC call.
724 del self._nodes[node_name]
729 def _CheckRpcResult(result, nodes, failmsg):
730 """Verifies the status of an RPC call.
732 Since we aim to keep consistency should this node (the current
733 master) fail, we will log errors if our rpc fail, and especially
734 log the case when more than half of the nodes fails.
736 @param result: the data as returned from the rpc call
738 @param nodes: the list of nodes we made the call to
740 @param failmsg: the identifier to be used for logging
747 msg = result[node].fail_msg
750 logging.error("RPC call %s (%s) failed on node %s: %s",
751 result[node].call, failmsg, node, msg)
755 # +1 for the master node
756 if (len(success) + 1) < len(failed):
757 # TODO: Handle failing nodes
758 logging.error("More than half of the nodes failed")
760 def _GetNodeIp(self):
761 """Helper for returning the node name/ip list.
764 @return: a tuple of two lists, the first one with the node
765 names and the second one with the node addresses
768 name_list = self._nodes.keys()
769 addr_list = [self._nodes[name] for name in name_list]
770 return name_list, addr_list
772 def _WriteAndReplicateFileUnlocked(self, file_name, data):
773 """Writes a file locally and then replicates it to all nodes.
775 This function will replace the contents of a file on the local
776 node and then replicate it to all the other nodes we have.
779 @param file_name: the path of the file to be replicated
781 @param data: the new contents of the file
784 utils.WriteFile(file_name, data=data)
786 names, addrs = self._GetNodeIp()
787 result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
788 self._CheckRpcResult(result, self._nodes,
789 "Updating %s" % file_name)
791 def _RenameFilesUnlocked(self, rename):
792 """Renames a file locally and then replicate the change.
794 This function will rename a file in the local queue directory
795 and then replicate this rename to all the other nodes we have.
797 @type rename: list of (old, new)
798 @param rename: List containing tuples mapping old to new names
801 # Rename them locally
802 for old, new in rename:
803 utils.RenameFile(old, new, mkdir=True)
805 # ... and on all nodes
806 names, addrs = self._GetNodeIp()
807 result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
808 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
811 def _FormatJobID(job_id):
812 """Convert a job ID to string format.
814 Currently this just does C{str(job_id)} after performing some
815 checks, but if we want to change the job id format this will
816 abstract this change.
818 @type job_id: int or long
819 @param job_id: the numeric job id
821 @return: the formatted job id
824 if not isinstance(job_id, (int, long)):
825 raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
827 raise errors.ProgrammerError("Job ID %s is negative" % job_id)
832 def _GetArchiveDirectory(cls, job_id):
833 """Returns the archive directory for a job.
836 @param job_id: Job identifier
838 @return: Directory name
841 return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
843 def _NewSerialsUnlocked(self, count):
844 """Generates a new job identifier.
846 Job identifiers are unique during the lifetime of a cluster.
849 @param count: how many serials to return
851 @return: a string representing the job identifier.
856 serial = self._last_serial + count
859 self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
862 result = [self._FormatJobID(v)
863 for v in range(self._last_serial, serial + 1)]
864 # Keep it only if we were able to write the file
865 self._last_serial = serial
870 def _GetJobPath(job_id):
871 """Returns the job file for a given job id.
874 @param job_id: the job identifier
876 @return: the path to the job file
879 return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
882 def _GetArchivedJobPath(cls, job_id):
883 """Returns the archived job file for a give job id.
886 @param job_id: the job identifier
888 @return: the path to the archived job file
891 path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
892 return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
895 def _ExtractJobID(cls, name):
896 """Extract the job id from a filename.
899 @param name: the job filename
900 @rtype: job id or None
901 @return: the job id corresponding to the given filename,
902 or None if the filename does not represent a valid
906 m = cls._RE_JOB_FILE.match(name)
912 def _GetJobIDsUnlocked(self, archived=False):
913 """Return all known job IDs.
915 If the parameter archived is True, archived jobs IDs will be
916 included. Currently this argument is unused.
918 The method only looks at disk because it's a requirement that all
919 jobs are present on disk (so in the _memcache we don't have any
923 @return: the list of job IDs
926 # pylint: disable-msg=W0613
927 jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
928 jlist = utils.NiceSort(jlist)
931 def _ListJobFiles(self):
932 """Returns the list of current job files.
935 @return: the list of job file names
938 return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
939 if self._RE_JOB_FILE.match(name)]
941 def _LoadJobUnlocked(self, job_id):
942 """Loads a job from the disk or memory.
944 Given a job id, this will return the cached job object if
945 existing, or try to load the job from the disk. If loading from
946 disk, it will also add the job to the cache.
948 @param job_id: the job id
949 @rtype: L{_QueuedJob} or None
950 @return: either None or the job object
953 job = self._memcache.get(job_id, None)
955 logging.debug("Found job %s in memcache", job_id)
958 filepath = self._GetJobPath(job_id)
959 logging.debug("Loading job from %s", filepath)
961 raw_data = utils.ReadFile(filepath)
963 if err.errno in (errno.ENOENT, ):
967 data = serializer.LoadJson(raw_data)
970 job = _QueuedJob.Restore(self, data)
971 except Exception, err: # pylint: disable-msg=W0703
972 new_path = self._GetArchivedJobPath(job_id)
973 if filepath == new_path:
974 # job already archived (future case)
975 logging.exception("Can't parse job %s", job_id)
978 logging.exception("Can't parse job %s, will archive.", job_id)
979 self._RenameFilesUnlocked([(filepath, new_path)])
982 self._memcache[job_id] = job
983 logging.debug("Added job %s to the cache", job_id)
986 def _GetJobsUnlocked(self, job_ids):
987 """Return a list of jobs based on their IDs.
990 @param job_ids: either an empty list (meaning all jobs),
993 @return: the list of job objects
997 job_ids = self._GetJobIDsUnlocked()
999 return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
1002 def _IsQueueMarkedDrain():
1003 """Check if the queue is marked from drain.
1005 This currently uses the queue drain file, which makes it a
1006 per-node flag. In the future this can be moved to the config file.
1009 @return: True of the job queue is marked for draining
1012 return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1015 def SetDrainFlag(drain_flag):
1016 """Sets the drain flag for the queue.
1018 This is similar to the function L{backend.JobQueueSetDrainFlag},
1019 and in the future we might merge them.
1021 @type drain_flag: boolean
1022 @param drain_flag: Whether to set or unset the drain flag
1026 utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1028 utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1032 def _SubmitJobUnlocked(self, job_id, ops):
1033 """Create and store a new job.
1035 This enters the job into our job queue and also puts it on the new
1036 queue, in order for it to be picked up by the queue processors.
1038 @type job_id: job ID
1039 @param job_id: the job ID for the new job
1041 @param ops: The list of OpCodes that will become the new job.
1043 @return: the job ID of the newly created job
1044 @raise errors.JobQueueDrainError: if the job is marked for draining
1047 if self._IsQueueMarkedDrain():
1048 raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1050 # Check job queue size
1051 size = len(self._ListJobFiles())
1052 if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
1053 # TODO: Autoarchive jobs. Make sure it's not done on every job
1054 # submission, though.
1058 if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1059 raise errors.JobQueueFull()
1061 job = _QueuedJob(self, job_id, ops)
1064 self.UpdateJobUnlocked(job)
1066 logging.debug("Adding new job %s to the cache", job_id)
1067 self._memcache[job_id] = job
1069 # Add to worker pool
1070 self._wpool.AddTask(job)
1076 def SubmitJob(self, ops):
1077 """Create and store a new job.
1079 @see: L{_SubmitJobUnlocked}
1082 job_id = self._NewSerialsUnlocked(1)[0]
1083 return self._SubmitJobUnlocked(job_id, ops)
1087 def SubmitManyJobs(self, jobs):
1088 """Create and store multiple jobs.
1090 @see: L{_SubmitJobUnlocked}
1094 all_job_ids = self._NewSerialsUnlocked(len(jobs))
1095 for job_id, ops in zip(all_job_ids, jobs):
1097 data = self._SubmitJobUnlocked(job_id, ops)
1099 except errors.GenericError, err:
1102 results.append((status, data))
1107 def UpdateJobUnlocked(self, job):
1108 """Update a job's on disk storage.
1110 After a job has been modified, this function needs to be called in
1111 order to write the changes to disk and replicate them to the other
1114 @type job: L{_QueuedJob}
1115 @param job: the changed job
1118 filename = self._GetJobPath(job.id)
1119 data = serializer.DumpJson(job.Serialize(), indent=False)
1120 logging.debug("Writing job %s to %s", job.id, filename)
1121 self._WriteAndReplicateFileUnlocked(filename, data)
1123 # Notify waiters about potential changes
1124 job.change.notifyAll()
1128 def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1130 """Waits for changes in a job.
1132 @type job_id: string
1133 @param job_id: Job identifier
1134 @type fields: list of strings
1135 @param fields: Which fields to check for changes
1136 @type prev_job_info: list or None
1137 @param prev_job_info: Last job information returned
1138 @type prev_log_serial: int
1139 @param prev_log_serial: Last job message serial number
1140 @type timeout: float
1141 @param timeout: maximum time to wait
1142 @rtype: tuple (job info, log entries)
1143 @return: a tuple of the job information as required via
1144 the fields parameter, and the log entries as a list
1146 if the job has not changed and the timeout has expired,
1147 we instead return a special value,
1148 L{constants.JOB_NOTCHANGED}, which should be interpreted
1149 as such by the clients
1152 job = self._LoadJobUnlocked(job_id)
1154 logging.debug("Job %s not found", job_id)
1157 def _CheckForChanges():
1158 logging.debug("Waiting for changes in job %s", job_id)
1160 status = job.CalcStatus()
1161 job_info = self._GetJobInfoUnlocked(job, fields)
1162 log_entries = job.GetLogEntries(prev_log_serial)
1164 # Serializing and deserializing data can cause type changes (e.g. from
1165 # tuple to list) or precision loss. We're doing it here so that we get
1166 # the same modifications as the data received from the client. Without
1167 # this, the comparison afterwards might fail without the data being
1168 # significantly different.
1169 job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1170 log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1172 # Don't even try to wait if the job is no longer running, there will be
1174 if (status not in (constants.JOB_STATUS_QUEUED,
1175 constants.JOB_STATUS_RUNNING,
1176 constants.JOB_STATUS_WAITLOCK) or
1177 prev_job_info != job_info or
1178 (log_entries and prev_log_serial != log_entries[0][0])):
1179 logging.debug("Job %s changed", job_id)
1180 return (job_info, log_entries)
1182 raise utils.RetryAgain()
1185 # Setting wait function to release the queue lock while waiting
1186 return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
1187 wait_fn=job.change.wait)
1188 except utils.RetryTimeout:
1189 return constants.JOB_NOTCHANGED
1193 def CancelJob(self, job_id):
1196 This will only succeed if the job has not started yet.
1198 @type job_id: string
1199 @param job_id: job ID of job to be cancelled.
1202 logging.info("Cancelling job %s", job_id)
1204 job = self._LoadJobUnlocked(job_id)
1206 logging.debug("Job %s not found", job_id)
1207 return (False, "Job %s not found" % job_id)
1209 job_status = job.CalcStatus()
1211 if job_status not in (constants.JOB_STATUS_QUEUED,
1212 constants.JOB_STATUS_WAITLOCK):
1213 logging.debug("Job %s is no longer waiting in the queue", job.id)
1214 return (False, "Job %s is no longer waiting in the queue" % job.id)
1216 if job_status == constants.JOB_STATUS_QUEUED:
1217 self.CancelJobUnlocked(job)
1218 return (True, "Job %s canceled" % job.id)
1220 elif job_status == constants.JOB_STATUS_WAITLOCK:
1221 # The worker will notice the new status and cancel the job
1223 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1225 self.UpdateJobUnlocked(job)
1226 return (True, "Job %s will be canceled" % job.id)
1229 def CancelJobUnlocked(self, job):
1230 """Marks a job as canceled.
1234 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1235 "Job canceled by request")
1237 self.UpdateJobUnlocked(job)
1240 def _ArchiveJobsUnlocked(self, jobs):
1243 @type jobs: list of L{_QueuedJob}
1244 @param jobs: Job objects
1246 @return: Number of archived jobs
1252 if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1253 constants.JOB_STATUS_SUCCESS,
1254 constants.JOB_STATUS_ERROR):
1255 logging.debug("Job %s is not yet done", job.id)
1258 archive_jobs.append(job)
1260 old = self._GetJobPath(job.id)
1261 new = self._GetArchivedJobPath(job.id)
1262 rename_files.append((old, new))
1264 # TODO: What if 1..n files fail to rename?
1265 self._RenameFilesUnlocked(rename_files)
1267 logging.debug("Successfully archived job(s) %s",
1268 utils.CommaJoin(job.id for job in archive_jobs))
1270 return len(archive_jobs)
1274 def ArchiveJob(self, job_id):
1277 This is just a wrapper over L{_ArchiveJobsUnlocked}.
1279 @type job_id: string
1280 @param job_id: Job ID of job to be archived.
1282 @return: Whether job was archived
1285 logging.info("Archiving job %s", job_id)
1287 job = self._LoadJobUnlocked(job_id)
1289 logging.debug("Job %s not found", job_id)
1292 return self._ArchiveJobsUnlocked([job]) == 1
1296 def AutoArchiveJobs(self, age, timeout):
1297 """Archives all jobs based on age.
1299 The method will archive all jobs which are older than the age
1300 parameter. For jobs that don't have an end timestamp, the start
1301 timestamp will be considered. The special '-1' age will cause
1302 archival of all jobs (that are not running or queued).
1305 @param age: the minimum age in seconds
1308 logging.info("Archiving jobs with age more than %s seconds", age)
1311 end_time = now + timeout
1315 all_job_ids = self._GetJobIDsUnlocked(archived=False)
1317 for idx, job_id in enumerate(all_job_ids):
1320 # Not optimal because jobs could be pending
1321 # TODO: Measure average duration for job archival and take number of
1322 # pending jobs into account.
1323 if time.time() > end_time:
1326 # Returns None if the job failed to load
1327 job = self._LoadJobUnlocked(job_id)
1329 if job.end_timestamp is None:
1330 if job.start_timestamp is None:
1331 job_age = job.received_timestamp
1333 job_age = job.start_timestamp
1335 job_age = job.end_timestamp
1337 if age == -1 or now - job_age[0] > age:
1340 # Archive 10 jobs at a time
1341 if len(pending) >= 10:
1342 archived_count += self._ArchiveJobsUnlocked(pending)
1346 archived_count += self._ArchiveJobsUnlocked(pending)
1348 return (archived_count, len(all_job_ids) - last_touched - 1)
1351 def _GetJobInfoUnlocked(job, fields):
1352 """Returns information about a job.
1354 @type job: L{_QueuedJob}
1355 @param job: the job which we query
1357 @param fields: names of fields to return
1359 @return: list with one element for each field
1360 @raise errors.OpExecError: when an invalid field
1365 for fname in fields:
1368 elif fname == "status":
1369 row.append(job.CalcStatus())
1370 elif fname == "ops":
1371 row.append([op.input.__getstate__() for op in job.ops])
1372 elif fname == "opresult":
1373 row.append([op.result for op in job.ops])
1374 elif fname == "opstatus":
1375 row.append([op.status for op in job.ops])
1376 elif fname == "oplog":
1377 row.append([op.log for op in job.ops])
1378 elif fname == "opstart":
1379 row.append([op.start_timestamp for op in job.ops])
1380 elif fname == "opend":
1381 row.append([op.end_timestamp for op in job.ops])
1382 elif fname == "received_ts":
1383 row.append(job.received_timestamp)
1384 elif fname == "start_ts":
1385 row.append(job.start_timestamp)
1386 elif fname == "end_ts":
1387 row.append(job.end_timestamp)
1388 elif fname == "lock_status":
1389 row.append(job.lock_status)
1390 elif fname == "summary":
1391 row.append([op.input.Summary() for op in job.ops])
1393 raise errors.OpExecError("Invalid job query field '%s'" % fname)
1398 def QueryJobs(self, job_ids, fields):
1399 """Returns a list of jobs in queue.
1401 This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1402 processing for each job.
1405 @param job_ids: sequence of job identifiers or None for all
1407 @param fields: names of fields to return
1409 @return: list one element per job, each element being list with
1410 the requested fields
1415 for job in self._GetJobsUnlocked(job_ids):
1419 jobs.append(self._GetJobInfoUnlocked(job, fields))
1426 """Stops the job queue.
1428 This shutdowns all the worker threads an closes the queue.
1431 self._wpool.TerminateWorkers()
1433 self._queue_lock.Close()
1434 self._queue_lock = None