# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc

JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """
62 """Returns the current timestamp.
65 @return: the current time in the (seconds, microseconds) format
68 return utils.SplitTime(time.time())
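
# Illustrative sketch (an addition, not from the original source): timestamps
# are kept as integer (seconds, microseconds) pairs so JSON serialization
# cannot lose precision. Assuming time.time() returned 1234567890.123456:
#
#   TimeStampNow() == (1234567890, 123456)
#   # utils.MergeTime is assumed here as the inverse helper:
#   utils.MergeTime((1234567890, 123456)) == 1234567890.123456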


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
      of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj
117 """Serializes this _QueuedOpCode.
120 @return: the dictionary holding the serialized state
124 "input": self.input.__getstate__(),
125 "status": self.status,
126 "result": self.result,
128 "start_timestamp": self.start_timestamp,
129 "end_timestamp": self.end_timestamp,


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj
217 """Serialize the _JobQueue instance.
220 @return: the serialized state
225 "ops": [op.Serialize() for op in self.ops],
226 "run_op_index": self.run_op_index,
227 "start_timestamp": self.start_timestamp,
228 "end_timestamp": self.end_timestamp,
229 "received_timestamp": self.received_timestamp,

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running
        will determine the job status
      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
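
  # Illustrative sketch (an addition, not from the original source): with
  # three opcodes in states (success, running, queued) the loop above skips
  # the first, clears all_success, and the job reports as running:
  #
  #   job.ops[0].status = constants.OP_STATUS_SUCCESS
  #   job.ops[1].status = constants.OP_STATUS_RUNNING
  #   job.ops[2].status = constants.OP_STATUS_QUEUED
  #   assert job.CalcStatus() == constants.JOB_STATUS_RUNNING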

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
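
  # Illustrative sketch (an addition, not from the original source): log
  # entries are (serial, timestamp, type, message) tuples, so with serials
  # 1..3 spread over the job's opcodes:
  #
  #   job.GetLogEntries(None)  # all three entries
  #   job.GetLogEntries(2)     # only the entry with serial 3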


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Worker %s processing job %s",
                 self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()
376 """Append a log entry.
382 log_type = constants.ELOG_MESSAGE
385 log_type, log_msg = args
387 # The time is split to make serialization easier and not lose
389 timestamp = utils.SplitTime(time.time())
394 op.log.append((job.log_serial, timestamp, log_type, log_msg))
396 job.change.notifyAll()
400 # Make sure not to hold lock while _Log is called
402 result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s", idx + 1, count,
                             op_summary)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()

    logging.info("Worker %s finished job %s, status = %s",
                 self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              for op in job.ops:
                op.status = constants.OP_STATUS_ERROR
                op.result = "Unclean master daemon shutdown"
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    This function will rename files in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
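
  # Illustrative sketch (an addition, not from the original source): job
  # archival below uses this with (current, archived) path pairs, e.g.:
  #
  #   self._RenameFilesUnlocked([(self._GetJobPath("42"),
  #                               self._GetArchivedJobPath("42"))])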

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
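
  # Illustrative sketch (an addition, not from the original source): with
  # JOBS_PER_ARCHIVE_DIRECTORY = 10000, jobs are bucketed by integer
  # division:
  #
  #   JobQueue._GetArchiveDirectory("9999")  == "0"
  #   JobQueue._GetArchiveDirectory("12345") == "1"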

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)
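
  # Illustrative sketch (an addition, not from the original source):
  # assuming _last_serial is 41, the next submission replicates the new
  # serial file to all master candidates and the job gets the string
  # id "42".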

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for draining.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
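
  # Illustrative sketch (an addition, not from the original source):
  # draining is simply the presence of the flag file, so after
  #
  #   JobQueue.SetDrainFlag(True)
  #
  # both os.path.exists(constants.JOB_QUEUE_DRAIN_FILE) and
  # JobQueue._IsQueueMarkedDrain() hold, and SubmitJob() below raises
  # errors.JobQueueDrainError.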

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
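
      # Illustrative sketch (an addition, not from the original source):
      # the round trip matters because JSON has no tuple type, e.g.:
      #
      #   serializer.LoadJson(serializer.DumpJson((1, 2))) == [1, 2]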

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer in the queue", job.id)
      return (False, "Job %s is no longer in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        for op in job.ops:
          op.status = constants.OP_STATUS_CANCELING
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job canceled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  ", ".join(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched - 1)
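
  # Illustrative sketch (an addition, not from the original source): job_age
  # is a split timestamp, so only its seconds part is compared above. With
  # now = 2000.0 and age = 100, a job whose end_timestamp is (1800, 500000)
  # is archived because 2000 - 1800 > 100.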

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        is passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs
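
  # Illustrative sketch (an addition, with hypothetical job ids): querying
  # two jobs for a couple of fields returns one row per job, e.g.:
  #
  #   queue.QueryJobs(["1", "2"], ["id", "status"])
  #   # -> [["1", "success"], ["2", "running"]]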
1312 """Stops the job queue.
1314 This shutdowns all the worker threads an closes the queue.
1317 self._wpool.TerminateWorkers()
1319 self._queue_lock.Close()
1320 self._queue_lock = None