# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """
62 """Returns the current timestamp.
65 @return: the current time in the (seconds, microseconds) format
68 return utils.SplitTime(time.time())
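

# A quick illustration of the timestamp format (a sketch with hypothetical
# values, not real output): utils.SplitTime() splits a float timestamp into
# whole seconds and microseconds, so for time.time() == 1234567890.123456,
# TimeStampNow() would return (1234567890, 123456).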


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj
121 """Serializes this _QueuedOpCode.
124 @return: the dictionary holding the serialized state
128 "input": self.input.__getstate__(),
129 "status": self.status,
130 "result": self.result,
132 "start_timestamp": self.start_timestamp,
133 "end_timestamp": self.end_timestamp,


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj
226 """Serialize the _JobQueue instance.
229 @return: the serialized state
234 "ops": [op.Serialize() for op in self.ops],
235 "run_op_index": self.run_op_index,
236 "start_timestamp": self.start_timestamp,
237 "end_timestamp": self.end_timestamp,
238 "received_timestamp": self.received_timestamp,

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an
        error, the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status
      - otherwise, it means either all opcodes are queued, or all
        succeeded, and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
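
  # Worked example (hypothetical job): for opcode statuses
  # (success, running, queued) no error or cancel is found, the last
  # "active" status wins, so CalcStatus() returns JOB_STATUS_RUNNING;
  # for (success, error, queued) it returns JOB_STATUS_ERROR; only for
  # (success, success, success) does it return JOB_STATUS_SUCCESS.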

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
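
  # Example (hypothetical log): with log entries carrying serials 1, 2
  # and 3, GetLogEntries(None) returns all three entries, while
  # GetLogEntries(1) returns only the entries with serials 2 and 3.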


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Worker %s processing job %s",
                 self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)
            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)
              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3
              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args
              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())
              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise
      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.info("Worker %s finished job %s, status = %s",
                   self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
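
  # Example (hypothetical file names): "job-4321" matches _RE_JOB_FILE
  # and yields the job ID "4321" via group(1), while "job-4321.bak" or
  # "archive" do not match.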

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue lock
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              for op in job.ops:
                op.status = constants.OP_STATUS_ERROR
                op.result = "Unclean master daemon shutdown"
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our RPC calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list
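
  # Example (hypothetical nodes): with self._nodes equal to
  # {"node2.example.com": "192.0.2.2", "node3.example.com": "192.0.2.3"},
  # this returns (["node2.example.com", "node3.example.com"],
  # ["192.0.2.2", "192.0.2.3"]), with both lists in matching order.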

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @param file_name: the path of the file to be replicated
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
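
  # Example: with JOBS_PER_ARCHIVE_DIRECTORY == 10000, job "9999" maps
  # to archive directory "0" and job "40001" maps to directory "4".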

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)
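
  # Example (hypothetical state): if self._last_serial is 41, this
  # writes "42\n" to the serial file, replicates it, remembers 42 as
  # the last serial and returns the formatted job ID "42".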

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for draining.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
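
      # For instance (hypothetical values), a (12345, 678) timestamp
      # tuple inside job_info comes back from this JSON round-trip as
      # the list [12345, 678], exactly as a client would receive it.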

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer in the queue", job.id)
      return (False, "Job %s is no longer in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        for op in job.ops:
          op.status = constants.OP_STATUS_CANCELING
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_CANCELED
        op.result = "Job canceled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  ", ".join(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched - 1)
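
  # Example (hypothetical timestamps): with age == 3600 and a job whose
  # end_timestamp is (now - 7200, 0), now - job_age[0] == 7200 > 3600,
  # so the job is queued for archival; age == -1 archives every job
  # that is already finished, regardless of timestamps.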

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        is passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a
        list with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs
1332 """Stops the job queue.
1334 This shutdowns all the worker threads an closes the queue.
1337 self._wpool.TerminateWorkers()
1339 self._queue_lock.Close()
1340 self._queue_lock = None