# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
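
# The (seconds, microseconds) tuple form survives JSON serialization
# without the precision loss a raw float timestamp could suffer; see the
# comment in _JobQueueWorker.RunTask's _Log closure below.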


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
      of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj
121 """Serializes this _QueuedOpCode.
124 @return: the dictionary holding the serialized state
128 "input": self.input.__getstate__(),
129 "status": self.status,
130 "result": self.result,
132 "start_timestamp": self.start_timestamp,
133 "end_timestamp": self.end_timestamp,


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job ID
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)
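
    # The Condition reuses the queue's own lock rather than a private
    # one, so waiters in JobQueue.WaitForJobChanges and notifiers such
    # as JobQueue.UpdateJobUnlocked all synchronize on the single queue
    # lock described in the module docstring.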

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj
226 """Serialize the _JobQueue instance.
229 @return: the serialized state
234 "ops": [op.Serialize() for op in self.ops],
235 "run_op_index": self.run_op_index,
236 "start_timestamp": self.start_timestamp,
237 "end_timestamp": self.end_timestamp,
238 "received_timestamp": self.received_timestamp,

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
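
  # For example, opcode statuses [success, running, queued] yield
  # JOB_STATUS_RUNNING and [success, error, queued] yield
  # JOB_STATUS_ERROR: a single failed opcode fails the whole job, while
  # only an all-success list yields JOB_STATUS_SUCCESS.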

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
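
  # Since log serials increase monotonically across the whole job,
  # clients can poll incrementally by passing the highest serial they
  # have already seen; WaitForJobChanges relies on this to return only
  # new entries.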

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in reversed(self.ops):
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False
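
  # The list is walked in reverse because execution is strictly
  # front-to-back: finalized opcodes form a prefix of self.ops, so once
  # a finalized opcode is reached while walking backwards, all earlier
  # ones must be finalized too, which is what the assertion verifies.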


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()
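
  # Raising CancelJob above aborts the opcode before its Exec() phase
  # starts; the exception propagates out of Processor.ExecOpCode and is
  # caught in RunTask below, which then cancels the whole job.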

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob
    and _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Worker %s processing job %s",
                 self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()
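
            # Each log entry gets a unique, increasing serial because
            # job.log_serial is only ever incremented under the queue
            # lock, and waiters on the shared Condition are woken
            # immediately via notifyAll().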

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.info("Worker %s finished job %s, status = %s",
                   self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_lock' attribute.

  @warning: Use this decorator only after utils.LockedMethod!

  Example::
    @utils.LockedMethod
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    assert self._queue_lock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper
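

# Note that _RequireOpenQueue only asserts; it does no locking itself.
# That is why it must be applied below utils.LockedMethod: the outermost
# decorator acquires the queue lock, and the assertion then runs with
# the lock already held.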


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    dead).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
594 logging.info("Inspecting job queue")
596 all_job_ids = self._GetJobIDsUnlocked()
597 jobs_count = len(all_job_ids)
598 lastinfo = time.time()
599 for idx, job_id in enumerate(all_job_ids):
600 # Give an update every 1000 jobs or 10 seconds
601 if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
602 idx == (jobs_count - 1)):
603 logging.info("Job queue inspection: %d/%d (%0.1f %%)",
604 idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
605 lastinfo = time.time()
607 job = self._LoadJobUnlocked(job_id)
609 # a failure in loading the job can cause 'None' to be returned
613 status = job.CalcStatus()
615 if status in (constants.JOB_STATUS_QUEUED, ):
616 self._wpool.AddTask(job)
618 elif status in (constants.JOB_STATUS_RUNNING,
619 constants.JOB_STATUS_WAITLOCK,
620 constants.JOB_STATUS_CANCELING):
621 logging.warning("Unfinished job %s found: %s", job.id, job)
623 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
624 "Unclean master daemon shutdown")
626 self.UpdateJobUnlocked(job)
628 logging.info("Job queue inspection finished")
632 self._wpool.TerminateWorkers()

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our RPC calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
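
  # For example, with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job 42 is
  # archived under directory "0" and job 123456 under directory "12",
  # bounding the number of files per archive directory.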

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
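
  # For example, assuming constants.QUEUE_DIR is a path such as
  # "/var/lib/ganeti/queue", job 42 would live in
  # "/var/lib/ganeti/queue/job-42" until it is archived.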

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    return self._SubmitJobUnlocked(job_id, ops)

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        data = self._SubmitJobUnlocked(job_id, ops)
        status = True
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    return results

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()
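
    # Callers must already hold the queue lock (the "Unlocked" suffix
    # means "performs no locking itself"), which is also what makes the
    # notifyAll() on the shared Condition legal here.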

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)

    job_info = None
    log_entries = None

    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    if job_info is None and log_entries is None:
      return None
    else:
      return (job_info, log_entries)

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be canceled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  ", ".join(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum amount of time to spend archiving

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched - 1)

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs
1400 """Stops the job queue.
1402 This shutdowns all the worker threads an closes the queue.
1405 self._wpool.TerminateWorkers()
1407 self._queue_lock.Close()
1408 self._queue_lock = None