# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
import os
import logging
import errno
import re
import time
import weakref
import threading

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """
62 """Returns the current timestamp.
65 @return: the current time in the (seconds, microseconds) format
68 return utils.SplitTime(time.time())
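
# Illustrative note: assuming utils.SplitTime splits a float timestamp into
# whole seconds and microseconds, TimeStampNow() at Unix time 1234567890.5
# would return (1234567890, 500000).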


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
      of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               ]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj
124 """Serializes this _QueuedOpCode.
127 @return: the dictionary holding the serialized state
131 "input": self.input.__getstate__(),
132 "status": self.status,
133 "result": self.result,
135 "start_timestamp": self.start_timestamp,
136 "exec_timestamp": self.exec_timestamp,
137 "end_timestamp": self.end_timestamp,


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar lock_status: In-memory locking information for debugging
  @ivar change: a Condition variable we use for waiting for job changes

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)
198 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
200 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
202 return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj
240 """Serialize the _JobQueue instance.
243 @return: the serialized state
248 "ops": [op.Serialize() for op in self.ops],
249 "start_timestamp": self.start_timestamp,
250 "end_timestamp": self.end_timestamp,
251 "received_timestamp": self.received_timestamp,

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
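
  # Worked example: opcode statuses [success, running, queued] yield a
  # "running" job; [success, error, queued] yields "error"; only when every
  # opcode succeeded does the job report "success".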

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
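
  # Illustrative sketch: with op.log == [(1, ts, "message", "a"),
  # (2, ts, "message", "b")], GetLogEntries(1) returns only the serial-2
  # entry, while GetLogEntries(None) returns both.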

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in reversed(self.ops):
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False
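
  # Worked example: for opcodes [success, running, queued] (finalized first),
  # MarkUnfinishedOps(canceling, None) walks the list in reverse, marks the
  # queued and running opcodes, and leaves the finalized success untouched.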


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    self._queue.acquire()
    try:
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                 constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if self._op.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self._op.status = constants.OP_STATUS_RUNNING
      self._op.exec_timestamp = TimeStampNow()
    finally:
      self._queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())

    self._queue.acquire()
    try:
      self._job.log_serial += 1
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))

      self._job.change.notifyAll()
    finally:
      self._queue.release()
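
  # Illustrative usage: Feedback("text") appends a plain ELOG_MESSAGE entry,
  # while the two-argument form Feedback(log_type, msg) attaches an explicit
  # log type; each call increments the job-wide log_serial and wakes waiters.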

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Processing job %s", job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.lock_status = None
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()

      logging.info("Finished job %s, status = %s", job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_lock' attribute.

  @warning: Use this decorator only after utils.LockedMethod!

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_lock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper
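
# Illustrative usage (method name hypothetical); per the warning above,
# utils.LockedMethod must be the outermost decorator:
#
#   @utils.LockedMethod
#   @_RequireOpenQueue
#   def ExampleMethod(self):
#     ...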


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    started).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue and verify its on-disk state
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes
    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      logging.info("Inspecting job queue")

      all_job_ids = self._GetJobIDsUnlocked()
      jobs_count = len(all_job_ids)
      lastinfo = time.time()
      for idx, job_id in enumerate(all_job_ids):
        # Give an update every 1000 jobs or 10 seconds
        if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
            idx == (jobs_count - 1)):
          logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                       idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
          lastinfo = time.time()

        job = self._LoadJobUnlocked(job_id)

        # a failure in loading the job can cause 'None' to be returned
        if job is None:
          continue

        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK,
                        constants.JOB_STATUS_CANCELING):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                  "Unclean master daemon shutdown")
          finally:
            self.UpdateJobUnlocked(job)

      logging.info("Job queue inspection finished")
    except:
      self._wpool.TerminateWorkers()
      raise
    finally:
      self.release()

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    This function will rename files in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
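
  # Worked example: with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job "12345" maps
  # to archive directory "1", while job "9999" maps to directory "0".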

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result
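
  # Worked example: with _last_serial == 5, _NewSerialsUnlocked(2) writes "7"
  # to the serial file, returns ["6", "7"] and leaves _last_serial at 7.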

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None
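
  # Illustrative behaviour: _ExtractJobID("job-123") yields "123", while a
  # non-matching name such as "job-123.tmp" yields None.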

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    # pylint: disable-msg=W0613
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    data = serializer.LoadJson(raw_data)

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    return self._SubmitJobUnlocked(job_id, ops)

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        data = self._SubmitJobUnlocked(job_id, ops)
        status = True
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    return results
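
  # Illustrative result shape (opcode lists hypothetical):
  #   SubmitManyJobs([[op_a], [op_b]])
  # might return [(True, "17"), (False, "Job queue is drained, refusing
  # job")], one (status, data) tuple per requested job.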

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return None

    def _CheckForChanges():
      logging.debug("Waiting for changes in job %s", job_id)

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      # Don't even try to wait if the job is no longer running, there will be
      # no changes.
      if (status not in (constants.JOB_STATUS_QUEUED,
                         constants.JOB_STATUS_RUNNING,
                         constants.JOB_STATUS_WAITLOCK) or
          prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        logging.debug("Job %s changed", job_id)
        return (job_info, log_entries)

      raise utils.RetryAgain()

    try:
      # Setting wait function to release the queue lock while waiting
      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
                         wait_fn=job.change.wait)
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
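
  # Illustrative client protocol: callers poll in a loop, passing back the
  # last (job_info, log_serial) they saw; the call returns fresh data as soon
  # as anything differs, or constants.JOB_NOTCHANGED once the timeout expires.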

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be canceled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
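
  # Illustrative return value: if 12 of 15 eligible jobs were archived and
  # all 15 were inspected before the timeout, the result is (12, 0); an early
  # timeout leaves the uninspected remainder in the second element.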

  @staticmethod
  def _GetJobInfoUnlocked(job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        is passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "lock_status":
        row.append(job.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row
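
  # Illustrative call (values hypothetical): querying fields
  # ["id", "status", "summary"] might yield ["42", "running", ["TEST_DELAY"]],
  # one element per requested field.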

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs
1439 """Stops the job queue.
1441 This shutdowns all the worker threads an closes the queue.
1444 self._wpool.TerminateWorkers()
1446 self._queue_lock.Close()
1447 self._queue_lock = None