# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
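
# Illustrative only (the input value is hypothetical): utils.SplitTime
# splits a float timestamp into whole seconds plus microseconds, so
# utils.SplitTime(1234567890.25) would yield (1234567890, 250000); this is
# the format used for all timestamps in this module.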


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
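
  # A rough sketch of the serialized form (all values hypothetical):
  #   {"input": {"OP_ID": "OP_CLUSTER_VERIFY", ...}, "status": "queued",
  #    "result": None, "log": [], "start_timestamp": None,
  #    "end_timestamp": None}
  # Restore() accepts exactly this dictionary back, so Serialize/Restore
  # act as inverses of each other.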


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
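
  # Illustrative walk-through (the opcode statuses are hypothetical): for a
  # job whose opcodes are in states (success, running, queued), the loop
  # skips the first, sets JOB_STATUS_RUNNING for the second and leaves it
  # unchanged for the third, so CalcStatus() returns "running"; a single
  # errored opcode short-circuits the loop and the whole job reports
  # JOB_STATUS_ERROR.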

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
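
  # Illustrative only: if the opcodes hold log entries with serials 1, 2
  # and 3, GetLogEntries(1) returns the entries with serials 2 and 3, while
  # GetLogEntries(None) returns all three.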


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              op.status = constants.OP_STATUS_ERROR
              op.result = str(err)
              op.end_timestamp = TimeStampNow()
              logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise
      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        job.run_op_index = -1
        job.end_timestamp = TimeStampNow()
        queue.UpdateJobUnlocked(job)
        job_id = job.id
        status = job.CalcStatus()
      finally:
        queue.release()

    logging.debug("Worker %s finished job %s, status = %s",
                  self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
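
  # Illustrative only: assuming constants.JOB_ID_TEMPLATE matches decimal
  # digits, file names such as "job-1" or "job-4712" match (with the numeric
  # id captured by the group), while "job-1.tmp" or "archive" do not.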

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              for op in job.ops:
                op.status = constants.OP_STATUS_ERROR
                op.result = "Unclean master daemon shutdown"
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFileUnlocked(self, old, new):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type old: str
    @param old: the current name of the file
    @type new: str
    @param new: the new name of the file

    """
    os.rename(old, new)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)
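
  # Illustrative sequence (serial values hypothetical): with _last_serial at
  # 4711, _NewSerialUnlocked() replicates "4712\n" to the serial file on all
  # nodes, bumps _last_serial to 4712 and returns the string "4712".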

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFileUnlocked(filepath, new_path)
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job_id
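
  # Illustrative usage (the returned id is hypothetical): SubmitJob(ops)
  # with a list of opcodes.OpCode instances writes and replicates the new
  # job file, hands the job to the worker pool and returns its string id,
  # e.g. "4712".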

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will
        # be no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
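
  # Illustrative client loop (parameter values hypothetical): callers
  # typically invoke WaitForJobChanges(job_id, ["status"], prev_info,
  # prev_serial, timeout) repeatedly, treating a constants.JOB_NOTCHANGED
  # result as "poll again" and otherwise consuming the returned
  # (job_info, log_entries) tuple.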

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer in the queue", job.id)
      return (False, "Job %s is no longer in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        for op in job.ops:
          op.status = constants.OP_STATUS_CANCELING
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job canceled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobUnlocked(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)

    self._RenameFileUnlocked(old, new)

    logging.debug("Successfully archived job %s", job.id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    return self._ArchiveJobUnlocked(job_id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    for jid in self._GetJobIDsUnlocked(archived=False):
      job = self._LoadJobUnlocked(jid)
      if job.CalcStatus() not in (constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR,
                                  constants.JOB_STATUS_CANCELED):
        continue

      if job.end_timestamp is None:
        if job.start_timestamp is None:
          job_age = job.received_timestamp
        else:
          job_age = job.start_timestamp
      else:
        job_age = job.end_timestamp

      if age == -1 or now - job_age[0] > age:
        self._ArchiveJobUnlocked(jid)
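
  # Illustrative arithmetic (timestamps hypothetical): with now at
  # 1300000500.0 and job.end_timestamp == (1300000000, 0), the job is 500
  # seconds old, so AutoArchiveJobs(300) archives it while
  # AutoArchiveJobs(3600) keeps it; age == -1 archives every finished job
  # regardless of timestamps.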

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        is passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)

    return row
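
  # Illustrative only (values hypothetical): _GetJobInfoUnlocked(job,
  # ["id", "status", "opstatus"]) returns one row such as
  # ["4712", "running", ["success", "running", "queued"]], in the same
  # order as the requested fields.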

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None