# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
41 # pylint: disable=E0611
42 from pyinotify import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang
from ganeti import pathutils
from ganeti import vcluster


JOBQUEUE_THREADS = 25

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"

#: Retrieves "id" attribute
_GetIdAttr = operator.attrgetter("id")


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


class QueueShutdown(Exception):
  """Special exception to abort a job when the job queue is shutting down.

  """
90 """Returns the current timestamp.
93 @return: the current time in the (seconds, microseconds) format
96 return utils.SplitTime(time.time())
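
# For instance, utils.SplitTime(1234567890.5) yields (1234567890, 500000),
# i.e. whole seconds and microseconds.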


def _CallJqUpdate(runner, names, file_name, content):
  """Updates job queue file after virtualizing filename.

  """
  virt_file_name = vcluster.MakeVirtualPath(file_name)
  return runner.call_jobqueue_update(names, virt_file_name, content)


class _SimpleJobQuery:
  """Wrapper for job queries.

  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using cached field list.

    """
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
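
# Typical use: _SimpleJobQuery(["id", "status"])(job) returns the values of
# the requested fields for one job as an old-style result row, e.g. roughly
# [42, "running"].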


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj
183 """Serializes this _QueuedOpCode.
186 @return: the dictionary holding the serialized state
190 "input": self.input.__getstate__(),
191 "status": self.status,
192 "result": self.result,
194 "start_timestamp": self.start_timestamp,
195 "exec_timestamp": self.exec_timestamp,
196 "end_timestamp": self.end_timestamp,
197 "priority": self.priority,


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable", "archived"]

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job ID
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
    self.archived = False

    self._InitInMemory(self, writable)

    assert not self.archived, "New jobs can not be marked as archived"

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None
271 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
273 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
275 return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable, archived):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @type archived: bool
    @param archived: Whether job was already archived
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.archived = archived

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj
314 """Serialize the _JobQueue instance.
317 @return: the serialized state
322 "ops": [op.Serialize() for op in self.ops],
323 "start_timestamp": self.start_timestamp,
324 "end_timestamp": self.end_timestamp,
325 "received_timestamp": self.received_timestamp,

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waiting
          - canceling
          - running

        will determine the job status
      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
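
  # For instance, a job whose opcodes have the statuses (SUCCESS, RUNNING,
  # QUEUED) reports JOB_STATUS_RUNNING, while (SUCCESS, ERROR, QUEUED)
  # reports JOB_STATUS_ERROR, since a single failed opcode fails the job.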

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)
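
  # Priorities follow nice(1)-like semantics: numerically lower values are
  # more important (constants.OP_PRIO_HIGHEST is the smallest number), so
  # min() above picks the most urgent pending opcode.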

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
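
  # Log entries are (serial, timestamp, type, message) tuples, so e.g.
  # GetLogEntries(3) returns only the entries with serial 4 or higher.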

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        is passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False
455 """Marks the job as finalized.
458 self.end_timestamp = TimeStampNow()
461 """Marks job as canceled/-ing if possible.
463 @rtype: tuple; (bool, string)
464 @return: Boolean describing whether job was successfully canceled or marked
465 as canceling and a text message
468 status = self.CalcStatus()
470 if status == constants.JOB_STATUS_QUEUED:
471 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
472 "Job canceled by request")
474 return (True, "Job %s canceled" % self.id)
476 elif status == constants.JOB_STATUS_WAITING:
477 # The worker will notice the new status and cancel the job
478 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
479 return (True, "Job %s will be canceled" % self.id)
482 logging.debug("Job %s is no longer waiting in the queue", self.id)
483 return (False, "Job %s is no longer waiting in the queue" % self.id)

  def ChangePriority(self, priority):
    """Changes the job priority.

    @type priority: int
    @param priority: New priority
    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job's priority was successfully changed
        and a text message

    """
    status = self.CalcStatus()

    if status in constants.JOBS_FINALIZED:
      return (False, "Job %s is finished" % self.id)
    elif status == constants.JOB_STATUS_CANCELING:
      return (False, "Job %s is canceling" % self.id)
    else:
      assert status in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_RUNNING)

      changed = False
      for op in self.ops:
        if (op.status == constants.OP_STATUS_RUNNING or
            op.status in constants.OPS_FINALIZED):
          assert not changed, \
            ("Found opcode for which priority should not be changed after"
             " priority has been changed for previous opcodes")
          continue

        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        changed = True

        # Set new priority (doesn't modify opcode input)
        op.priority = priority

      if changed:
        return (True, ("Priorities of pending opcodes for job %s have been"
                       " changed to %s" % (self.id, priority)))
      else:
        return (False, "Job %s had no pending opcodes" % self.id)


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

    # See if queue is shutting down
    if not self._queue.AcceptingJobsUnlocked():
      logging.debug("Queue is shutting down")
      raise QueueShutdown()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)

  def CurrentPriority(self):
    """Returns current priority for opcode.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    return self._op.priority

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = _inotify_wm_cls()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename
    self._waiter_cls = _waiter_cls

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = self._waiter_cls(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout,
               _waiter_cls=_JobChangesWaiter):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _waiter_cls(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except errors.JobLost:
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)
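
# The encoded form is what errors.EncodeException produces (roughly the
# exception's class name and arguments in a serializable shape), which
# errors.GetEncodedError can later recognize in opcode results.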


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()
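
  # Note the convention used by CheckPriorityIncrease() above: decrementing
  # the numeric value by one moves the opcode one step closer to
  # constants.OP_PRIO_HIGHEST, and every bump also resets the lock timeout
  # strategy.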


class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @rtype: bool
    @return: Whether the job should be updated on disk

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITING

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Queue is shutting down, return to queued
      if not self.queue.AcceptingJobsUnlocked():
        return (constants.OP_STATUS_QUEUED, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)

    except QueueShutdown:
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)

      assert op.status == constants.OP_STATUS_WAITING

      # Job hadn't been started yet, so it should return to the queue
      return (constants.OP_STATUS_QUEUED, None)

    except Exception, err: # pylint: disable=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
        be deferred and C{WAITDEP} if the dependency manager
        (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

      if op.status in (constants.OP_STATUS_WAITING,
                       constants.OP_STATUS_QUEUED):
        # waiting: Couldn't get locks in time
        # queued: Queue is shutting down
        assert not op.end_timestamp
      else:
        # Finalize opcode
        op.end_timestamp = TimeStampNow()

        if op.status == constants.OP_STATUS_CANCELING:
          assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                for i in job.ops[opctx.index:])
        else:
          assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_QUEUED:
        # Queue is shutting down
        assert not waitjob

        finalize = False

        # Reset context
        job.cur_opctx = None

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED

      elif op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        assert not waitjob

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()


def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
        L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Sets the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}
    @rtype: string

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up as
    # one item under "pending".
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: int
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TJobId(job.id)
    assert ht.TJobId(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))
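
  # Internally, self._waiters maps a dependency job ID to the set of
  # _QueuedJob objects blocked on it, e.g. roughly {1234: set([<job 1235>])}
  # while job 1235 waits for job 1234 to finalize.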

  def _RemoveEmptyWaitersUnlocked(self):
    """Remove all jobs without actual waiters.

    """
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
                   if not waiters]:
      del self._waiters[job_id]

  def NotifyWaiters(self, job_id):
    """Notifies all jobs waiting for a certain job ID.

    @attention: Do not call until L{CheckAndRegister} returned a status other
        than C{WAITDEP} for C{job_id}, or behaviour is undefined
    @type job_id: int
    @param job_id: Job ID

    """
    assert ht.TJobId(job_id)

    self._lock.acquire()
    try:
      self._RemoveEmptyWaitersUnlocked()

      jobs = self._waiters.pop(job_id, None)
    finally:
      self._lock.release()

    if jobs:
      # Re-add jobs to workerpool
      logging.debug("Re-adding %s jobs which were waiting for job %s",
                    len(jobs), job_id)
      self._enqueue_fn(jobs)


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' argument.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


def _RequireNonDrainedQueue(fn):
  """Decorator checking for a non-drained queue.

  To be used with functions submitting new jobs.

  """
  def wrapper(self, *args, **kwargs):
    """Wrapper function.

    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    # Needs access to protected member, pylint: disable=W0212
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if not self._accepting_jobs:
      raise errors.JobQueueError("Job queue is shutting down, refusing job")

    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared
    # mode and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    # Set queue drained flag
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
                                                       self._drained)
    msg = result[node_name].fail_msg
    if msg:
      logging.error("Failed to set queue drained flag on node %s: %s",
                    node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our RPC calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.daemons_gid,
                    mode=constants.JOB_QUEUE_FILES_PERMS)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _NewSerialsUnlocked(self, count):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of int
    @return: a list of job identifiers.

    """
    assert ht.TNonNegativeInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [jstore.FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result
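
  # For example, with _last_serial == 42, a call with count=3 persists the
  # new serial 45 and hands out the identifiers for jobs 43, 44 and 45.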

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)
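
  # Archived jobs are sharded into subdirectories by
  # jstore.GetArchiveDirectory to keep directory sizes bounded; e.g.
  # assuming 10000 jobs per archive directory, job 12345 would end up
  # under ".../archive/1/job-12345".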

  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    result = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
                        utils.ListVisibleFiles(archive_path)))

    return result

  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()

    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    # Change flag locally
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    # ... and on all nodes
    (names, addrs) = self._GetNodeIp()
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
    self._CheckRpcResult(result, self._nodes,
                         "Setting queue drain flag to %s" % drain_flag)

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    for idx, op in enumerate(job.ops):
      # Check priority
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      # Check job dependencies
      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))
  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)

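  # Worked example of the resolution above (standalone, hypothetical data):
  # a relative job ID is a negative index into the jobs submitted earlier,
  # so -1 refers to the job submitted immediately before this one.
  #
  #   >>> submitted = [123, 124, 125]
  #   >>> deps = [(-1, ["success"]), (100, ["success", "error"])]
  #   >>> [(submitted[d], s) if d < 0 else (d, s) for (d, s) in deps]
  #   [(125, ['success']), (100, ['success', 'error'])]
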
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break

          # Use resolved dependencies
          op.depends = data
      else:
        # The for loop's "else" clause runs only if no "break" occurred, that
        # is, if all dependencies were resolved successfully
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job.id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

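  # Worked example for ``resolve_fn`` above (standalone, hypothetical IDs):
  # jobs submitted in earlier calls come first, then the jobs of the current
  # batch that precede the one being resolved.
  #
  #   >>> previous_job_ids = [7, 8]
  #   >>> job_ids = [9, 10, 11]
  #   >>> (previous_job_ids + job_ids[:2])[-1]   # resolve_fn(2, -1)
  #   10
  #   >>> (previous_job_ids + job_ids[:0])[-1]   # resolve_fn(0, -1)
  #   8
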
  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs],
                             task_id=map(_GetIdAttr, jobs))

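  # The argument shapes passed to AddManyTasks above, spelled out with two
  # hypothetical jobs: every task is a 1-tuple so the worker receives the
  # job as a single positional argument, while priorities and task IDs are
  # parallel lists.
  #
  #   tasks    = [(job1, ), (job2, )]
  #   priority = [job1.CalcPriority(), job2.CalcPriority()]
  #   task_id  = [job1.id, job2.id]
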
  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    # The job may not exist, hence the guard in the assertion
    assert job is None or not job.writable, "Got writable job"

    if job:
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

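  # Illustrative client-side polling loop built on the semantics above
  # (sketch only; ``queue`` is an initialized JobQueue and the field list is
  # an example):
  #
  #   prev_info = None
  #   prev_serial = -1
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, 10.0)
  #     if result is None:
  #       break  # job was lost
  #     if result == constants.JOB_NOTCHANGED:
  #       continue  # timeout expired without changes, ask again
  #     (prev_info, log_entries) = result
  #     if log_entries:
  #       prev_serial = log_entries[-1][0]  # first field is the log serial
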
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ChangeJobPriority(self, job_id, priority):
    """Changes a job's priority.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    if priority not in constants.OP_PRIO_SUBMIT_VALID:
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority, allowed))

    def fn(job):
      (success, msg) = job.ChangePriority(priority)

      if success:
        try:
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
        except workerpool.NoSuchTask:
          logging.debug("Job %s is not in workerpool at this time", job.id)

      return (success, msg)

    return self._ModifyJobUnlocked(job_id, fn)

  def _ModifyJobUnlocked(self, job_id, mod_fn):
    """Modifies a job.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving job object as parameter,
      returning tuple of (status boolean, message string)

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (success, msg) = mod_fn(job)

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we do not check above whether the renames succeeded or failed, we
    # update the cached queue size from the filesystem. Once the TODO above is
    # fixed, the number of actually archived jobs can be used instead.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if job is None:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time in seconds to spend archiving
    @rtype: tuple; (int, int)
    @return: number of archived jobs and number of jobs not yet examined

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

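  # Worked example of the age selection above (standalone; timestamps are
  # (seconds, microseconds) pairs as produced by utils.SplitTime):
  #
  #   >>> now = 1000000.0
  #   >>> job_age = (999000, 123456)      # job finished 1000 seconds ago
  #   >>> now - job_age[0] > 3600         # not yet old enough for age=3600
  #   False
  #   >>> age = -1                        # special value: archive everything
  #   >>> age == -1 or now - job_age[0] > age
  #   True
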
  def _Query(self, fields, qfilter):
    """Helper for job queries, returning (query object, job list, list_all).

    """
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

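  # Illustrative call (sketch; assumes an initialized JobQueue ``queue``):
  # select the "id" and "status" fields of all successfully finished jobs
  # using a query2-style filter.
  #
  #   qfilter = ["=", "status", constants.JOB_STATUS_SUCCESS]
  #   response = queue.QueryJobs(["id", "status"], qfilter)
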
  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
      with the requested fields

    """
    # backwards compat:
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished jobs
    will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

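  # Illustrative shutdown sequence using the two-step protocol above (sketch
  # only; the real daemon drives this from its main loop with its own
  # timing):
  #
  #   while queue.PrepareShutdown():   # True while jobs are still running
  #     time.sleep(1)
  #   queue.Shutdown()                 # safe now: only queued jobs remain
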
  def AcceptingJobsUnlocked(self):
    """Returns whether jobs are accepted.

    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
    queue is shutting down.

    @rtype: bool

    """
    return self._accepting_jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None