# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format
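
  Example (illustrative values)::

    >>> TimeStampNow()
    (1230768000, 123456)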

  """
  return utils.SplitTime(time.time())


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
      of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution
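
  Illustrative log entry (made-up values)::

    (1, (1230768000, 123456), constants.ELOG_MESSAGE, "waiting for locks")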

  """

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj
117 """Serializes this _QueuedOpCode.
120 @return: the dictionary holding the serialized state
124 "input": self.input.__getstate__(),
125 "status": self.status,
126 "result": self.result,
128 "start_timestamp": self.start_timestamp,
129 "end_timestamp": self.end_timestamp,


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @param job_id: our job id
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj
217 """Serialize the _JobQueue instance.
220 @return: the serialized state
225 "ops": [op.Serialize() for op in self.ops],
226 "run_op_index": self.run_op_index,
227 "start_timestamp": self.start_timestamp,
228 "end_timestamp": self.end_timestamp,
229 "received_timestamp": self.received_timestamp,

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a canceled opcode, or one that finished with an
        error, the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running
        will determine the job status
      - otherwise, it means either all opcodes are queued or all
        succeeded, and the job status will be the same

    @return: the job status
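
    Illustrative example (assumed opcode statuses): a job whose opcodes
    are in states (success, running, queued) computes as
    C{JOB_STATUS_RUNNING}; one whose opcodes are all successful computes
    as C{JOB_STATUS_SUCCESS}.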

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected
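
    Illustrative example (made-up serials)::

      # given op.log entries with serials 1..4:
      job.GetLogEntries(2)     # -> entries with serials 3 and 4
      job.GetLogEntries(None)  # -> all entries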

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """

  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()
374 """Append a log entry.
380 log_type = constants.ELOG_MESSAGE
383 log_type, log_msg = args
385 # The time is split to make serialization easier and not lose
387 timestamp = utils.SplitTime(time.time())
392 op.log.append((job.log_serial, timestamp, log_type, log_msg))
394 job.change.notifyAll()
398 # Make sure not to hold lock while _Log is called
400 result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
404 op.status = constants.OP_STATUS_SUCCESS
406 op.end_timestamp = TimeStampNow()
407 queue.UpdateJobUnlocked(job)
411 logging.debug("Op %s/%s: Successfully finished %s",
414 # Will be handled further up
416 except Exception, err:
420 op.status = constants.OP_STATUS_ERROR
422 op.end_timestamp = TimeStampNow()
423 logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
425 queue.UpdateJobUnlocked(job)
433 queue.CancelJobUnlocked(job)
436 except errors.GenericError, err:
437 logging.exception("Ganeti exception")
439 logging.exception("Unhandled exception")
445 job.end_timestamp = TimeStampNow()
446 queue.UpdateJobUnlocked(job)
449 status = job.CalcStatus()
452 logging.debug("Worker %s finished job %s, status = %s",
453 self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!
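
    Example::

      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass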

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              for op in job.ops:
                op.status = constants.OP_STATUS_ERROR
                op.result = "Unclean master daemon shutdown"
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      except:
        self._wpool.TerminateWorkers()
        raise
    finally:
      self.release()

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our RPC calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @param file_name: the path of the file to be replicated
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new) tuples
    @param rename: list containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id
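
    Illustrative example::

      self._FormatJobID(143)  # -> "143"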

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name
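
    Illustrative example (JOBS_PER_ARCHIVE_DIRECTORY being 10000)::

      cls._GetArchiveDirectory("12345")  # -> "1"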

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)
779 def _GetJobPath(job_id):
780 """Returns the job file for a given job id.
783 @param job_id: the job identifier
785 @return: the path to the job file
788 return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file
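
    Illustrative example: job 12345 would be archived under
    C{JOB_QUEUE_ARCHIVE_DIR/1/job-12345}.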

    """
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file
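
    Illustrative example::

      cls._ExtractJobID("job-123")  # -> "123"
      cls._ExtractJobID("readme")   # -> None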

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
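
    Illustrative usage (C{op} being any L{opcodes.OpCode} instance)::

      job_id = queue.SubmitJob([op])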

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients
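
    Illustrative caller loop (simplified, error handling omitted)::

      while True:
        result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
                                         prev_serial, timeout)
        if result != constants.JOB_NOTCHANGED:
          break
      # result is now the (job info, log entries) tuple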

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will
        # be no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be canceled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer in the queue", job.id)
      return (False, "Job %s is no longer in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        for op in job.ops:
          op.status = constants.OP_STATUS_CANCELING
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job canceled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  ", ".join(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
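
    Illustrative example: C{age=24 * 3600} archives all finished jobs
    older than one day, while C{age=-1} archives all finished jobs.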

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched - 1)

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        is passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs
1309 """Stops the job queue.
1311 This shutdowns all the worker threads an closes the queue.
1314 self._wpool.TerminateWorkers()
1316 self._queue_lock.Close()
1317 self._queue_lock = None