# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
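
# A worked example of the split-time format (values assumed for
# illustration): called at Unix time 1230000000.123456, TimeStampNow()
# would return (1230000000, 123456). Keeping seconds and microseconds as
# two integers avoids floating point precision loss on serialization.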


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = cls.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @type id: int
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: int
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = cls.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status
      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
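
  # An illustrative walk-through of CalcStatus (opcode statuses assumed):
  # a job whose opcodes are [SUCCESS, RUNNING, QUEUED] reports
  # JOB_STATUS_RUNNING, while [SUCCESS, ERROR, QUEUED] reports
  # JOB_STATUS_ERROR, since a single failed opcode fails the whole job.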

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
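
  # Example (log serials assumed): if the ops' logs hold entries with
  # serials 1..5, GetLogEntries(3) returns only the entries with serials
  # 4 and 5, while GetLogEntries(None) returns all five.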


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Worker %s processing job %s",
                 self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              self.queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                self.queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.info("Worker %s finished job %s, status = %s",
                   self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    manipulated once).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              for op in job.ops:
                op.status = constants.OP_STATUS_ERROR
                op.result = "Unclean master daemon shutdown"
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
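
  # A minimal usage sketch (paths assumed for illustration): archiving
  # job 42 would pass rename=[("/var/lib/ganeti/queue/job-42",
  # "/var/lib/ganeti/queue/archive/0/job-42")]; the file is renamed
  # locally first and the rename is then replicated via RPC to all
  # remembered nodes.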

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)
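
  # Example, following the checks above: _FormatJobID(42) returns "42",
  # while _FormatJobID("42") raises ProgrammerError because the input
  # must already be numeric.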

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
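
  # Example: with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job 9999 maps to
  # archive directory "0" and job 12345 to directory "1".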

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
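
  # Example (QUEUE_DIR assumed to be /var/lib/ganeti/queue): job 42 is
  # stored as /var/lib/ganeti/queue/job-42.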

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None
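
  # Example: _ExtractJobID("job-123") returns "123", while
  # _ExtractJobID("job-123.bak") returns None because it does not match
  # _RE_JOB_FILE.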

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True
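
  # Since the flag is only the presence of a file, a drained queue stays
  # drained across daemon restarts. A sketch of the intended effect
  # (usage assumed):
  #
  #   JobQueue.SetDrainFlag(True)   # creates JOB_QUEUE_DRAIN_FILE
  #   queue.SubmitJob(ops)          # raises errors.JobQueueDrainError
  #   JobQueue.SetDrainFlag(False)  # removes the file, submissions resume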

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    return self._SubmitJobUnlocked(ops)

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    for ops in jobs:
      try:
        data = self._SubmitJobUnlocked(ops)
        status = True
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    return results
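
  # A hypothetical caller's view of SubmitManyJobs: for an input of
  # [[op1], [op2, op3]] it could return [(True, "4711"), (False, "queue
  # is drained")], i.e. one (success, job id or error message) tuple per
  # submitted job.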

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)
    # Initialize defensively in case the job disappears while we wait
    job_info = None
    log_entries = None
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will
        # be no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
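
  # A sketch of how a client-side poll loop might drive this method (all
  # names hypothetical):
  #
  #   prev_info = prev_serial = None
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, 10.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue                    # timeout expired, just poll again
  #     (prev_info, log_entries) = result
  #     if log_entries:
  #       prev_serial = log_entries[-1][0]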

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer in the queue", job.id)
      return (False, "Job %s is no longer in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        for op in job.ops:
          op.status = constants.OP_STATUS_CANCELING
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_CANCELED
        op.result = "Job canceled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  ", ".join(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched - 1)
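
  # Example of the age semantics (values assumed): with age=3600, only
  # finished jobs whose end timestamp (or, failing that, start or
  # received timestamp) is more than an hour old are archived; age=-1
  # archives every job that is not running or queued, regardless of age.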

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []
    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs
1344 """Stops the job queue.
1346 This shutdowns all the worker threads an closes the queue.
1349 self._wpool.TerminateWorkers()
1351 self._queue_lock.Close()
1352 self._queue_lock = None