4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import logging
import errno
import time
import weakref
import threading
import itertools
try:
  # pylint: disable=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import runtime
57 from ganeti import netutils
58 from ganeti import compat
from ganeti import ht
from ganeti import query
61 from ganeti import qlang
JOBQUEUE_THREADS = 25

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"
71 class CancelJob(Exception):
72 """Special exception to cancel a job.
def TimeStampNow():
  """Returns the current timestamp.
81 @return: the current time in the (seconds, microseconds) format
84 return utils.SplitTime(time.time())
87 class _SimpleJobQuery:
88 """Wrapper for job queries.
90 Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
93 def __init__(self, fields):
94 """Initializes this class.
97 self._query = query.Query(query.JOB_FIELDS, fields)
99 def __call__(self, job):
100 """Executes a job query using cached field list.
103 return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
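  # Illustrative usage sketch (not part of the original code): a query such
  # as _SimpleJobQuery(["id", "status"])(job) returns the old-style field
  # values for that job, e.g. [42, "running"].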
106 class _QueuedOpCode(object):
107 """Encapsulates an opcode object.
109 @ivar log: holds the execution log and consists of tuples
110 of the form C{(log_serial, timestamp, level, message)}
111 @ivar input: the OpCode we encapsulate
112 @ivar status: the current status
113 @ivar result: the result of the LU execution
114 @ivar start_timestamp: timestamp for the start of the execution
115 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
116 @ivar stop_timestamp: timestamp for the end of the execution
119 __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]
123 def __init__(self, op):
124 """Initializes instances of this class.
126 @type op: L{opcodes.OpCode}
127 @param op: the opcode we encapsulate
    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
134 self.start_timestamp = None
135 self.exec_timestamp = None
136 self.end_timestamp = None
138 # Get initial priority (it might change during the lifetime of this opcode)
139 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
  @classmethod
  def Restore(cls, state):
143 """Restore the _QueuedOpCode from the serialized form.
146 @param state: the serialized state
147 @rtype: _QueuedOpCode
148 @return: a new _QueuedOpCode instance
151 obj = _QueuedOpCode.__new__(cls)
152 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
153 obj.status = state["status"]
154 obj.result = state["result"]
155 obj.log = state["log"]
156 obj.start_timestamp = state.get("start_timestamp", None)
157 obj.exec_timestamp = state.get("exec_timestamp", None)
158 obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj
  def Serialize(self):
    """Serializes this _QueuedOpCode.
166 @return: the dictionary holding the serialized state
    """
    return {
      "input": self.input.__getstate__(),
171 "status": self.status,
      "result": self.result,
      "log": self.log,
174 "start_timestamp": self.start_timestamp,
175 "exec_timestamp": self.exec_timestamp,
176 "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }
181 class _QueuedJob(object):
182 """In-memory job representation.
184 This is what we use to track the user-submitted jobs. Locking must
185 be taken care of by users of this class.
187 @type queue: L{JobQueue}
188 @ivar queue: the parent queue
191 @ivar ops: the list of _QueuedOpCode that constitute the job
192 @type log_serial: int
193 @ivar log_serial: holds the index for the next log entry
194 @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
196 @ivar end_timestamp: the timestamp for end of execution
197 @ivar writable: Whether the job is allowed to be modified
200 # pylint: disable=W0212
201 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
202 "received_timestamp", "start_timestamp", "end_timestamp",
203 "__weakref__", "processor_lock", "writable"]
205 def __init__(self, queue, job_id, ops, writable):
206 """Constructor for the _QueuedJob.
208 @type queue: L{JobQueue}
209 @param queue: our parent queue
211 @param job_id: our job id
213 @param ops: the list of opcodes we hold, which will be encapsulated
216 @param writable: Whether job can be modified
    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
224 self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
227 self.start_timestamp = None
228 self.end_timestamp = None
230 self._InitInMemory(self, writable)
  @staticmethod
  def _InitInMemory(obj, writable):
234 """Initializes in-memory variables.
    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None
  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
252 return "<%s at %#x>" % (" ".join(status), id(self))
  @classmethod
  def Restore(cls, queue, state, writable):
    """Restore a _QueuedJob from serialized state.
258 @type queue: L{JobQueue}
259 @param queue: to which queue the restored job belongs
261 @param state: the serialized state
263 @param writable: Whether job can be modified
    @return: the restored _QueuedJob instance
    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
271 obj.received_timestamp = state.get("received_timestamp", None)
272 obj.start_timestamp = state.get("start_timestamp", None)
273 obj.end_timestamp = state.get("end_timestamp", None)
    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
278 op = _QueuedOpCode.Restore(op_state)
279 for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)
    cls._InitInMemory(obj, writable)

    return obj
  def Serialize(self):
    """Serialize the _QueuedJob instance.
291 @return: the serialized state
    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
297 "start_timestamp": self.start_timestamp,
298 "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }
302 def CalcStatus(self):
303 """Compute the status of this job.
305 This function iterates over all the _QueuedOpCodes in the job and
306 based on their status, computes the job status.
    The algorithm is:
      - if we find a cancelled, or finished with error, opcode the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waiting
          - canceling
          - running
        will determine the job status
318 - otherwise, it means either all opcodes are queued, or success,
319 and the job status will be the same
321 @return: the job status
324 status = constants.JOB_STATUS_QUEUED
    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
335 elif op.status == constants.OP_STATUS_WAITING:
336 status = constants.JOB_STATUS_WAITING
337 elif op.status == constants.OP_STATUS_RUNNING:
338 status = constants.JOB_STATUS_RUNNING
339 elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
342 elif op.status == constants.OP_STATUS_ERROR:
343 status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break
    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
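  # Illustrative example (assumed opcode states): for opcodes in the states
  # [success, running, queued] CalcStatus() returns JOB_STATUS_RUNNING; a
  # single opcode in OP_STATUS_ERROR makes the whole job JOB_STATUS_ERROR.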
355 def CalcPriority(self):
356 """Gets the current priority for this job.
    Only unfinished opcodes are considered. When all are done, the default
    priority will be used.
364 priorities = [op.priority for op in self.ops
365 if op.status not in constants.OPS_FINALIZED]
    if not priorities:
      # All opcodes are done, assume default priority
369 return constants.OP_PRIO_DEFAULT
371 return min(priorities)
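  # Illustrative example: with unfinished opcode priorities [40, 10, 100]
  # the job priority is 10; numerically lower values mean higher priority,
  # so min() picks the most urgent pending opcode.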
373 def GetLogEntries(self, newer_than):
374 """Selectively returns the log entries.
376 @type newer_than: None or int
377 @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serials higher
        than this value
381 @return: the list of the log entries selected
    """
    entries = []
    if newer_than is None:
      serial = 0
    else:
      serial = newer_than

    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
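  # Illustrative example: with log serials [1, 2, 3] recorded across the
  # job's opcodes, GetLogEntries(2) returns only the entry with serial 3,
  # while GetLogEntries(None) returns all three.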
395 def GetInfo(self, fields):
396 """Returns information about a job.
399 @param fields: names of fields to return
401 @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        is passed
406 return _SimpleJobQuery(fields)(self)
408 def MarkUnfinishedOps(self, status, result):
409 """Mark unfinished opcodes with a given status and result.
    This is a utility function for marking all running or waiting-to-be-run
    opcodes with a given status. Opcodes which are already finalised are
    not changed.
415 @param status: a given opcode status
416 @param result: the opcode result
    """
    not_marked = True
    for op in reversed(self.ops):
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False
  def Finalize(self):
    """Marks the job as finalized.
432 self.end_timestamp = TimeStampNow()
  def Cancel(self):
    """Marks the job as canceled or canceling, if possible.
437 @rtype: tuple; (bool, string)
438 @return: Boolean describing whether job was successfully canceled or marked
439 as canceling and a text message
442 status = self.CalcStatus()
444 if status == constants.JOB_STATUS_QUEUED:
445 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
446 "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)
450 elif status == constants.JOB_STATUS_WAITING:
451 # The worker will notice the new status and cancel the job
452 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
453 return (True, "Job %s will be canceled" % self.id)
    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)
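  # Illustrative example: calling job.Cancel() on a still-queued job yields
  # (True, "Job 42 canceled"), on a lock-waiting job (True, "Job 42 will be
  # canceled"), and on an already-running job (False, ...).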
460 class _OpExecCallbacks(mcpu.OpExecCbBase):
461 def __init__(self, queue, job, op):
462 """Initializes this class.
464 @type queue: L{JobQueue}
465 @param queue: Job queue
466 @type job: L{_QueuedJob}
467 @param job: Job object
468 @type op: L{_QueuedOpCode}
472 assert queue, "Queue is missing"
473 assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op
480 def _CheckCancel(self):
481 """Raises an exception to cancel the job if asked to.
484 # Cancel here if we were asked to
485 if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()
489 @locking.ssynchronized(_QUEUE, shared=1)
490 def NotifyStart(self):
491 """Mark the opcode as running, not lock-waiting.
493 This is called from the mcpu code as a notifier function, when the LU is
494 finally about to start the Exec() method. Of course, to have end-user
495 visible results, the opcode must be initially (before calling into
496 Processor.ExecOpCode) set to OP_STATUS_WAITING.
499 assert self._op in self._job.ops
500 assert self._op.status in (constants.OP_STATUS_WAITING,
501 constants.OP_STATUS_CANCELING)
    # Cancel here if we were asked to
    self._CheckCancel()
506 logging.debug("Opcode is now running")
508 self._op.status = constants.OP_STATUS_RUNNING
509 self._op.exec_timestamp = TimeStampNow()
511 # And finally replicate the job status
512 self._queue.UpdateJobUnlocked(self._job)
514 @locking.ssynchronized(_QUEUE, shared=1)
515 def _AppendFeedback(self, timestamp, log_type, log_msg):
516 """Internal feedback append function, with locks
519 self._job.log_serial += 1
520 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
521 self._queue.UpdateJobUnlocked(self._job, replicate=False)
523 def Feedback(self, *args):
524 """Append a log entry.
    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args
    # The time is split to make serialization easier and not lose
    # precision
537 timestamp = utils.SplitTime(time.time())
538 self._AppendFeedback(timestamp, log_type, log_msg)
540 def CheckCancel(self):
541 """Check whether job has been cancelled.
544 assert self._op.status in (constants.OP_STATUS_WAITING,
545 constants.OP_STATUS_CANCELING)
    # Cancel here if we were asked to
    self._CheckCancel()
550 def SubmitManyJobs(self, jobs):
551 """Submits jobs for processing.
553 See L{JobQueue.SubmitManyJobs}.
556 # Locking is done in job queue
557 return self._queue.SubmitManyJobs(jobs)
560 class _JobChangesChecker(object):
561 def __init__(self, fields, prev_job_info, prev_log_serial):
562 """Initializes this class.
564 @type fields: list of strings
565 @param fields: Fields requested by LUXI client
566 @type prev_job_info: string
567 @param prev_job_info: previous job info, as passed by the LUXI client
568 @type prev_log_serial: string
569 @param prev_log_serial: previous job serial, as passed by the LUXI client
572 self._squery = _SimpleJobQuery(fields)
573 self._prev_job_info = prev_job_info
574 self._prev_log_serial = prev_log_serial
576 def __call__(self, job):
577 """Checks whether job has changed.
579 @type job: L{_QueuedJob}
580 @param job: Job object
583 assert not job.writable, "Expected read-only job"
585 status = job.CalcStatus()
586 job_info = self._squery(job)
587 log_entries = job.GetLogEntries(self._prev_log_serial)
589 # Serializing and deserializing data can cause type changes (e.g. from
590 # tuple to list) or precision loss. We're doing it here so that we get
591 # the same modifications as the data received from the client. Without
592 # this, the comparison afterwards might fail without the data being
593 # significantly different.
594 # TODO: we just deserialized from disk, investigate how to make sure that
595 # the job info and log entries are compatible to avoid this further step.
596 # TODO: Doing something like in testutils.py:UnifyValueType might be more
597 # efficient, though floats will be tricky
598 job_info = serializer.LoadJson(serializer.DumpJson(job_info))
599 log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
601 # Don't even try to wait if the job is no longer running, there will be
603 if (status not in (constants.JOB_STATUS_QUEUED,
604 constants.JOB_STATUS_RUNNING,
605 constants.JOB_STATUS_WAITING) or
606 job_info != self._prev_job_info or
607 (log_entries and self._prev_log_serial != log_entries[0][0])):
608 logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None
614 class _JobFileChangesWaiter(object):
615 def __init__(self, filename):
616 """Initializes this class.
618 @type filename: string
619 @param filename: Path to job file
620 @raises errors.InotifyError: if the notifier cannot be setup
623 self._wm = pyinotify.WatchManager()
624 self._inotify_handler = \
625 asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise
635 def _OnInotify(self, notifier_enabled):
636 """Callback for inotify.
639 if not notifier_enabled:
640 self._inotify_handler.enable()
642 def Wait(self, timeout):
643 """Waits for the job file to change.
646 @param timeout: Timeout in seconds
647 @return: Whether there have been events
651 have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events
  def Close(self):
    """Closes underlying notifier and its file descriptor.
661 self._notifier.stop()
664 class _JobChangesWaiter(object):
665 def __init__(self, filename):
666 """Initializes this class.
668 @type filename: string
669 @param filename: Path to job file
672 self._filewaiter = None
673 self._filename = filename
675 def Wait(self, timeout):
676 """Waits for a job to change.
679 @param timeout: Timeout in seconds
680 @return: Whether there have been events
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)
686 # Lazy setup: Avoid inotify setup cost when job file has already changed.
687 # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True
  def Close(self):
    """Closes underlying waiter.
    if self._filewaiter:
      self._filewaiter.Close()
702 class _WaitForJobChangesHelper(object):
703 """Helper class using inotify to wait for changes in a job file.
705 This class takes a previous job status and serial, and alerts the client when
706 the current job status has changed.
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
711 if counter.next() > 0:
712 # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)
    job = job_load_fn()
    if not job:
      raise errors.JobLost()
721 result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result
727 def __call__(self, filename, job_load_fn,
728 fields, prev_job_info, prev_log_serial, timeout):
729 """Waits for changes on a job.
731 @type filename: string
732 @param filename: File on which to wait for changes
733 @type job_load_fn: callable
734 @param job_load_fn: Function to load job
735 @type fields: list of strings
736 @param fields: Which fields to check for changes
737 @type prev_job_info: list or None
738 @param prev_job_info: Last job information returned
739 @type prev_log_serial: int
740 @param prev_log_serial: Last job message serial number
742 @param timeout: maximum time to wait in seconds
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
762 def _EncodeOpError(err):
763 """Encodes an error which occurred while processing an opcode.
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))
771 return errors.EncodeException(to_encode)
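# Illustrative example: a plain ValueError("boom") raised by an opcode is
# wrapped as errors.OpExecError("boom") before encoding, while Ganeti's own
# GenericError subclasses are encoded as they are.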
774 class _TimeoutStrategyWrapper:
775 def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.
786 if self._next is None:
787 self._next = self._fn()
  def Peek(self):
    """Returns the next timeout.
    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result
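  # Illustrative example: if the wrapped strategy yields 1.0 on its first
  # call, repeated Peek() calls keep returning 1.0 without consuming it;
  # Next() returns 1.0 and arranges for a fresh timeout to be computed on
  # the following Peek() or Next().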
806 class _OpExecContext:
807 def __init__(self, op, index, log_prefix, timeout_strategy_factory):
808 """Initializes this class.
    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
814 self.summary = op.input.Summary()
816 # Create local copy to modify
817 if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None
822 self._timeout_strategy_factory = timeout_strategy_factory
823 self._ResetTimeoutStrategy()
825 def _ResetTimeoutStrategy(self):
826 """Creates a new timeout strategy.
829 self._timeout_strategy = \
830 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
832 def CheckPriorityIncrease(self):
833 """Checks whether priority can and should be increased.
835 Called when locks couldn't be acquired.
    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
842 if (self._timeout_strategy.Peek() is None and
843 op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False
851 def GetNextLockTimeout(self):
852 """Returns the next lock acquire timeout.
855 return self._timeout_strategy.Next()
858 class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)
863 def __init__(self, queue, opexec_fn, job,
864 _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
865 """Initializes this class.
    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
871 self._timeout_strategy_factory = _timeout_strategy_factory
  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
875 """Locates the next opcode to run.
877 @type job: L{_QueuedJob}
878 @param job: Job object
879 @param timeout_strategy_factory: Callable to create new timeout strategy
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
884 # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
885 # pending and one for processed ops.
886 if job.ops_iter is None:
887 job.ops_iter = enumerate(job.ops)
    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
893 except StopIteration:
894 raise errors.ProgrammerError("Called for a finished job")
896 if op.status == constants.OP_STATUS_RUNNING:
897 # Found an opcode already marked as running
898 raise errors.ProgrammerError("Called for job marked as running")
900 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
901 timeout_strategy_factory)
      if op.status not in constants.OPS_FINALIZED:
        return opctx
906 # This is a job that was partially completed before master daemon
907 # shutdown, so it can be expected that some opcodes are already
908 # completed successfully (if any did error out, then the whole job
909 # should have been aborted and not resubmitted for processing).
910 logging.info("%s: opcode %s already processed, skipping",
911 opctx.log_prefix, opctx.summary)
  @staticmethod
  def _MarkWaitlock(job, op):
915 """Marks an opcode as waiting for locks.
917 The job's start timestamp is also set if necessary.
919 @type job: L{_QueuedJob}
920 @param job: Job object
921 @type op: L{_QueuedOpCode}
922 @param op: Opcode object
926 assert op.status in (constants.OP_STATUS_QUEUED,
927 constants.OP_STATUS_WAITING)
    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True
937 if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True
941 if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True
    assert op.status == constants.OP_STATUS_WAITING

    return update
  @staticmethod
  def _CheckDependencies(queue, job, opctx):
951 """Checks if an opcode has dependencies and if so, processes them.
953 @type queue: L{JobQueue}
954 @param queue: Queue object
955 @type job: L{_QueuedJob}
956 @param job: Job object
957 @type opctx: L{_OpExecContext}
958 @param opctx: Opcode execution context
960 @return: Whether opcode will be re-scheduled by dependency tracker
    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
972 assert ht.TNonEmptyString(depmsg), "No dependency message"
974 logging.info("%s: %s", opctx.log_prefix, depmsg)
976 if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)
980 elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break
986 elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break
992 elif depresult in (_JobDependencyManager.WRONGSTATUS,
993 _JobDependencyManager.ERROR):
994 # Job failed or there was an error, this job must fail
995 op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break
      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result
1005 def _ExecOpCodeUnlocked(self, opctx):
1006 """Processes one opcode and returns the result.
    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITING
1013 timeout = opctx.GetNextLockTimeout()
    try:
      # Make sure not to hold queue lock while calling ExecOpCode
1017 result = self.opexec_fn(op.input,
1018 _OpExecCallbacks(self.queue, self.job, op),
1019 timeout=timeout, priority=op.priority)
1020 except mcpu.LockAcquireTimeout:
1021 assert timeout is not None, "Received timeout for blocking acquire"
1022 logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1024 assert op.status in (constants.OP_STATUS_WAITING,
1025 constants.OP_STATUS_CANCELING)
1027 # Was job cancelled while we were waiting for the lock?
1028 if op.status == constants.OP_STATUS_CANCELING:
1029 return (constants.OP_STATUS_CANCELING, None)
1031 # Stay in waitlock while trying to re-acquire lock
1032 return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
1035 assert op.status == constants.OP_STATUS_CANCELING
1036 return (constants.OP_STATUS_CANCELING, None)
1037 except Exception, err: # pylint: disable=W0703
1038 logging.exception("%s: Caught exception in %s",
1039 opctx.log_prefix, opctx.summary)
1040 return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
1043 opctx.log_prefix, opctx.summary)
1044 return (constants.OP_STATUS_SUCCESS, result)
1046 def __call__(self, _nextop_fn=None):
1047 """Continues execution of a job.
1049 @param _nextop_fn: Callback function for tests
1050 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1051 be deferred and C{WAITDEP} if the dependency manager
1052 (L{_JobDependencyManager}) will re-schedule the job when appropriate
    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)
    queue.acquire(shared=1)
    try:
1062 opcount = len(job.ops)
1064 assert job.writable, "Expected writable job"
1066 # Don't do anything for finalized jobs
1067 if job.CalcStatus() in constants.JOBS_FINALIZED:
1068 return self.FINISHED
1070 # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])
1086 assert op.status in (constants.OP_STATUS_QUEUED,
1087 constants.OP_STATUS_WAITING,
1088 constants.OP_STATUS_CANCELING)
1090 assert (op.priority <= constants.OP_PRIO_LOWEST and
1091 op.priority >= constants.OP_PRIO_HIGHEST)
      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
1096 assert op.status in (constants.OP_STATUS_QUEUED,
1097 constants.OP_STATUS_WAITING)
1099 # Prepare to start opcode
1100 if self._MarkWaitlock(job, op):
1102 queue.UpdateJobUnlocked(job)
1104 assert op.status == constants.OP_STATUS_WAITING
1105 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1106 assert job.start_timestamp and op.start_timestamp
1107 assert waitjob is None
1109 # Check if waiting for a job is necessary
1110 waitjob = self._CheckDependencies(queue, job, opctx)
1112 assert op.status in (constants.OP_STATUS_WAITING,
1113 constants.OP_STATUS_CANCELING,
1114 constants.OP_STATUS_ERROR)
1116 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1117 constants.OP_STATUS_ERROR)):
1118 logging.info("%s: opcode %s waiting for locks",
1119 opctx.log_prefix, opctx.summary)
1121 assert not opctx.jobdeps, "Not all dependencies were removed"
          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)
1129 op.status = op_status
1130 op.result = op_result
1134 if op.status == constants.OP_STATUS_WAITING:
1135 # Couldn't get locks in time
1136 assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()
1141 if op.status == constants.OP_STATUS_CANCELING:
1142 assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1143 for i in job.ops[opctx.index:])
1145 assert op.status in constants.OPS_FINALIZED
1147 if op.status == constants.OP_STATUS_WAITING or waitjob:
1150 if not waitjob and opctx.CheckPriorityIncrease():
1151 # Priority was changed, need to update on-disk file
1152 queue.UpdateJobUnlocked(job)
1154 # Keep around for another round
1155 job.cur_opctx = opctx
1157 assert (op.priority <= constants.OP_PRIO_LOWEST and
1158 op.priority >= constants.OP_PRIO_HIGHEST)
1160 # In no case must the status be finalized here
1161 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        else:
          # Ensure all opcodes so far have been successful
1165 assert (opctx.index == 0 or
1166 compat.all(i.status == constants.OP_STATUS_SUCCESS
1167 for i in job.ops[:opctx.index]))
1170 job.cur_opctx = None
          if op.status == constants.OP_STATUS_SUCCESS:
            finalize = False
1175 elif op.status == constants.OP_STATUS_ERROR:
1176 # Ensure failed opcode has an exception as its result
1177 assert errors.GetEncodedError(job.ops[opctx.index].result)
1179 to_encode = errors.OpExecError("Preceding opcode failed")
1180 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                  _EncodeOpError(to_encode))
            finalize = True
1185 assert compat.all(i.status == constants.OP_STATUS_ERROR and
1186 errors.GetEncodedError(i.result)
1187 for i in job.ops[opctx.index:])
1189 elif op.status == constants.OP_STATUS_CANCELING:
1190 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                  "Job canceled by request")
            finalize = True
          else:
            raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1197 if opctx.index == (opcount - 1):
            # Finalize on last opcode
            finalize = True

          if finalize:
            # All opcodes have been run, finalize job
            job.Finalize()
1205 # Write to disk. If the job status is final, this is the final write
1206 # allowed. Once the file has been written, it can be archived anytime.
1207 queue.UpdateJobUnlocked(job)
          if finalize:
            logging.info("Finished job %s, status = %s",
                         job.id, job.CalcStatus())
            return self.FINISHED
      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()
1226 def _EvaluateJobProcessorResult(depmgr, job, result):
1227 """Looks at a result from L{_JobProcessor} for a job.
1229 To be used in a L{_JobQueueWorker}.
1232 if result == _JobProcessor.FINISHED:
1233 # Notify waiting jobs
1234 depmgr.NotifyWaiters(job.id)
1236 elif result == _JobProcessor.DEFER:
1238 raise workerpool.DeferTask(priority=job.CalcPriority())
1240 elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass
  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 result)
1249 class _JobQueueWorker(workerpool.BaseWorker):
1250 """The actual job workers.
1253 def RunTask(self, job): # pylint: disable=W0221
1256 @type job: L{_QueuedJob}
1257 @param job: the job to be processed
1260 assert job.writable, "Expected writable job"
1262 # Ensure only one worker is active on a single job. If a job registers for
1263 # a dependency job, and the other job notifies before the first worker is
1264 # done, the job can end up in the tasklist more than once.
1265 job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()
1271 def _RunTaskInner(self, job):
1274 Must be called with per-job lock acquired.
    queue = job.queue
    assert queue == self.pool.queue
1280 setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1283 proc = mcpu.Processor(queue.context, job.id)
1285 # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)
1289 _EvaluateJobProcessorResult(queue.depmgr, job,
1290 _JobProcessor(queue, wrap_execop_fn, job)())
  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1294 """Updates the worker thread name to include a short summary of the opcode.
1296 @param setname_fn: Callable setting worker thread name
1297 @param execop_fn: Callable for executing opcode (usually
1298 L{mcpu.Processor.ExecOpCode})
    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)
  @staticmethod
  def _GetWorkerName(job, op):
    """Returns the worker thread name for a job and opcode.
1311 @type job: L{_QueuedJob}
1312 @type op: L{opcodes.OpCode}
1315 parts = ["Job%s" % job.id]
    if op:
      parts.append(op.TinySummary())
1320 return "/".join(parts)
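  # Illustrative example: for job 123 executing an instance-creation opcode
  # the worker thread would be named something like "Job123/I_CREATE" (the
  # exact form depends on opcodes.OpCode.TinySummary).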
1323 class _JobQueueWorkerPool(workerpool.WorkerPool):
1324 """Simple class implementing a job-processing workerpool.
1327 def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue
1334 class _JobDependencyManager:
1335 """Keeps track of job dependencies.
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)
1344 def __init__(self, getstatus_fn, enqueue_fn):
1345 """Initializes this class.
1348 self._getstatus_fn = getstatus_fn
1349 self._enqueue_fn = enqueue_fn
    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")
1354 @locking.ssynchronized(_LOCK, shared=1)
1355 def GetLockInfo(self, requested): # pylint: disable=W0613
1356 """Retrieves information about waiting jobs.
1358 @type requested: set
1359 @param requested: Requested information, see C{query.LQ_*}
1362 # No need to sort here, that's being done by the lock manager and query
1363 # library. There are no priorities for notifying jobs, hence all show up as
1364 # one item under "pending".
1365 return [("job/%s" % job_id, None, None,
1366 [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]
1370 @locking.ssynchronized(_LOCK, shared=1)
1371 def JobWaiting(self, job):
1372 """Checks if a job is waiting.
1375 return compat.any(job in jobs
1376 for jobs in self._waiters.values())
1378 @locking.ssynchronized(_LOCK)
1379 def CheckAndRegister(self, job, dep_job_id, dep_status):
1380 """Checks if a dependency job has the requested status.
1382 If the other job is not yet in a finalized status, the calling job will be
1383 notified (re-added to the workerpool) at a later point.
1385 @type job: L{_QueuedJob}
1386 @param job: Job object
1387 @type dep_job_id: int
1388 @param dep_job_id: ID of dependency job
1389 @type dep_status: list
1390 @param dep_status: Required status
1393 assert ht.TJobId(job.id)
1394 assert ht.TJobId(dep_job_id)
1395 assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1397 if job.id == dep_job_id:
1398 return (self.ERROR, "Job can't depend on itself")
1400 # Get status of dependency job
1402 status = self._getstatus_fn(dep_job_id)
1403 except errors.JobLost, err:
1404 return (self.ERROR, "Dependency error: %s" % err)
1406 assert status in constants.JOB_STATUS_ALL
1408 job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1410 if status not in constants.JOBS_FINALIZED:
1411 # Register for notification and wait for job to finish
1412 job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
1415 (dep_job_id, dep_status))
1417 # Remove from waiters list
1418 if job in job_id_waiters:
1419 job_id_waiters.remove(job)
1421 if (status == constants.JOB_STATUS_CANCELED and
1422 constants.JOB_STATUS_CANCELED not in dep_status):
1423 return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1425 elif not dep_status or status in dep_status:
1426 return (self.CONTINUE,
1427 "Dependency job %s finished with status '%s'" %
1428 (dep_job_id, status))
    else:
      return (self.WRONGSTATUS,
1432 "Dependency job %s finished with status '%s',"
1433 " not one of '%s' as required" %
1434 (dep_job_id, status, utils.CommaJoin(dep_status)))
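  # Illustrative example: CheckAndRegister(job, 42, [JOB_STATUS_SUCCESS])
  # returns (WAIT, ...) while job 42 is unfinished, (CONTINUE, ...) once it
  # succeeded, and (WRONGSTATUS, ...) if it finalized with any other status.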
1436 def _RemoveEmptyWaitersUnlocked(self):
1437 """Remove all jobs without actual waiters.
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
                   if not waiters]:
1442 del self._waiters[job_id]
1444 def NotifyWaiters(self, job_id):
1445 """Notifies all jobs waiting for a certain job ID.
1447 @attention: Do not call until L{CheckAndRegister} returned a status other
1448 than C{WAITDEP} for C{job_id}, or behaviour is undefined
1450 @param job_id: Job ID
1453 assert ht.TJobId(job_id)
    self._lock.acquire()
    try:
      self._RemoveEmptyWaitersUnlocked()
      jobs = self._waiters.pop(job_id, None)
    finally:
      self._lock.release()
    if jobs:
      # Re-add jobs to workerpool
      logging.debug("Re-adding %s jobs which were waiting for job %s",
                    len(jobs), job_id)
      self._enqueue_fn(jobs)
1470 def _RequireOpenQueue(fn):
1471 """Decorator for "public" functions.
1473 This function should be used for all 'public' functions. That is,
1474 functions usually called from other classes. Note that this should
1475 be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.
1479 @warning: Use this decorator only after locking.ssynchronized
  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
1488 def wrapper(self, *args, **kwargs):
1489 # pylint: disable=W0212
1490 assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper
1495 def _RequireNonDrainedQueue(fn):
1496 """Decorator checking for a non-drained queue.
1498 To be used with functions submitting new jobs.
1501 def wrapper(self, *args, **kwargs):
1502 """Wrapper function.
1504 @raise errors.JobQueueDrainError: if the job queue is marked for draining
1507 # Ok when sharing the big job queue lock, as the drain file is created when
1508 # the lock is exclusive.
1509 # Needs access to protected member, pylint: disable=W0212
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1513 if not self._accepting_jobs:
1514 raise errors.JobQueueError("Job queue is shutting down, refusing job")
    return fn(self, *args, **kwargs)
  return wrapper
1520 class JobQueue(object):
1521 """Queue used to manage the jobs.
1524 def __init__(self, context):
1525 """Constructor for JobQueue.
1527 The constructor will initialize the job queue object and then
1528 start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).
1532 @type context: GanetiContext
1533 @param context: the context object for access to the configuration
1534 data and other ganeti objects
1537 self.context = context
1538 self._memcache = weakref.WeakValueDictionary()
1539 self._my_hostname = netutils.Hostname.GetSysName()
    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared
    # mode and with all code acquiring it exclusively.
1546 self._lock = locking.SharedLock("JobQueue")
1548 self.acquire = self._lock.acquire
1549 self.release = self._lock.release
1551 # Accept jobs by default
1552 self._accepting_jobs = True
1554 # Initialize the queue, and acquire the filelock.
1555 # This ensures no other process is working on the job queue.
1556 self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1559 self._last_serial = jstore.ReadSerial()
1560 assert self._last_serial is not None, ("Serial file was modified between"
1561 " check in jstore and here")
1563 # Get initial list of nodes
1564 self._nodes = dict((n.name, n.primary_ip)
1565 for n in self.context.cfg.GetAllNodesInfo().values()
1566 if n.master_candidate)
1568 # Remove master node
1569 self._nodes.pop(self._my_hostname, None)
1571 # TODO: Check consistency across nodes
1573 self._queue_size = None
1574 self._UpdateQueueSizeUnlocked()
1575 assert ht.TInt(self._queue_size)
1576 self._drained = jstore.CheckDrainFlag()
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
1581 self.context.glm.AddToLockMonitor(self.depmgr)
1584 self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise
1591 @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
1594 """Loads the whole job queue and resumes unfinished jobs.
1596 This function needs the lock here because WorkerPool.AddTask() may start a
1597 job while we're still doing our work.
1600 logging.info("Inspecting job queue")
    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
1605 jobs_count = len(all_job_ids)
1606 lastinfo = time.time()
1607 for idx, job_id in enumerate(all_job_ids):
1608 # Give an update every 1000 jobs or 10 seconds
1609 if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1610 idx == (jobs_count - 1)):
1611 logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1612 idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1613 lastinfo = time.time()
1615 job = self._LoadJobUnlocked(job_id)
      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()
1623 if status == constants.JOB_STATUS_QUEUED:
1624 restartjobs.append(job)
1626 elif status in (constants.JOB_STATUS_RUNNING,
1627 constants.JOB_STATUS_WAITING,
1628 constants.JOB_STATUS_CANCELING):
1629 logging.warning("Unfinished job %s found: %s", job.id, job)
1631 if status == constants.JOB_STATUS_WAITING:
1633 job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1634 restartjobs.append(job)
          else:
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                  "Unclean master daemon shutdown")
            job.Finalize()
1640 self.UpdateJobUnlocked(job)
    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)
1646 logging.info("Job queue inspection finished")
1648 def _GetRpc(self, address_list):
1649 """Gets RPC runner with context.
1652 return rpc.JobQueueRunner(self.context, address_list)
1654 @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
1657 """Register a new node with the queue.
1659 @type node: L{objects.Node}
1660 @param node: the node object to be added
1663 node_name = node.name
1664 assert node_name != self._my_hostname
1666 # Clean queue directory on added node
1667 result = self._GetRpc(None).call_jobqueue_purge(node_name)
1668 msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)
1673 if not node.master_candidate:
1674 # remove if existing, ignoring errors
1675 self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return
1679 # Upload the whole queue excluding archived jobs
1680 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1682 # Upload current serial file
1683 files.append(constants.JOB_QUEUE_SERIAL_FILE)
1685 # Static address list
1686 addrs = [node.primary_ip]
1688 for file_name in files:
1690 content = utils.ReadFile(file_name)
      result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name,
                                                        content)
1694 msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)
1699 self._nodes[node_name] = node.primary_ip
1701 @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
1704 """Callback called when removing nodes from the cluster.
1706 @type node_name: str
1707 @param node_name: the name of the node to remove
1710 self._nodes.pop(node_name, None)
  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
1714 """Verifies the status of an RPC call.
    Since we aim to keep consistency, should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.
1720 @param result: the data as returned from the rpc call
1722 @param nodes: the list of nodes we made the call to
1724 @param failmsg: the identifier to be used for logging
    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)
1739 # +1 for the master node
1740 if (len(success) + 1) < len(failed):
1741 # TODO: Handle failing nodes
1742 logging.error("More than half of the nodes failed")
1744 def _GetNodeIp(self):
1745 """Helper for returning the node name/ip list.
1747 @rtype: (list, list)
1748 @return: a tuple of two lists, the first one with the node
1749 names and the second one with the node addresses
1752 # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1753 name_list = self._nodes.keys()
1754 addr_list = [self._nodes[name] for name in name_list]
1755 return name_list, addr_list
1757 def _UpdateJobQueueFile(self, file_name, data, replicate):
1758 """Writes a file locally and then replicates it to all nodes.
1760 This function will replace the contents of a file on the local
1761 node and then replicate it to all the other nodes we have.
1763 @type file_name: str
1764 @param file_name: the path of the file to be replicated
1766 @param data: the new contents of the file
1767 @type replicate: boolean
1768 @param replicate: whether to spread the changes to the remote nodes
1771 getents = runtime.GetEnts()
1772 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1773 gid=getents.masterd_gid)
    if replicate:
      names, addrs = self._GetNodeIp()
      result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1780 def _RenameFilesUnlocked(self, rename):
1781 """Renames a file locally and then replicate the change.
1783 This function will rename a file in the local queue directory
1784 and then replicate this rename to all the other nodes we have.
1786 @type rename: list of (old, new)
1787 @param rename: List containing tuples mapping old to new names
1790 # Rename them locally
1791 for old, new in rename:
1792 utils.RenameFile(old, new, mkdir=True)
1794 # ... and on all nodes
1795 names, addrs = self._GetNodeIp()
1796 result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1797 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1799 def _NewSerialsUnlocked(self, count):
1800 """Generates a new job identifier.
1802 Job identifiers are unique during the lifetime of a cluster.
1804 @type count: integer
1805 @param count: how many serials to return
1807 @return: a list of job identifiers.
1810 assert ht.TPositiveInt(count)
1813 serial = self._last_serial + count
1816 self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1817 "%s\n" % serial, True)
1819 result = [jstore.FormatJobID(v)
1820 for v in range(self._last_serial + 1, serial + 1)]
1822 # Keep it only if we were able to write the file
1823 self._last_serial = serial
    assert len(result) == count

    return result
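  # Illustrative example: with self._last_serial == 7, calling
  # _NewSerialsUnlocked(3) writes 10 to the serial file and returns the job
  # IDs formatted from serials 8, 9 and 10.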
  @staticmethod
  def _GetJobPath(job_id):
1831 """Returns the job file for a given job id.
1834 @param job_id: the job identifier
1836 @return: the path to the job file
1839 return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.
1846 @param job_id: the job identifier
1848 @return: the path to the archived job file
1851 return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)
  @staticmethod
  def _GetJobIDsUnlocked(sort=True):
1857 """Return all known job IDs.
1859 The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).
1864 @param sort: perform sorting on the returned job ids
1866 @return: the list of job IDs
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1871 m = constants.JOB_FILE_RE.match(filename)
      if m:
        jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()

    return jlist
1878 def _LoadJobUnlocked(self, job_id):
1879 """Loads a job from the disk or memory.
1881 Given a job id, this will return the cached job object if
1882 existing, or try to load the job from the disk. If loading from
1883 disk, it will also add the job to the cache.
1886 @param job_id: the job id
1887 @rtype: L{_QueuedJob} or None
1888 @return: either None or the job object
1891 job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job
    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
1901 except errors.JobFileCorrupted:
1902 old_path = self._GetJobPath(job_id)
1903 new_path = self._GetArchivedJobPath(job_id)
1904 if old_path == new_path:
1905 # job already archived (future case)
1906 logging.exception("Can't parse job %s", job_id)
      else:
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None
1913 assert job.writable, "Job just loaded is not writable"
1915 self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)

    return job
1919 def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1920 """Load the given job file from disk.
1922 Given a job file, read, load and restore it in a _QueuedJob format.
1925 @param job_id: job identifier
1926 @type try_archived: bool
1927 @param try_archived: Whether to try loading an archived job
1928 @rtype: L{_QueuedJob} or None
1929 @return: either None or the job object
1932 path_functions = [(self._GetJobPath, True)]
    if try_archived:
      path_functions.append((self._GetArchivedJobPath, False))
    raw_data = None
    writable_default = None
1940 for (fn, writable_default) in path_functions:
1941 filepath = fn(job_id)
1942 logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
1945 except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None
1954 if writable is None:
1955 writable = writable_default
    try:
      data = serializer.LoadJson(raw_data)
1959 job = _QueuedJob.Restore(self, data, writable)
1960 except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job
1965 def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
1966 """Load the given job file from disk.
1968 Given a job file, read, load and restore it in a _QueuedJob format.
1969 In case of error reading the job, it gets returned as None, and the
1970 exception is logged.
1973 @param job_id: job identifier
1974 @type try_archived: bool
1975 @param try_archived: Whether to try loading an archived job
1976 @rtype: L{_QueuedJob} or None
1977 @return: either None or the job object
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
1982 except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None
1986 def _UpdateQueueSizeUnlocked(self):
1987 """Update the queue size.
1990 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1992 @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
1995 """Sets the drain flag for the queue.
1997 @type drain_flag: boolean
1998 @param drain_flag: Whether to set or unset the drain flag
2001 jstore.SetDrainFlag(drain_flag)
    self._drained = drain_flag

    return True
2008 def _SubmitJobUnlocked(self, job_id, ops):
2009 """Create and store a new job.
2011 This enters the job into our job queue and also puts it on the new
2012 queue, in order for it to be picked up by the queue processors.
2014 @type job_id: job ID
2015 @param job_id: the job ID for the new job
2017 @param ops: The list of OpCodes that will become the new job.
2018 @rtype: L{_QueuedJob}
2019 @return: the job object to be queued
2020 @raise errors.JobQueueFull: if the job queue has too many jobs in it
2021 @raise errors.GenericError: If an opcode is not valid
2024 if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2025 raise errors.JobQueueFull()
2027 job = _QueuedJob(self, job_id, ops, True)
2030 for idx, op in enumerate(job.ops):
2031 if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2032 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2033 raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2034 " are %s" % (idx, op.priority, allowed))
2036 dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
2037 if not opcodes.TNoRelativeJobDependencies(dependencies):
2038 raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))
2044 self.UpdateJobUnlocked(job)
2046 self._queue_size += 1
2048 logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job
2053 @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
2056 def SubmitJob(self, ops):
2057 """Create and store a new job.
2059 @see: L{_SubmitJobUnlocked}
2062 (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id
2066 @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
2069 def SubmitManyJobs(self, jobs):
2070 """Create and store multiple jobs.
2072 @see: L{_SubmitJobUnlocked}
2075 all_job_ids = self._NewSerialsUnlocked(len(jobs))
2077 (results, added_jobs) = \
2078 self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
    self._EnqueueJobsUnlocked(added_jobs)

    return results
  @staticmethod
  def _FormatSubmitError(msg, ops):
2086 """Formats errors which occurred while submitting a job.
2089 return ("%s; opcodes %s" %
2090 (msg, utils.CommaJoin(op.Summary() for op in ops)))
  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
2094 """Resolves relative job IDs in dependencies.
2096 @type resolve_fn: callable
2097 @param resolve_fn: Function to resolve a relative job ID
2099 @param deps: Dependencies
2101 @return: Resolved dependencies
    result = []

    for (dep_job_id, dep_status) in deps:
2107 if ht.TRelativeJobId(dep_job_id):
2108 assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id
2117 result.append((job_id, dep_status))
2119 return (True, result)
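  # Illustrative example: a dependency list [(-1, [JOB_STATUS_SUCCESS])]
  # attached to the third job of a submission resolves to the ID of the
  # second job; relative IDs count backwards from the job being submitted.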
2121 def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2122 """Create and store multiple jobs.
2124 @see: L{_SubmitJobUnlocked}
    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]
    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
2141 # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
2149 except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          added_jobs.append(job)
2157 results.append((status, data))
2159 return (results, added_jobs)
2161 @locking.ssynchronized(_LOCK)
2162 def _EnqueueJobs(self, jobs):
2163 """Helper function to add jobs to worker pool's queue.
2166 @param jobs: List of all jobs
2169 return self._EnqueueJobsUnlocked(jobs)
2171 def _EnqueueJobsUnlocked(self, jobs):
2172 """Helper function to add jobs to worker pool's queue.
2175 @param jobs: List of all jobs
2178 assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2179 self._wpool.AddManyTasks([(job, ) for job in jobs],
2180 priority=[job.CalcPriority() for job in jobs])
2182 def _GetJobStatusForDependencies(self, job_id):
2183 """Gets the status of a job for dependencies.
2186 @param job_id: Job ID
2187 @raise errors.JobLost: If job can't be found
2190 # Not using in-memory cache as doing so would require an exclusive lock
2192 # Try to load from disk
2193 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2195 assert not job.writable, "Got writable job" # pylint: disable=E1101
    if job:
      return job.CalcStatus()
2200 raise errors.JobLost("Job %s not found" % job_id)
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
2204 """Update a job's on disk storage.
2206 After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.
2210 @type job: L{_QueuedJob}
2211 @param job: the changed job
2212 @type replicate: boolean
2213 @param replicate: whether to replicate the change to remote nodes
2217 finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2218 assert (finalized ^ (job.end_timestamp is None))
2219 assert job.writable, "Can't update read-only job"
2221 filename = self._GetJobPath(job.id)
2222 data = serializer.DumpJson(job.Serialize())
2223 logging.debug("Writing job %s to %s", job.id, filename)
2224 self._UpdateJobQueueFile(filename, data, replicate)
2226 def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.
2231 @param job_id: Job identifier
2232 @type fields: list of strings
2233 @param fields: Which fields to check for changes
2234 @type prev_job_info: list or None
2235 @param prev_job_info: Last job information returned
2236 @type prev_log_serial: int
2237 @param prev_log_serial: Last job message serial number
2238 @type timeout: float
2239 @param timeout: maximum time to wait in seconds
2240 @rtype: tuple (job info, log entries)
2241 @return: a tuple of the job information as required via
2242 the fields parameter, and the log entries as a list
2244 if the job has not changed and the timeout has expired,
2245 we instead return a special value,
2246 L{constants.JOB_NOTCHANGED}, which should be interpreted
2247 as such by the clients
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
                             writable=False)
2253 helper = _WaitForJobChangesHelper()
2255 return helper(self._GetJobPath(job_id), load_fn,
2256 fields, prev_job_info, prev_log_serial, timeout)
2258 @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.
2263 This will only succeed if the job has not started yet.
2266 @param job_id: job ID of job to be cancelled.
2269 logging.info("Cancelling job %s", job_id)
2271 job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)
2276 assert job.writable, "Can't cancel read-only job"
2278 (success, msg) = job.Cancel()
    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
2283 self.UpdateJobUnlocked(job)
2285 return (success, msg)
  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.
2291 @type jobs: list of L{_QueuedJob}
2292 @param jobs: Job objects
2294 @return: Number of archived jobs
    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
2302 if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue
2306 archive_jobs.append(job)
2308 old = self._GetJobPath(job.id)
2309 new = self._GetArchivedJobPath(job.id)
2310 rename_files.append((old, new))
2312 # TODO: What if 1..n files fail to rename?
2313 self._RenameFilesUnlocked(rename_files)
2315 logging.debug("Successfully archived job(s) %s",
2316 utils.CommaJoin(job.id for job in archive_jobs))
2318 # Since we haven't quite checked, above, if we succeeded or failed renaming
2319 # the files, we update the cached queue size from the filesystem. When we
2320 # get around to fix the TODO: above, we can use the number of actually
2321 # archived jobs to fix this.
2322 self._UpdateQueueSizeUnlocked()
2323 return len(archive_jobs)
2325 @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.
2330 This is just a wrapper over L{_ArchiveJobsUnlocked}.
2333 @param job_id: Job ID of job to be archived.
2335 @return: Whether job was archived
2338 logging.info("Archiving job %s", job_id)
2340 job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False
2345 return self._ArchiveJobsUnlocked([job]) == 1
2347 @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
2350 """Archives all jobs based on age.
2352 The method will archive all jobs which are older than the age
2353 parameter. For jobs that don't have an end timestamp, the start
2354 timestamp will be considered. The special '-1' age will cause
2355 archival of all jobs (that are not running or queued).
2358 @param age: the minimum age in seconds
2361 logging.info("Archiving jobs with age more than %s seconds", age)
    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0
    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
2370 for idx, job_id in enumerate(all_job_ids):
2371 last_touched = idx + 1
2373 # Not optimal because jobs could be pending
2374 # TODO: Measure average duration for job archival and take number of
2375 # pending jobs into account.
      if time.time() > end_time:
        break
2379 # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
2382 if job.end_timestamp is None:
2383 if job.start_timestamp is None:
2384 job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp
        if age == -1 or now - job_age[0] > age:
          pending.append(job)
2393 # Archive 10 jobs at a time
2394 if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []
    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)
2401 return (archived_count, len(all_job_ids) - last_touched)
2403 def _Query(self, fields, qfilter):
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")
2407 job_ids = qobj.RequestedNames()
2409 list_all = (job_ids is None)
    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
2413 # risk of getting the job ids in an inconsistent state.
2414 job_ids = self._GetJobIDsUnlocked()
    jobs = []
    for job_id in job_ids:
2419 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2420 if job is not None or not list_all:
2421 jobs.append((job_id, job))
2423 return (qobj, jobs, list_all)
2425 def QueryJobs(self, fields, qfilter):
2426 """Returns a list of jobs in queue.
2428 @type fields: sequence
2429 @param fields: List of wanted fields
2430 @type qfilter: None or query2 filter (list)
2431 @param qfilter: Query filter
2434 (qobj, ctx, _) = self._Query(fields, qfilter)
2436 return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2438 def OldStyleQueryJobs(self, job_ids, fields):
2439 """Returns a list of jobs in queue.
2442 @param job_ids: sequence of job identifiers or None for all
2444 @param fields: names of fields to return
2446 @return: list one element per job, each element being list with
2447 the requested fields
2451 job_ids = [int(jid) for jid in job_ids]
2452 qfilter = qlang.MakeSimpleFilter("id", job_ids)
2454 (qobj, ctx, _) = self._Query(fields, qfilter)
2456 return qobj.OldStyleQuery(ctx, sort_by_name=False)
2458 @locking.ssynchronized(_LOCK)
2459 def PrepareShutdown(self):
2460 """Prepare to stop the job queue.
2462 Disables execution of jobs in the workerpool and returns whether there are
2463 any jobs currently running. If the latter is the case, the job queue is not
2464 yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2465 be called without interfering with any job. Queued and unfinished jobs will
2466 be resumed next time.
2468 Once this function has been called no new job submissions will be accepted
2469 (see L{_RequireNonDrainedQueue}).
2472 @return: Whether there are any running jobs
2475 if self._accepting_jobs:
2476 self._accepting_jobs = False
2478 # Tell worker pool to stop processing pending tasks
2479 self._wpool.SetActive(False)
2481 return self._wpool.HasRunningTasks()
2483 @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.
2491 self._wpool.TerminateWorkers()
2493 self._queue_filelock.Close()
2494 self._queue_filelock = None