# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""
import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25

def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
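
# For example (illustrative, not part of the original module): a float
# timestamp of 1230000000.25 seconds splits into (1230000000, 250000),
# i.e. whole seconds and microseconds.
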
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
      of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
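
# Illustrative sketch (not part of the original module): the queue
# persists opcodes by serializing them to JSON and restores them on
# load, so a round trip preserves the visible state:
#
#   qop = _QueuedOpCode(op)  # 'op' being any opcodes.OpCode instance
#   state = serializer.LoadJson(serializer.DumpJson(qop.Serialize()))
#   clone = _QueuedOpCode.Restore(state)
#   assert clone.status == qop.status and clone.log == qop.log
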
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @type id: int
  @ivar id: the job ID
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: int
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)
  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }
  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - running

        will determine the job status
      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
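
  # For example (illustrative): opcode statuses (success, running, queued)
  # yield JOB_STATUS_RUNNING, while (success, error, queued) yield
  # JOB_STATUS_ERROR, since a single failed opcode fails the whole job.
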
  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
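
  # Illustrative example: if the opcode logs hold entries with serials
  # 1 through 5, GetLogEntries(3) returns only the entries with serials
  # 4 and 5, while GetLogEntries(None) returns all of them.
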
class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()

      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue
class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    manipulated).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values())

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if idx % 1000 == 0 or time.time() >= (lastinfo + 10.0):
            jobs_count = len(all_job_ids)
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              for op in job.ops:
                op.status = constants.OP_STATUS_ERROR
                op.result = "Unclean master daemon shutdown"
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise
  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip
  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass
  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")
  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)
  def _RenameFileUnlocked(self, old, new):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type old: str
    @param old: the current name of the file
    @type new: str
    @param new: the new name of the file

    """
    os.rename(old, new)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))
  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)
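
  # Example (illustrative): self._FormatJobID(12) returns "12", while
  # self._FormatJobID("12") raises errors.ProgrammerError because the
  # argument must be numeric.
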
  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None
  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist
  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]
  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFileUnlocked(filepath, new_path)
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job
  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked to be drained.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True
  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()

    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id
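
  # Illustrative usage sketch (hypothetical caller code, not part of the
  # original module); 'queue' is an open JobQueue and 'op' an opcode:
  #
  #   job_id = queue.SubmitJob([op])
  #   # the job is now on disk, in the memcache and in the worker pool
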
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()
  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
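
  # Illustrative client-side sketch (hypothetical names, not part of the
  # original module): callers are expected to poll in a loop, feeding
  # back the previously returned values so only real changes wake them:
  #
  #   prev_info, prev_serial = None, 0
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, 10.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue  # timeout expired, poll again
  #     job_info, log_entries = result
  #     ...
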
  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return False

    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)
  @_RequireOpenQueue
  def _ArchiveJobUnlocked(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)

    self._RenameFileUnlocked(old, new)

    logging.debug("Successfully archived job %s", job.id)
  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    return self._ArchiveJobUnlocked(job_id)
  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    for jid in self._GetJobIDsUnlocked(archived=False):
      job = self._LoadJobUnlocked(jid)
      if job.CalcStatus() not in (constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR,
                                  constants.JOB_STATUS_CANCELED):
        continue

      if job.end_timestamp is None:
        if job.start_timestamp is None:
          job_age = job.received_timestamp
        else:
          job_age = job.start_timestamp
      else:
        job_age = job.end_timestamp

      if age == -1 or now - job_age[0] > age:
        self._ArchiveJobUnlocked(jid)
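
  # Example (illustrative): AutoArchiveJobs(6 * 3600) archives finished
  # jobs whose relevant timestamp (end, start or received time) is more
  # than six hours old, while AutoArchiveJobs(-1) archives every job
  # that is not running or queued.
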
  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        is passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs
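
  # Illustrative usage (hypothetical values): querying two jobs for
  # their id and status could look like:
  #
  #   queue.QueryJobs(["1", "2"], ["id", "status"])
  #   # -> [["1", "success"], ["2", "running"]]
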
  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None