4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
41 # pylint: disable=E0611
42 from pyinotify import pyinotify
46 from ganeti import asyncnotifier
47 from ganeti import constants
48 from ganeti import serializer
49 from ganeti import workerpool
50 from ganeti import locking
51 from ganeti import opcodes
52 from ganeti import errors
53 from ganeti import mcpu
54 from ganeti import utils
55 from ganeti import jstore
56 from ganeti import rpc
57 from ganeti import runtime
58 from ganeti import netutils
59 from ganeti import compat
61 from ganeti import query
62 from ganeti import qlang
63 from ganeti import pathutils
64 from ganeti import vcluster
69 # member lock names to be passed to @ssynchronized decorator
73 #: Retrieves "id" attribute
74 _GetIdAttr = operator.attrgetter("id")
class CancelJob(Exception):
  """Special exception to cancel a job.

  The class adds no attributes or methods of its own on top of
  C{Exception}.

  """
class QueueShutdown(Exception):
  """Special exception to abort a job when the job queue is shutting down.

  Raised by L{_OpExecCallbacks._CheckCancel} once the queue reports that it
  no longer accepts jobs. L{_JobProcessor._ExecOpCodeUnlocked} catches it and
  reports the opcode as C{OP_STATUS_QUEUED} again.

  """
90 """Returns the current timestamp.
93 @return: the current time in the (seconds, microseconds) format
96 return utils.SplitTime(time.time())
def _CallJqUpdate(runner, names, file_name, content):
  """Calls C{call_jobqueue_update} with a virtualized file name.

  The file name is converted using L{vcluster.MakeVirtualPath} before being
  handed to the runner; all other arguments are passed through unchanged.

  @param runner: object providing a C{call_jobqueue_update} method
  @param names: first argument for C{call_jobqueue_update}
  @param file_name: job queue file name, before virtualization
  @param content: last argument for C{call_jobqueue_update}
  @return: whatever C{call_jobqueue_update} returns

  """
  virtual_path = vcluster.MakeVirtualPath(file_name)
  update_fn = runner.call_jobqueue_update
  return update_fn(names, virtual_path, content)
class _SimpleJobQuery:
  """Callable wrapper running a job query with a fixed list of fields.

  The L{query.Query} object is created once by the constructor and reused
  for every call, which is useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Builds the query object for the given fields.

    @param fields: fields to be returned by every later call

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Runs the cached query against a single job.

    @param job: job object; its C{id} attribute is used as the name of the
      only item handed to the query
    @return: first entry of the old-style query result

    """
    rows = self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)
    return rows[0]
126 class _QueuedOpCode(object):
127 """Encapsulates an opcode object.
129 @ivar log: holds the execution log and consists of tuples
130 of the form C{(log_serial, timestamp, level, message)}
131 @ivar input: the OpCode we encapsulate
132 @ivar status: the current status
133 @ivar result: the result of the LU execution
134 @ivar start_timestamp: timestamp for the start of the execution
135 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
136 @ivar end_timestamp: timestamp for the end of the execution
139 __slots__ = ["input", "status", "result", "log", "priority",
140 "start_timestamp", "exec_timestamp", "end_timestamp",
143 def __init__(self, op):
144 """Initializes instances of this class.
146 @type op: L{opcodes.OpCode}
147 @param op: the opcode we encapsulate
151 self.status = constants.OP_STATUS_QUEUED
154 self.start_timestamp = None
155 self.exec_timestamp = None
156 self.end_timestamp = None
158 # Get initial priority (it might change during the lifetime of this opcode)
159 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
162 def Restore(cls, state):
163 """Restore the _QueuedOpCode from the serialized form.
166 @param state: the serialized state
167 @rtype: _QueuedOpCode
168 @return: a new _QueuedOpCode instance
171 obj = _QueuedOpCode.__new__(cls)
172 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
173 obj.status = state["status"]
174 obj.result = state["result"]
175 obj.log = state["log"]
176 obj.start_timestamp = state.get("start_timestamp", None)
177 obj.exec_timestamp = state.get("exec_timestamp", None)
178 obj.end_timestamp = state.get("end_timestamp", None)
179 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
183 """Serializes this _QueuedOpCode.
186 @return: the dictionary holding the serialized state
190 "input": self.input.__getstate__(),
191 "status": self.status,
192 "result": self.result,
194 "start_timestamp": self.start_timestamp,
195 "exec_timestamp": self.exec_timestamp,
196 "end_timestamp": self.end_timestamp,
197 "priority": self.priority,
201 class _QueuedJob(object):
202 """In-memory job representation.
204 This is what we use to track the user-submitted jobs. Locking must
205 be taken care of by users of this class.
207 @type queue: L{JobQueue}
208 @ivar queue: the parent queue
211 @ivar ops: the list of _QueuedOpCode that constitute the job
212 @type log_serial: int
213 @ivar log_serial: holds the index for the next log entry
214 @ivar received_timestamp: the timestamp for when the job was received
215 @ivar start_timestamp: the timestamp for start of execution
216 @ivar end_timestamp: the timestamp for end of execution
217 @ivar writable: Whether the job is allowed to be modified
220 # pylint: disable=W0212
221 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
222 "received_timestamp", "start_timestamp", "end_timestamp",
223 "__weakref__", "processor_lock", "writable", "archived"]
225 def _AddReasons(self):
226 """Extend the reason trail
228 Add the reason for all the opcodes of this job to be executed.
232 for queued_op in self.ops:
234 reason_src = opcodes.NameToReasonSrc(op.__class__.__name__)
235 reason_text = "job=%d;index=%d" % (self.id, count)
236 reason = getattr(op, "reason", [])
237 reason.append((reason_src, reason_text, utils.EpochNano()))
241 def __init__(self, queue, job_id, ops, writable):
242 """Constructor for the _QueuedJob.
244 @type queue: L{JobQueue}
245 @param queue: our parent queue
247 @param job_id: our job id
249 @param ops: the list of opcodes we hold, which will be encapsulated
252 @param writable: Whether job can be modified
256 raise errors.GenericError("A job needs at least one opcode")
259 self.id = int(job_id)
260 self.ops = [_QueuedOpCode(op) for op in ops]
263 self.received_timestamp = TimeStampNow()
264 self.start_timestamp = None
265 self.end_timestamp = None
266 self.archived = False
268 self._InitInMemory(self, writable)
270 assert not self.archived, "New jobs can not be marked as archived"
273 def _InitInMemory(obj, writable):
274 """Initializes in-memory variables.
277 obj.writable = writable
281 # Read-only jobs are not processed and therefore don't need a lock
283 obj.processor_lock = threading.Lock()
285 obj.processor_lock = None
288 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
290 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
292 return "<%s at %#x>" % (" ".join(status), id(self))
295 def Restore(cls, queue, state, writable, archived):
296 """Restore a _QueuedJob from serialized state:
298 @type queue: L{JobQueue}
299 @param queue: to which queue the restored job belongs
301 @param state: the serialized state
303 @param writable: Whether job can be modified
305 @param archived: Whether job was already archived
307 @return: the restored _QueuedJob instance
310 obj = _QueuedJob.__new__(cls)
312 obj.id = int(state["id"])
313 obj.received_timestamp = state.get("received_timestamp", None)
314 obj.start_timestamp = state.get("start_timestamp", None)
315 obj.end_timestamp = state.get("end_timestamp", None)
316 obj.archived = archived
320 for op_state in state["ops"]:
321 op = _QueuedOpCode.Restore(op_state)
322 for log_entry in op.log:
323 obj.log_serial = max(obj.log_serial, log_entry[0])
326 cls._InitInMemory(obj, writable)
331 """Serialize the _QueuedJob instance.
334 @return: the serialized state
339 "ops": [op.Serialize() for op in self.ops],
340 "start_timestamp": self.start_timestamp,
341 "end_timestamp": self.end_timestamp,
342 "received_timestamp": self.received_timestamp,
345 def CalcStatus(self):
346 """Compute the status of this job.
348 This function iterates over all the _QueuedOpCodes in the job and
349 based on their status, computes the job status.
352 - if we find an opcode that was cancelled or finished with error,
353 the job status will be the same
354 - otherwise, the last opcode with the status one of:
359 will determine the job status
361 - otherwise, it means either all opcodes are queued, or success,
362 and the job status will be the same
364 @return: the job status
367 status = constants.JOB_STATUS_QUEUED
371 if op.status == constants.OP_STATUS_SUCCESS:
376 if op.status == constants.OP_STATUS_QUEUED:
378 elif op.status == constants.OP_STATUS_WAITING:
379 status = constants.JOB_STATUS_WAITING
380 elif op.status == constants.OP_STATUS_RUNNING:
381 status = constants.JOB_STATUS_RUNNING
382 elif op.status == constants.OP_STATUS_CANCELING:
383 status = constants.JOB_STATUS_CANCELING
385 elif op.status == constants.OP_STATUS_ERROR:
386 status = constants.JOB_STATUS_ERROR
387 # The whole job fails if one opcode failed
389 elif op.status == constants.OP_STATUS_CANCELED:
390 status = constants.OP_STATUS_CANCELED
394 status = constants.JOB_STATUS_SUCCESS
398 def CalcPriority(self):
399 """Gets the current priority for this job.
401 Only unfinished opcodes are considered. When all are done, the default
407 priorities = [op.priority for op in self.ops
408 if op.status not in constants.OPS_FINALIZED]
411 # All opcodes are done, assume default priority
412 return constants.OP_PRIO_DEFAULT
414 return min(priorities)
416 def GetLogEntries(self, newer_than):
417 """Selectively returns the log entries.
419 @type newer_than: None or int
420 @param newer_than: if this is None, return all log entries,
421 otherwise return only the log entries with serial higher
424 @return: the list of the log entries selected
427 if newer_than is None:
434 entries.extend(filter(lambda entry: entry[0] > serial, op.log))
def GetInfo(self, fields):
  """Queries this job for the requested fields.

  A L{_SimpleJobQuery} is created for the field list and applied to this
  job.

  @param fields: names of fields to return
  @return: list with one element for each field
  @raise errors.OpExecError: when an invalid field has been passed

  """
  job_query = _SimpleJobQuery(fields)
  return job_query(self)
451 def MarkUnfinishedOps(self, status, result):
452 """Mark unfinished opcodes with a given status and result.
454 This is a utility function for marking all running or waiting to
455 be run opcodes with a given status. Opcodes which are already
456 finalised are not changed.
458 @param status: a given opcode status
459 @param result: the opcode result
464 if op.status in constants.OPS_FINALIZED:
465 assert not_marked, "Finalized opcodes found after non-finalized ones"
472 """Marks the job as finalized.
475 self.end_timestamp = TimeStampNow()
478 """Marks job as canceled/-ing if possible.
480 @rtype: tuple; (bool, string)
481 @return: Boolean describing whether job was successfully canceled or marked
482 as canceling and a text message
485 status = self.CalcStatus()
487 if status == constants.JOB_STATUS_QUEUED:
488 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
489 "Job canceled by request")
491 return (True, "Job %s canceled" % self.id)
493 elif status == constants.JOB_STATUS_WAITING:
494 # The worker will notice the new status and cancel the job
495 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
496 return (True, "Job %s will be canceled" % self.id)
499 logging.debug("Job %s is no longer waiting in the queue", self.id)
500 return (False, "Job %s is no longer waiting in the queue" % self.id)
502 def ChangePriority(self, priority):
503 """Changes the job priority.
506 @param priority: New priority
507 @rtype: tuple; (bool, string)
508 @return: Boolean describing whether job's priority was successfully changed
512 status = self.CalcStatus()
514 if status in constants.JOBS_FINALIZED:
515 return (False, "Job %s is finished" % self.id)
516 elif status == constants.JOB_STATUS_CANCELING:
517 return (False, "Job %s is cancelling" % self.id)
519 assert status in (constants.JOB_STATUS_QUEUED,
520 constants.JOB_STATUS_WAITING,
521 constants.JOB_STATUS_RUNNING)
525 if (op.status == constants.OP_STATUS_RUNNING or
526 op.status in constants.OPS_FINALIZED):
527 assert not changed, \
528 ("Found opcode for which priority should not be changed after"
529 " priority has been changed for previous opcodes")
532 assert op.status in (constants.OP_STATUS_QUEUED,
533 constants.OP_STATUS_WAITING)
537 # Set new priority (doesn't modify opcode input)
538 op.priority = priority
541 return (True, ("Priorities of pending opcodes for job %s have been"
542 " changed to %s" % (self.id, priority)))
544 return (False, "Job %s had no pending opcodes" % self.id)
547 class _OpExecCallbacks(mcpu.OpExecCbBase):
548 def __init__(self, queue, job, op):
549 """Initializes this class.
551 @type queue: L{JobQueue}
552 @param queue: Job queue
553 @type job: L{_QueuedJob}
554 @param job: Job object
555 @type op: L{_QueuedOpCode}
559 assert queue, "Queue is missing"
560 assert job, "Job is missing"
561 assert op, "Opcode is missing"
567 def _CheckCancel(self):
568 """Raises an exception to cancel the job if asked to.
571 # Cancel here if we were asked to
572 if self._op.status == constants.OP_STATUS_CANCELING:
573 logging.debug("Canceling opcode")
576 # See if queue is shutting down
577 if not self._queue.AcceptingJobsUnlocked():
578 logging.debug("Queue is shutting down")
579 raise QueueShutdown()
581 @locking.ssynchronized(_QUEUE, shared=1)
582 def NotifyStart(self):
583 """Mark the opcode as running, not lock-waiting.
585 This is called from the mcpu code as a notifier function, when the LU is
586 finally about to start the Exec() method. Of course, to have end-user
587 visible results, the opcode must be initially (before calling into
588 Processor.ExecOpCode) set to OP_STATUS_WAITING.
591 assert self._op in self._job.ops
592 assert self._op.status in (constants.OP_STATUS_WAITING,
593 constants.OP_STATUS_CANCELING)
595 # Cancel here if we were asked to
598 logging.debug("Opcode is now running")
600 self._op.status = constants.OP_STATUS_RUNNING
601 self._op.exec_timestamp = TimeStampNow()
603 # And finally replicate the job status
604 self._queue.UpdateJobUnlocked(self._job)
@locking.ssynchronized(_QUEUE, shared=1)
def _AppendFeedback(self, timestamp, log_type, log_msg):
  """Appends one entry to the opcode log (internal, decorated for locking).

  The job's log serial is incremented first; the new value becomes the
  serial of the appended entry, which has the form
  C{(log_serial, timestamp, log_type, log_msg)}.

  @param timestamp: timestamp to store in the log entry
  @param log_type: type to store in the log entry
  @param log_msg: message to store in the log entry

  """
  job = self._job
  job.log_serial += 1

  entry = (job.log_serial, timestamp, log_type, log_msg)
  self._op.log.append(entry)

  # The job update is requested with replication turned off
  self._queue.UpdateJobUnlocked(job, replicate=False)
615 def Feedback(self, *args):
616 """Append a log entry.
622 log_type = constants.ELOG_MESSAGE
625 (log_type, log_msg) = args
627 # The time is split to make serialization easier and not lose
629 timestamp = utils.SplitTime(time.time())
630 self._AppendFeedback(timestamp, log_type, log_msg)
632 def CurrentPriority(self):
633 """Returns current priority for opcode.
636 assert self._op.status in (constants.OP_STATUS_WAITING,
637 constants.OP_STATUS_CANCELING)
639 # Cancel here if we were asked to
642 return self._op.priority
def SubmitManyJobs(self, jobs):
  """Hands jobs over to the job queue for processing.

  Plain delegation, see L{JobQueue.SubmitManyJobs}.

  @param jobs: passed unchanged to the queue
  @return: result of the queue's C{SubmitManyJobs}

  """
  # No locking needed here, it is done in the job queue
  submit_fn = self._queue.SubmitManyJobs
  return submit_fn(jobs)
654 class _JobChangesChecker(object):
def __init__(self, fields, prev_job_info, prev_log_serial):
  """Initializes this class.

  @type fields: list of strings
  @param fields: Fields requested by LUXI client
  @type prev_job_info: list or None
  @param prev_job_info: previous job info, as passed by the LUXI client;
    compared against the freshly queried job info in L{__call__}
  @type prev_log_serial: int or None
  @param prev_log_serial: previous job serial, as passed by the LUXI client;
    handed to L{_QueuedJob.GetLogEntries} in L{__call__}

  """
  # Build the query once so every check reuses the same field list
  self._squery = _SimpleJobQuery(fields)
  self._prev_job_info = prev_job_info
  self._prev_log_serial = prev_log_serial
670 def __call__(self, job):
671 """Checks whether job has changed.
673 @type job: L{_QueuedJob}
674 @param job: Job object
677 assert not job.writable, "Expected read-only job"
679 status = job.CalcStatus()
680 job_info = self._squery(job)
681 log_entries = job.GetLogEntries(self._prev_log_serial)
683 # Serializing and deserializing data can cause type changes (e.g. from
684 # tuple to list) or precision loss. We're doing it here so that we get
685 # the same modifications as the data received from the client. Without
686 # this, the comparison afterwards might fail without the data being
687 # significantly different.
688 # TODO: we just deserialized from disk, investigate how to make sure that
689 # the job info and log entries are compatible to avoid this further step.
690 # TODO: Doing something like in testutils.py:UnifyValueType might be more
691 # efficient, though floats will be tricky
692 job_info = serializer.LoadJson(serializer.DumpJson(job_info))
693 log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
695 # Don't even try to wait if the job is no longer running, there will be
697 if (status not in (constants.JOB_STATUS_QUEUED,
698 constants.JOB_STATUS_RUNNING,
699 constants.JOB_STATUS_WAITING) or
700 job_info != self._prev_job_info or
701 (log_entries and self._prev_log_serial != log_entries[0][0])):
702 logging.debug("Job %s changed", job.id)
703 return (job_info, log_entries)
708 class _JobFileChangesWaiter(object):
709 def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
710 """Initializes this class.
712 @type filename: string
713 @param filename: Path to job file
714 @raises errors.InotifyError: if the notifier cannot be setup
717 self._wm = _inotify_wm_cls()
718 self._inotify_handler = \
719 asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
721 pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
723 self._inotify_handler.enable()
725 # pyinotify doesn't close file descriptors automatically
726 self._notifier.stop()
729 def _OnInotify(self, notifier_enabled):
730 """Callback for inotify.
733 if not notifier_enabled:
734 self._inotify_handler.enable()
736 def Wait(self, timeout):
737 """Waits for the job file to change.
740 @param timeout: Timeout in seconds
741 @return: Whether there have been events
745 have_events = self._notifier.check_events(timeout * 1000)
747 self._notifier.read_events()
748 self._notifier.process_events()
752 """Closes underlying notifier and its file descriptor.
755 self._notifier.stop()
758 class _JobChangesWaiter(object):
def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
  """Stores the parameters; the file waiter itself is created lazily.

  @type filename: string
  @param filename: Path to job file
  @param _waiter_cls: class instantiated with the filename by L{Wait} once
    a file waiter is needed

  """
  self._filename = filename
  self._waiter_cls = _waiter_cls

  # No file waiter yet, it is only set up in Wait
  self._filewaiter = None
770 def Wait(self, timeout):
771 """Waits for a job to change.
774 @param timeout: Timeout in seconds
775 @return: Whether there have been events
779 return self._filewaiter.Wait(timeout)
781 # Lazy setup: Avoid inotify setup cost when job file has already changed.
782 # If this point is reached, return immediately and let caller check the job
783 # file again in case there were changes since the last check. This avoids a
785 self._filewaiter = self._waiter_cls(self._filename)
790 """Closes underlying waiter.
794 self._filewaiter.Close()
797 class _WaitForJobChangesHelper(object):
798 """Helper class using inotify to wait for changes in a job file.
800 This class takes a previous job status and serial, and alerts the client when
801 the current job status has changed.
805 def _CheckForChanges(counter, job_load_fn, check_fn):
806 if counter.next() > 0:
807 # If this isn't the first check the job is given some more time to change
808 # again. This gives better performance for jobs generating many
814 raise errors.JobLost()
816 result = check_fn(job)
818 raise utils.RetryAgain()
822 def __call__(self, filename, job_load_fn,
823 fields, prev_job_info, prev_log_serial, timeout,
824 _waiter_cls=_JobChangesWaiter):
825 """Waits for changes on a job.
827 @type filename: string
828 @param filename: File on which to wait for changes
829 @type job_load_fn: callable
830 @param job_load_fn: Function to load job
831 @type fields: list of strings
832 @param fields: Which fields to check for changes
833 @type prev_job_info: list or None
834 @param prev_job_info: Last job information returned
835 @type prev_log_serial: int
836 @param prev_log_serial: Last job message serial number
838 @param timeout: maximum time to wait in seconds
841 counter = itertools.count()
843 check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
844 waiter = _waiter_cls(filename)
846 return utils.Retry(compat.partial(self._CheckForChanges,
847 counter, job_load_fn, check_fn),
848 utils.RETRY_REMAINING_TIME, timeout,
852 except errors.JobLost:
854 except utils.RetryTimeout:
855 return constants.JOB_NOTCHANGED
858 def _EncodeOpError(err):
859 """Encodes an error which occurred while processing an opcode.
862 if isinstance(err, errors.GenericError):
865 to_encode = errors.OpExecError(str(err))
867 return errors.EncodeException(to_encode)
870 class _TimeoutStrategyWrapper:
871 def __init__(self, fn):
872 """Initializes this class.
879 """Gets the next timeout if necessary.
882 if self._next is None:
883 self._next = self._fn()
886 """Returns the next timeout.
893 """Returns the current timeout and advances the internal state.
902 class _OpExecContext:
903 def __init__(self, op, index, log_prefix, timeout_strategy_factory):
904 """Initializes this class.
909 self.log_prefix = log_prefix
910 self.summary = op.input.Summary()
912 # Create local copy to modify
913 if getattr(op.input, opcodes.DEPEND_ATTR, None):
914 self.jobdeps = op.input.depends[:]
918 self._timeout_strategy_factory = timeout_strategy_factory
919 self._ResetTimeoutStrategy()
def _ResetTimeoutStrategy(self):
  """Creates a new timeout strategy.

  A fresh strategy object is requested from the factory and its
  C{NextAttempt} method is wrapped in a L{_TimeoutStrategyWrapper}, which
  replaces the previously stored wrapper.

  """
  strategy = self._timeout_strategy_factory()
  self._timeout_strategy = _TimeoutStrategyWrapper(strategy.NextAttempt)
928 def CheckPriorityIncrease(self):
929 """Checks whether priority can and should be increased.
931 Called when locks couldn't be acquired.
936 # Exhausted all retries and next round should not use blocking acquire
938 if (self._timeout_strategy.Peek() is None and
939 op.priority > constants.OP_PRIO_HIGHEST):
940 logging.debug("Increasing priority")
942 self._ResetTimeoutStrategy()
def GetNextLockTimeout(self):
  """Returns the next lock acquire timeout.

  @return: result of calling C{Next} on the current timeout strategy
    wrapper

  """
  strategy = self._timeout_strategy
  return strategy.Next()
954 class _JobProcessor(object):
957 FINISHED) = range(1, 4)
959 def __init__(self, queue, opexec_fn, job,
960 _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
961 """Initializes this class.
965 self.opexec_fn = opexec_fn
967 self._timeout_strategy_factory = _timeout_strategy_factory
970 def _FindNextOpcode(job, timeout_strategy_factory):
971 """Locates the next opcode to run.
973 @type job: L{_QueuedJob}
974 @param job: Job object
975 @param timeout_strategy_factory: Callable to create new timeout strategy
978 # Create some sort of a cache to speed up locating next opcode for future
980 # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
981 # pending and one for processed ops.
982 if job.ops_iter is None:
983 job.ops_iter = enumerate(job.ops)
985 # Find next opcode to run
988 (idx, op) = job.ops_iter.next()
989 except StopIteration:
990 raise errors.ProgrammerError("Called for a finished job")
992 if op.status == constants.OP_STATUS_RUNNING:
993 # Found an opcode already marked as running
994 raise errors.ProgrammerError("Called for job marked as running")
996 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
997 timeout_strategy_factory)
999 if op.status not in constants.OPS_FINALIZED:
1002 # This is a job that was partially completed before master daemon
1003 # shutdown, so it can be expected that some opcodes are already
1004 # completed successfully (if any did error out, then the whole job
1005 # should have been aborted and not resubmitted for processing).
1006 logging.info("%s: opcode %s already processed, skipping",
1007 opctx.log_prefix, opctx.summary)
1010 def _MarkWaitlock(job, op):
1011 """Marks an opcode as waiting for locks.
1013 The job's start timestamp is also set if necessary.
1015 @type job: L{_QueuedJob}
1016 @param job: Job object
1017 @type op: L{_QueuedOpCode}
1018 @param op: Opcode object
1021 assert op in job.ops
1022 assert op.status in (constants.OP_STATUS_QUEUED,
1023 constants.OP_STATUS_WAITING)
1029 if op.status == constants.OP_STATUS_QUEUED:
1030 op.status = constants.OP_STATUS_WAITING
1033 if op.start_timestamp is None:
1034 op.start_timestamp = TimeStampNow()
1037 if job.start_timestamp is None:
1038 job.start_timestamp = op.start_timestamp
1041 assert op.status == constants.OP_STATUS_WAITING
1046 def _CheckDependencies(queue, job, opctx):
1047 """Checks if an opcode has dependencies and if so, processes them.
1049 @type queue: L{JobQueue}
1050 @param queue: Queue object
1051 @type job: L{_QueuedJob}
1052 @param job: Job object
1053 @type opctx: L{_OpExecContext}
1054 @param opctx: Opcode execution context
1056 @return: Whether opcode will be re-scheduled by dependency tracker
1063 while opctx.jobdeps:
1064 (dep_job_id, dep_status) = opctx.jobdeps[0]
1066 (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1068 assert ht.TNonEmptyString(depmsg), "No dependency message"
1070 logging.info("%s: %s", opctx.log_prefix, depmsg)
1072 if depresult == _JobDependencyManager.CONTINUE:
1073 # Remove dependency and continue
1074 opctx.jobdeps.pop(0)
1076 elif depresult == _JobDependencyManager.WAIT:
1077 # Need to wait for notification, dependency tracker will re-add job
1082 elif depresult == _JobDependencyManager.CANCEL:
1083 # Job was cancelled, cancel this job as well
1085 assert op.status == constants.OP_STATUS_CANCELING
1088 elif depresult in (_JobDependencyManager.WRONGSTATUS,
1089 _JobDependencyManager.ERROR):
1090 # Job failed or there was an error, this job must fail
1091 op.status = constants.OP_STATUS_ERROR
1092 op.result = _EncodeOpError(errors.OpExecError(depmsg))
1096 raise errors.ProgrammerError("Unknown dependency result '%s'" %
1101 def _ExecOpCodeUnlocked(self, opctx):
1102 """Processes one opcode and returns the result.
1107 assert op.status == constants.OP_STATUS_WAITING
1109 timeout = opctx.GetNextLockTimeout()
1112 # Make sure not to hold queue lock while calling ExecOpCode
1113 result = self.opexec_fn(op.input,
1114 _OpExecCallbacks(self.queue, self.job, op),
1116 except mcpu.LockAcquireTimeout:
1117 assert timeout is not None, "Received timeout for blocking acquire"
1118 logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1120 assert op.status in (constants.OP_STATUS_WAITING,
1121 constants.OP_STATUS_CANCELING)
1123 # Was job cancelled while we were waiting for the lock?
1124 if op.status == constants.OP_STATUS_CANCELING:
1125 return (constants.OP_STATUS_CANCELING, None)
1127 # Queue is shutting down, return to queued
1128 if not self.queue.AcceptingJobsUnlocked():
1129 return (constants.OP_STATUS_QUEUED, None)
1131 # Stay in waitlock while trying to re-acquire lock
1132 return (constants.OP_STATUS_WAITING, None)
1134 logging.exception("%s: Canceling job", opctx.log_prefix)
1135 assert op.status == constants.OP_STATUS_CANCELING
1136 return (constants.OP_STATUS_CANCELING, None)
1138 except QueueShutdown:
1139 logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1141 assert op.status == constants.OP_STATUS_WAITING
1143 # Job hadn't been started yet, so it should return to the queue
1144 return (constants.OP_STATUS_QUEUED, None)
1146 except Exception, err: # pylint: disable=W0703
1147 logging.exception("%s: Caught exception in %s",
1148 opctx.log_prefix, opctx.summary)
1149 return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1151 logging.debug("%s: %s successful",
1152 opctx.log_prefix, opctx.summary)
1153 return (constants.OP_STATUS_SUCCESS, result)
1155 def __call__(self, _nextop_fn=None):
1156 """Continues execution of a job.
1158 @param _nextop_fn: Callback function for tests
1159 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1160 be deferred and C{WAITDEP} if the dependency manager
1161 (L{_JobDependencyManager}) will re-schedule the job when appropriate
1167 logging.debug("Processing job %s", job.id)
1169 queue.acquire(shared=1)
1171 opcount = len(job.ops)
1173 assert job.writable, "Expected writable job"
1175 # Don't do anything for finalized jobs
1176 if job.CalcStatus() in constants.JOBS_FINALIZED:
1177 return self.FINISHED
1179 # Is a previous opcode still pending?
1181 opctx = job.cur_opctx
1182 job.cur_opctx = None
1184 if __debug__ and _nextop_fn:
1186 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1191 assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1192 constants.OP_STATUS_CANCELING)
1193 for i in job.ops[opctx.index + 1:])
1195 assert op.status in (constants.OP_STATUS_QUEUED,
1196 constants.OP_STATUS_WAITING,
1197 constants.OP_STATUS_CANCELING)
1199 assert (op.priority <= constants.OP_PRIO_LOWEST and
1200 op.priority >= constants.OP_PRIO_HIGHEST)
1204 if op.status != constants.OP_STATUS_CANCELING:
1205 assert op.status in (constants.OP_STATUS_QUEUED,
1206 constants.OP_STATUS_WAITING)
1208 # Prepare to start opcode
1209 if self._MarkWaitlock(job, op):
1211 queue.UpdateJobUnlocked(job)
1213 assert op.status == constants.OP_STATUS_WAITING
1214 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1215 assert job.start_timestamp and op.start_timestamp
1216 assert waitjob is None
1218 # Check if waiting for a job is necessary
1219 waitjob = self._CheckDependencies(queue, job, opctx)
1221 assert op.status in (constants.OP_STATUS_WAITING,
1222 constants.OP_STATUS_CANCELING,
1223 constants.OP_STATUS_ERROR)
1225 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1226 constants.OP_STATUS_ERROR)):
1227 logging.info("%s: opcode %s waiting for locks",
1228 opctx.log_prefix, opctx.summary)
1230 assert not opctx.jobdeps, "Not all dependencies were removed"
1234 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1236 queue.acquire(shared=1)
1238 op.status = op_status
1239 op.result = op_result
1243 if op.status in (constants.OP_STATUS_WAITING,
1244 constants.OP_STATUS_QUEUED):
1245 # waiting: Couldn't get locks in time
1246 # queued: Queue is shutting down
1247 assert not op.end_timestamp
1250 op.end_timestamp = TimeStampNow()
1252 if op.status == constants.OP_STATUS_CANCELING:
1253 assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1254 for i in job.ops[opctx.index:])
1256 assert op.status in constants.OPS_FINALIZED
1258 if op.status == constants.OP_STATUS_QUEUED:
1259 # Queue is shutting down
1265 job.cur_opctx = None
1267 # In no case must the status be finalized here
1268 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1270 elif op.status == constants.OP_STATUS_WAITING or waitjob:
1273 if not waitjob and opctx.CheckPriorityIncrease():
1274 # Priority was changed, need to update on-disk file
1275 queue.UpdateJobUnlocked(job)
1277 # Keep around for another round
1278 job.cur_opctx = opctx
1280 assert (op.priority <= constants.OP_PRIO_LOWEST and
1281 op.priority >= constants.OP_PRIO_HIGHEST)
1283 # In no case must the status be finalized here
1284 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1287 # Ensure all opcodes so far have been successful
1288 assert (opctx.index == 0 or
1289 compat.all(i.status == constants.OP_STATUS_SUCCESS
1290 for i in job.ops[:opctx.index]))
1293 job.cur_opctx = None
1295 if op.status == constants.OP_STATUS_SUCCESS:
1298 elif op.status == constants.OP_STATUS_ERROR:
1299 # Ensure failed opcode has an exception as its result
1300 assert errors.GetEncodedError(job.ops[opctx.index].result)
1302 to_encode = errors.OpExecError("Preceding opcode failed")
1303 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1304 _EncodeOpError(to_encode))
1308 assert compat.all(i.status == constants.OP_STATUS_ERROR and
1309 errors.GetEncodedError(i.result)
1310 for i in job.ops[opctx.index:])
1312 elif op.status == constants.OP_STATUS_CANCELING:
1313 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1314 "Job canceled by request")
1318 raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1320 if opctx.index == (opcount - 1):
1321 # Finalize on last opcode
1325 # All opcodes have been run, finalize job
1328 # Write to disk. If the job status is final, this is the final write
1329 # allowed. Once the file has been written, it can be archived anytime.
1330 queue.UpdateJobUnlocked(job)
1335 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1336 return self.FINISHED
1338 assert not waitjob or queue.depmgr.JobWaiting(job)
1345 assert job.writable, "Job became read-only while being processed"
def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  @param depmgr: Dependency manager whose C{NotifyWaiters} is called once
    the job has finished
  @param job: Job the result belongs to
  @param result: One of the L{_JobProcessor} result constants
  @raise workerpool.DeferTask: If the job must be scheduled again
  @raise errors.ProgrammerError: If C{result} is not a known constant

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))
class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      # Always release, even if processing raised (e.g. a deferred task)
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    # No opcode is running yet, name the thread after the job only
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})
    @return: Whatever C{execop_fn} returns

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      # Drop the opcode summary again, whether or not execution succeeded
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Builds the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}
    @rtype: str
    @return: "Job<id>", followed by "/<tiny opcode summary>" if C{op} is set

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    """Initializes the pool and remembers the owning queue.

    @param queue: Job queue; workers read it back via C{self.pool.queue}

    """
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue
class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    @param getstatus_fn: Callable returning the status of a job given its ID
    @param enqueue_fn: Callable re-adding a collection of jobs to the
      workerpool

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    # Maps the ID of a dependency job to the set of jobs waiting for it
    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up as
    # one item under "pending".
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: int
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status
    @rtype: tuple
    @return: One of the class result constants and a descriptive message

    """
    assert ht.TJobId(job.id)
    assert ht.TJobId(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost as err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      # An empty list of required statuses accepts any finalized status
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))

  def _RemoveEmptyWaitersUnlocked(self):
    """Remove all jobs without actual waiters.

    """
    # Collect the keys first; the dictionary must not change while iterating
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
                   if not waiters]:
      del self._waiters[job_id]

  def NotifyWaiters(self, job_id):
    """Notifies all jobs waiting for a certain job ID.

    @attention: Do not call until L{CheckAndRegister} returned a status other
      than C{WAIT} for C{job_id}, or behaviour is undefined
    @type job_id: int
    @param job_id: Job ID

    """
    assert ht.TJobId(job_id)

    self._lock.acquire()
    try:
      self._RemoveEmptyWaitersUnlocked()

      jobs = self._waiters.pop(job_id, None)
    finally:
      self._lock.release()

    # The enqueue callback is invoked only after the lock was released
    if jobs:
      # Re-add jobs to workerpool
      logging.debug("Re-adding %s jobs which were waiting for job %s",
                    len(jobs), job_id)
      self._enqueue_fn(jobs)
1593 def _RequireOpenQueue(fn):
1594 """Decorator for "public" functions.
1596 This function should be used for all 'public' functions. That is,
1597 functions usually called from other classes. Note that this should
1598 be applied only to methods (not plain functions), since it expects
1599 that the decorated function is called with a first argument that has
1600 a '_queue_filelock' argument.
1602 @warning: Use this decorator only after locking.ssynchronized
1605 @locking.ssynchronized(_LOCK)
1611 def wrapper(self, *args, **kwargs):
1612 # pylint: disable=W0212
1613 assert self._queue_filelock is not None, "Queue should be open"
1614 return fn(self, *args, **kwargs)
1618 def _RequireNonDrainedQueue(fn):
1619 """Decorator checking for a non-drained queue.
1621 To be used with functions submitting new jobs.
1624 def wrapper(self, *args, **kwargs):
1625 """Wrapper function.
1627 @raise errors.JobQueueDrainError: if the job queue is marked for draining
1630 # Ok when sharing the big job queue lock, as the drain file is created when
1631 # the lock is exclusive.
1632 # Needs access to protected member, pylint: disable=W0212
1634 raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1636 if not self._accepting_jobs:
1637 raise errors.JobQueueError("Job queue is shutting down, refusing job")
1639 return fn(self, *args, **kwargs)
1643 class JobQueue(object):
1644 """Queue used to manage the jobs.
def __init__(self, context):
  """Constructor for JobQueue.

  The constructor will initialize the job queue object and then
  start loading the current jobs from disk, either for starting them
  (if they were queued) or for aborting them (if they were already
  running).

  @type context: GanetiContext
  @param context: the context object for access to the configuration
      data and other ganeti objects

  """
  self.context = context
  self._memcache = weakref.WeakValueDictionary()
  self._my_hostname = netutils.Hostname.GetSysName()

  # The Big JobQueue lock. If a code block or method acquires it in shared
  # mode safe it must guarantee concurrency with all the code acquiring it in
  # shared mode, including itself. In order not to acquire it at all
  # concurrency must be guaranteed with all code acquiring it in shared mode
  # and all code acquiring it exclusively.
  self._lock = locking.SharedLock("JobQueue")

  self.acquire = self._lock.acquire
  self.release = self._lock.release

  # Accept jobs by default
  self._accepting_jobs = True

  # Initialize the queue, and acquire the filelock.
  # This ensures no other process is working on the job queue.
  self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

  # Read serial file
  self._last_serial = jstore.ReadSerial()
  assert self._last_serial is not None, ("Serial file was modified between"
                                         " check in jstore and here")

  # Get initial list of nodes
  self._nodes = dict((n.name, n.primary_ip)
                     for n in self.context.cfg.GetAllNodesInfo().values()
                     if n.master_candidate)

  # Remove master node
  self._nodes.pop(self._my_hostname, None)

  # TODO: Check consistency across nodes

  self._queue_size = None
  self._UpdateQueueSizeUnlocked()
  assert ht.TInt(self._queue_size)
  self._drained = jstore.CheckDrainFlag()

  # Job dependencies
  self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                      self._EnqueueJobs)
  self.context.glm.AddToLockMonitor(self.depmgr)

  # Setup worker pool
  self._wpool = _JobQueueWorkerPool(self)
  try:
    self._InspectQueue()
  except:
    # Don't leave worker threads behind if inspection fails; the original
    # exception is re-raised unchanged
    self._wpool.TerminateWorkers()
    raise
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def _InspectQueue(self):
  """Loads the whole job queue and resumes unfinished jobs.

  This function needs the lock here because WorkerPool.AddTask() may start a
  job while we're still doing our work.

  """
  logging.info("Inspecting job queue")

  restartjobs = []

  all_job_ids = self._GetJobIDsUnlocked()
  jobs_count = len(all_job_ids)
  lastinfo = time.time()
  for idx, job_id in enumerate(all_job_ids):
    # Give an update every 1000 jobs or 10 seconds
    if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
        idx == (jobs_count - 1)):
      logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                   idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
      lastinfo = time.time()

    job = self._LoadJobUnlocked(job_id)

    # a failure in loading the job can cause 'None' to be returned
    if job is None:
      continue

    status = job.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      restartjobs.append(job)

    elif status in (constants.JOB_STATUS_RUNNING,
                    constants.JOB_STATUS_WAITING,
                    constants.JOB_STATUS_CANCELING):
      logging.warning("Unfinished job %s found: %s", job.id, job)

      if status == constants.JOB_STATUS_WAITING:
        # Restart job
        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
        restartjobs.append(job)
      else:
        # Running or canceling jobs are marked as failed, not resumed
        to_encode = errors.OpExecError("Unclean master daemon shutdown")
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              _EncodeOpError(to_encode))
        job.Finalize()

      self.UpdateJobUnlocked(job)

  if restartjobs:
    logging.info("Restarting %s jobs", len(restartjobs))
    self._EnqueueJobsUnlocked(restartjobs)

  logging.info("Job queue inspection finished")
def _GetRpc(self, address_list):
  """Gets RPC runner with context.

  @param address_list: Node addresses passed on to L{rpc.JobQueueRunner}
    (callers in this class also pass C{None})

  """
  return rpc.JobQueueRunner(self.context, address_list)
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def AddNode(self, node):
  """Register a new node with the queue.

  @type node: L{objects.Node}
  @param node: the node object to be added

  """
  node_name = node.name
  assert node_name != self._my_hostname

  # Clean queue directory on added node
  result = self._GetRpc(None).call_jobqueue_purge(node_name)
  msg = result.fail_msg
  if msg:
    logging.warning("Cannot cleanup queue directory on node %s: %s",
                    node_name, msg)

  if not node.master_candidate:
    # remove if existing, ignoring errors
    self._nodes.pop(node_name, None)
    # and skip the replication of the job ids
    return

  # Upload the whole queue excluding archived jobs
  files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

  # Upload current serial file
  files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

  # Static address list
  addrs = [node.primary_ip]

  for file_name in files:
    # Read file content
    content = utils.ReadFile(file_name)

    result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                           file_name, content)
    msg = result[node_name].fail_msg
    if msg:
      # Upload failures are logged only; remaining files are still sent
      logging.error("Failed to upload file %s to node %s: %s",
                    file_name, node_name, msg)

  # Set queue drained flag
  result = self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
                                                            self._drained)
  msg = result[node_name].fail_msg
  if msg:
    logging.error("Failed to set queue drained flag on node %s: %s",
                  node_name, msg)

  self._nodes[node_name] = node.primary_ip
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def RemoveNode(self, node_name):
  """Callback called when removing nodes from the cluster.

  @type node_name: str
  @param node_name: the name of the node to remove

  """
  # Unknown node names are ignored
  self._nodes.pop(node_name, None)
1846 def _CheckRpcResult(result, nodes, failmsg):
1847 """Verifies the status of an RPC call.
1849 Since we aim to keep consistency should this node (the current
1850 master) fail, we will log errors if our rpc fail, and especially
1851 log the case when more than half of the nodes fails.
1853 @param result: the data as returned from the rpc call
1855 @param nodes: the list of nodes we made the call to
1857 @param failmsg: the identifier to be used for logging
1864 msg = result[node].fail_msg
1867 logging.error("RPC call %s (%s) failed on node %s: %s",
1868 result[node].call, failmsg, node, msg)
1870 success.append(node)
1872 # +1 for the master node
1873 if (len(success) + 1) < len(failed):
1874 # TODO: Handle failing nodes
1875 logging.error("More than half of the nodes failed")
1877 def _GetNodeIp(self):
1878 """Helper for returning the node name/ip list.
1880 @rtype: (list, list)
1881 @return: a tuple of two lists, the first one with the node
1882 names and the second one with the node addresses
1885 # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1886 name_list = self._nodes.keys()
1887 addr_list = [self._nodes[name] for name in name_list]
1888 return name_list, addr_list
def _UpdateJobQueueFile(self, file_name, data, replicate):
  """Writes a file locally and then replicates it to all nodes.

  This function will replace the contents of a file on the local
  node and then replicate it to all the other nodes we have.

  @type file_name: str
  @param file_name: the path of the file to be replicated
  @type data: str
  @param data: the new contents of the file
  @type replicate: boolean
  @param replicate: whether to spread the changes to the remote nodes

  """
  getents = runtime.GetEnts()
  utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                  gid=getents.daemons_gid,
                  mode=constants.JOB_QUEUE_FILES_PERMS)

  if replicate:
    names, addrs = self._GetNodeIp()
    result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
    self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
def _RenameFilesUnlocked(self, rename):
  """Renames a file locally and then replicate the change.

  This function will rename a file in the local queue directory
  and then replicate this rename to all the other nodes we have.

  @type rename: list of (old, new)
  @param rename: List containing tuples mapping old to new names

  """
  # Rename them locally
  for old, new in rename:
    utils.RenameFile(old, new, mkdir=True)

  # ... and on all nodes
  names, addrs = self._GetNodeIp()
  result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
  self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
def _NewSerialsUnlocked(self, count):
  """Generates new job identifiers.

  Job identifiers are unique during the lifetime of a cluster.

  @type count: integer
  @param count: how many serials to return
  @rtype: list
  @return: a list of job identifiers.

  """
  assert ht.TNonNegativeInt(count)

  # New number
  serial = self._last_serial + count

  # Write to file
  self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                           "%s\n" % serial, True)

  result = [jstore.FormatJobID(v)
            for v in range(self._last_serial + 1, serial + 1)]

  # Keep it only if we were able to write the file
  self._last_serial = serial

  assert len(result) == count

  return result
@staticmethod
def _GetJobPath(job_id):
  """Returns the job file for a given job id.

  @param job_id: the job identifier
  @rtype: str
  @return: the path to the job file

  """
  return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
@staticmethod
def _GetArchivedJobPath(job_id):
  """Returns the archived job file for a given job id.

  @param job_id: the job identifier
  @rtype: str
  @return: the path to the archived job file

  """
  return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                        jstore.GetArchiveDirectory(job_id),
                        "job-%s" % job_id)
@staticmethod
def _DetermineJobDirectories(archived):
  """Build list of directories containing job files.

  @type archived: bool
  @param archived: Whether to include directories for archived jobs
  @rtype: list

  """
  result = [pathutils.QUEUE_DIR]

  if archived:
    archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
    # Every visible entry below the archive directory is added
    result.extend(utils.PathJoin(archive_path, name)
                  for name in utils.ListVisibleFiles(archive_path))

  return result
@classmethod
def _GetJobIDsUnlocked(cls, sort=True, archived=False):
  """Return all known job IDs.

  The method only looks at disk because it's a requirement that all
  jobs are present on disk (so in the _memcache we don't have any
  extra IDs).

  @type sort: boolean
  @param sort: perform sorting on the returned job ids
  @type archived: bool
  @param archived: whether to also look at directories for archived jobs
  @rtype: list
  @return: the list of job IDs

  """
  jlist = []

  for path in cls._DetermineJobDirectories(archived):
    for filename in utils.ListVisibleFiles(path):
      m = constants.JOB_FILE_RE.match(filename)
      # Files not matching the job file pattern are ignored
      if m:
        jlist.append(int(m.group(1)))

  if sort:
    jlist.sort()
  return jlist
def _LoadJobUnlocked(self, job_id):
  """Loads a job from the disk or memory.

  Given a job id, this will return the cached job object if
  existing, or try to load the job from the disk. If loading from
  disk, it will also add the job to the cache.

  @param job_id: the job id
  @rtype: L{_QueuedJob} or None
  @return: either None or the job object

  """
  job = self._memcache.get(job_id, None)
  if job:
    logging.debug("Found job %s in memcache", job_id)
    assert job.writable, "Found read-only job in memcache"
    return job

  try:
    job = self._LoadJobFromDisk(job_id, False)
    if job is None:
      return job
  except errors.JobFileCorrupted:
    old_path = self._GetJobPath(job_id)
    new_path = self._GetArchivedJobPath(job_id)
    if old_path == new_path:
      # job already archived (future case)
      logging.exception("Can't parse job %s", job_id)
    else:
      # non-archived case
      logging.exception("Can't parse job %s, will archive.", job_id)
      self._RenameFilesUnlocked([(old_path, new_path)])
    return None

  assert job.writable, "Job just loaded is not writable"

  self._memcache[job_id] = job
  logging.debug("Added job %s to the cache", job_id)
  return job
def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
  """Load the given job file from disk.

  Given a job file, read, load and restore it in a _QueuedJob format.

  @param job_id: job identifier
  @type try_archived: bool
  @param try_archived: Whether to try loading an archived job
  @type writable: bool or None
  @param writable: Whether the restored job is writable; if None, the job
    is writable unless it was read from the archive
  @rtype: L{_QueuedJob} or None
  @return: either None or the job object
  @raise errors.JobFileCorrupted: If the file can't be parsed or restored

  """
  path_functions = [(self._GetJobPath, False)]

  if try_archived:
    path_functions.append((self._GetArchivedJobPath, True))

  raw_data = None
  archived = None

  for (fn, archived) in path_functions:
    filepath = fn(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except EnvironmentError as err:
      # A missing file just means trying the next location
      if err.errno != errno.ENOENT:
        raise
    else:
      break

  if not raw_data:
    return None

  if writable is None:
    writable = not archived

  try:
    data = serializer.LoadJson(raw_data)
    job = _QueuedJob.Restore(self, data, writable, archived)
  except Exception as err: # pylint: disable=W0703
    raise errors.JobFileCorrupted(err)

  return job
def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
  """Load the given job file from disk.

  Given a job file, read, load and restore it in a _QueuedJob format.
  In case of error reading the job, it gets returned as None, and the
  exception is logged.

  @param job_id: job identifier
  @type try_archived: bool
  @param try_archived: Whether to try loading an archived job
  @param writable: Passed through to L{_LoadJobFromDisk}
  @rtype: L{_QueuedJob} or None
  @return: either None or the job object

  """
  try:
    return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
  except (errors.JobFileCorrupted, EnvironmentError):
    logging.exception("Can't load/parse job %s", job_id)
    return None
def _UpdateQueueSizeUnlocked(self):
  """Update the queue size.

  The size is recomputed from the job IDs found on disk; sorting is skipped
  as only the count matters.

  """
  self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def SetDrainFlag(self, drain_flag):
  """Sets the drain flag for the queue.

  @type drain_flag: boolean
  @param drain_flag: Whether to set or unset the drain flag
  @rtype: boolean
  @return: Always True

  """
  # Change flag locally
  jstore.SetDrainFlag(drain_flag)

  self._drained = drain_flag

  # ... and on all nodes
  (names, addrs) = self._GetNodeIp()
  result = self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
  self._CheckRpcResult(result, self._nodes,
                       "Setting queue drain flag to %s" % drain_flag)

  return True
@_RequireOpenQueue
def _SubmitJobUnlocked(self, job_id, ops):
  """Create and store a new job.

  This enters the job into our job queue and also puts it on the new
  queue, in order for it to be picked up by the queue processors.

  @type job_id: job ID
  @param job_id: the job ID for the new job
  @type ops: list
  @param ops: The list of OpCodes that will become the new job.
  @rtype: L{_QueuedJob}
  @return: the job object to be queued
  @raise errors.JobQueueFull: if the job queue has too many jobs in it
  @raise errors.GenericError: If an opcode is not valid

  """
  if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
    raise errors.JobQueueFull()

  job = _QueuedJob(self, job_id, ops, True)

  for idx, op in enumerate(job.ops):
    # Check priority
    if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                " are %s" % (idx, op.priority, allowed))

    # Check job dependencies
    dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
    if not opcodes.TNoRelativeJobDependencies(dependencies):
      raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                " match %s: %s" %
                                (idx, opcodes.TNoRelativeJobDependencies,
                                 dependencies))

  # Write to disk
  self.UpdateJobUnlocked(job)

  self._queue_size += 1

  logging.debug("Adding new job %s to the cache", job_id)
  self._memcache[job_id] = job

  return job
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
@_RequireNonDrainedQueue
def SubmitJob(self, ops):
  """Create and store a new job.

  @see: L{_SubmitJobUnlocked}
  @return: the ID of the newly submitted job

  """
  (job_id, ) = self._NewSerialsUnlocked(1)
  self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
  return job_id
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
@_RequireNonDrainedQueue
def SubmitManyJobs(self, jobs):
  """Create and store multiple jobs.

  @see: L{_SubmitJobUnlocked}
  @rtype: list
  @return: one (status, data) tuple per submitted job, see
    L{_SubmitManyJobsUnlocked}

  """
  all_job_ids = self._NewSerialsUnlocked(len(jobs))

  # No previously submitted jobs are available for relative dependencies
  (results, added_jobs) = \
    self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

  self._EnqueueJobsUnlocked(added_jobs)

  return results
@staticmethod
def _FormatSubmitError(msg, ops):
  """Formats errors which occurred while submitting a job.

  @param msg: the error message
  @param ops: the opcodes of the job; their summaries are appended
  @rtype: str

  """
  return ("%s; opcodes %s" %
          (msg, utils.CommaJoin(op.Summary() for op in ops)))
@staticmethod
def _ResolveJobDependencies(resolve_fn, deps):
  """Resolves relative job IDs in dependencies.

  @type resolve_fn: callable
  @param resolve_fn: Function to resolve a relative job ID
  @type deps: list
  @param deps: Dependencies
  @rtype: tuple; (boolean, string or list)
  @return: If successful (first tuple item), the returned list contains
    resolved job IDs along with the requested status; if not successful,
    the second element is an error message

  """
  result = []

  for (dep_job_id, dep_status) in deps:
    if ht.TRelativeJobId(dep_job_id):
      assert ht.TInt(dep_job_id) and dep_job_id < 0
      try:
        job_id = resolve_fn(dep_job_id)
      except IndexError:
        # Abort
        return (False, "Unable to resolve relative job ID %s" % dep_job_id)
    else:
      # Absolute job IDs are used as they are
      job_id = dep_job_id

    result.append((job_id, dep_status))

  return (True, result)
def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
  """Create and store multiple jobs.

  @see: L{_SubmitJobUnlocked}
  @param jobs: list of opcode lists, one per job
  @param job_ids: job IDs to be used for C{jobs}, in the same order
  @param previous_job_ids: IDs of earlier jobs which relative dependencies
    may also refer to
  @rtype: tuple; (list, list)
  @return: the list of (status, data) results, one per job, and the list of
    job objects which were actually added

  """
  results = []
  added_jobs = []

  def resolve_fn(job_idx, reljobid):
    assert reljobid < 0
    # Negative index into the jobs submitted before the current one
    return (previous_job_ids + job_ids[:job_idx])[reljobid]

  for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
    for op in ops:
      if getattr(op, opcodes.DEPEND_ATTR, None):
        (status, data) = \
          self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                       op.depends)
        if not status:
          # Abort resolving dependencies
          assert ht.TNonEmptyString(data), "No error message"
          break
        # Use resolved dependencies
        op.depends = data
    else:
      # Only reached if no dependency failed to resolve
      try:
        job = self._SubmitJobUnlocked(job_id, ops)
      except errors.GenericError as err:
        status = False
        data = self._FormatSubmitError(str(err), ops)
      else:
        status = True
        data = job_id
        added_jobs.append(job)

    results.append((status, data))

  return (results, added_jobs)
@locking.ssynchronized(_LOCK)
def _EnqueueJobs(self, jobs):
  """Helper function to add jobs to worker pool's queue.

  Acquires the queue lock before delegating to L{_EnqueueJobsUnlocked}.

  @type jobs: list
  @param jobs: List of all jobs

  """
  return self._EnqueueJobsUnlocked(jobs)
def _EnqueueJobsUnlocked(self, jobs):
  """Helper function to add jobs to worker pool's queue.

  @type jobs: list
  @param jobs: List of all jobs

  """
  assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
  # All three arguments are real lists with one entry per job
  self._wpool.AddManyTasks([(job, ) for job in jobs],
                           priority=[job.CalcPriority() for job in jobs],
                           task_id=[_GetIdAttr(job) for job in jobs])
def _GetJobStatusForDependencies(self, job_id):
  """Gets the status of a job for dependencies.

  @param job_id: Job ID
  @return: the status as computed by the job's C{CalcStatus}
  @raise errors.JobLost: If job can't be found

  """
  # Not using in-memory cache as doing so would require an exclusive lock

  # Try to load from disk
  job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

  if job:
    # Only inspect the job once we know it was loaded; checking before the
    # "if" would fail with AttributeError on None instead of raising JobLost
    assert not job.writable, "Got writable job" # pylint: disable=E1101
    return job.CalcStatus()

  raise errors.JobLost("Job %s not found" % job_id)
@_RequireOpenQueue
def UpdateJobUnlocked(self, job, replicate=True):
  """Update a job's on disk storage.

  After a job has been modified, this function needs to be called in
  order to write the changes to disk and replicate them to the other
  nodes.

  @type job: L{_QueuedJob}
  @param job: the changed job
  @type replicate: boolean
  @param replicate: whether to replicate the change to remote nodes

  """
  if __debug__:
    # A job has an end timestamp if and only if it is finalized
    finalized = job.CalcStatus() in constants.JOBS_FINALIZED
    assert (finalized ^ (job.end_timestamp is None))
    assert job.writable, "Can't update read-only job"
    assert not job.archived, "Can't update archived job"

  filename = self._GetJobPath(job.id)
  data = serializer.DumpJson(job.Serialize())
  logging.debug("Writing job %s to %s", job.id, filename)
  self._UpdateJobQueueFile(filename, data, replicate)
def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                      timeout):
  """Waits for changes in a job.

  @param job_id: Job identifier
  @type fields: list of strings
  @param fields: Which fields to check for changes
  @type prev_job_info: list or None
  @param prev_job_info: Last job information returned
  @type prev_log_serial: int
  @param prev_log_serial: Last job message serial number
  @type timeout: float
  @param timeout: maximum time to wait in seconds
  @rtype: tuple (job info, log entries)
  @return: a tuple of the job information as required via
      the fields parameter, and the log entries as a list

      if the job has not changed and the timeout has expired,
      we instead return a special value,
      L{constants.JOB_NOTCHANGED}, which should be interpreted
      as such by the clients

  """
  # The job is always read from disk (archive included) as a read-only copy
  load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                           writable=False)

  helper = _WaitForJobChangesHelper()

  return helper(self._GetJobPath(job_id), load_fn,
                fields, prev_job_info, prev_log_serial, timeout)
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def CancelJob(self, job_id):
  """Cancels a job.

  This will only succeed if the job has not started yet.

  @param job_id: job ID of job to be cancelled.
  @return: the (status boolean, message string) tuple from
    L{_ModifyJobUnlocked}

  """
  logging.info("Cancelling job %s", job_id)

  return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def ChangeJobPriority(self, job_id, priority):
  """Changes a job's priority.

  @param job_id: ID of the job whose priority should be changed
  @type priority: int
  @param priority: New priority
  @return: the (status boolean, message string) tuple from
    L{_ModifyJobUnlocked}
  @raise errors.GenericError: If the priority is not valid for submission

  """
  logging.info("Changing priority of job %s to %s", job_id, priority)

  if priority not in constants.OP_PRIO_SUBMIT_VALID:
    allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
    raise errors.GenericError("Invalid priority %s, allowed are %s" %
                              (priority, allowed))

  def fn(job):
    (success, msg) = job.ChangePriority(priority)

    if success:
      try:
        self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
      except workerpool.NoSuchTask:
        # The job is not scheduled right now; nothing to re-prioritize
        logging.debug("Job %s is not in workerpool at this time", job.id)

    return (success, msg)

  return self._ModifyJobUnlocked(job_id, fn)
def _ModifyJobUnlocked(self, job_id, mod_fn):
  """Modifies a job.

  @param job_id: Job ID
  @type mod_fn: callable
  @param mod_fn: Modifying function, receiving job object as parameter,
    returning tuple of (status boolean, message string)
  @rtype: tuple
  @return: the tuple returned by C{mod_fn}, or (False, message) if the job
    was not found

  """
  job = self._LoadJobUnlocked(job_id)
  if not job:
    logging.debug("Job %s not found", job_id)
    return (False, "Job %s not found" % job_id)

  assert job.writable, "Can't modify read-only job"
  assert not job.archived, "Can't modify archived job"

  (success, msg) = mod_fn(job)

  if success:
    # If the job was finalized (e.g. cancelled), this is the final write
    # allowed. The job can be archived anytime.
    self.UpdateJobUnlocked(job)

  return (success, msg)
@_RequireOpenQueue
def _ArchiveJobsUnlocked(self, jobs):
  """Archives jobs.

  @type jobs: list of L{_QueuedJob}
  @param jobs: Job objects
  @rtype: int
  @return: Number of archived jobs

  """
  archive_jobs = []
  rename_files = []
  for job in jobs:
    assert job.writable, "Can't archive read-only job"
    assert not job.archived, "Can't cancel archived job"

    # Jobs which are not finalized yet are skipped
    if job.CalcStatus() not in constants.JOBS_FINALIZED:
      logging.debug("Job %s is not yet done", job.id)
      continue

    archive_jobs.append(job)

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)
    rename_files.append((old, new))

  # TODO: What if 1..n files fail to rename?
  self._RenameFilesUnlocked(rename_files)

  logging.debug("Successfully archived job(s) %s",
                utils.CommaJoin(job.id for job in archive_jobs))

  # Since we haven't quite checked, above, if we succeeded or failed renaming
  # the files, we update the cached queue size from the filesystem. When we
  # get around to fix the TODO: above, we can use the number of actually
  # archived jobs to fix this.
  self._UpdateQueueSizeUnlocked()
  return len(archive_jobs)
@locking.ssynchronized(_LOCK)
def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
        # A job that cannot be loaded cannot be archived
        logging.debug("Job %s not found", job_id)
        return False

    # Exactly one job was submitted, so a count of 1 means success
    return self._ArchiveJobsUnlocked([job]) == 1
@locking.ssynchronized(_LOCK)
def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered (and, failing that, the received
    timestamp). The special '-1' age will cause archival of all jobs (that
    are not running or queued).

    @param age: the minimum age in seconds
    @param timeout: number of seconds after which the scan over the job IDs
      is stopped
    @rtype: tuple
    @return: (number of archived jobs, number of job IDs the scan did not
      reach before stopping)

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []

    for idx, job_id in enumerate(all_job_ids):
        last_touched = idx + 1

        # Not optimal because jobs could be pending
        # TODO: Measure average duration for job archival and take number of
        # pending jobs into account.
        if time.time() > end_time:
            break

        # Returns None if the job failed to load
        job = self._LoadJobUnlocked(job_id)
        if not job:
            continue

        # Pick the most recent timestamp available for this job
        if job.end_timestamp is not None:
            job_age = job.end_timestamp
        elif job.start_timestamp is not None:
            job_age = job.start_timestamp
        else:
            job_age = job.received_timestamp

        # Timestamps are (seconds, microseconds); only seconds are compared
        if age == -1 or now - job_age[0] > age:
            pending.append(job)

            # Archive 10 jobs at a time
            if len(pending) >= 10:
                archived_count += self._ArchiveJobsUnlocked(pending)
                pending = []

    # Flush whatever is left from the last, incomplete batch
    if pending:
        archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
def _Query(self, fields, qfilter):
    """Builds a job query and loads the jobs it refers to.

    @param fields: List of wanted fields
    @param qfilter: Query filter
    @rtype: tuple
    @return: (query object, list of (job ID, job object or None) tuples,
      whether all jobs were listed instead of explicitly named ones)

    """
    # NOTE(review): the tail of this call is not visible in the reviewed
    # source; namefield="id" is inferred from RequestedNames() being used
    # for job IDs below -- confirm against the original.
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are not included.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
        # Since files are added to/removed from the queue atomically, there's
        # no risk of getting the job ids in an inconsistent state.
        job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
        job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
        # When listing everything, jobs that failed to load are dropped;
        # explicitly requested IDs are kept even if the job is None.
        if job is not None or not list_all:
            jobs.append((job_id, job))

    return (qobj, jobs, list_all)
def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter
    @return: the value of C{query.GetQueryResponse} for the matching jobs,
      not sorted by name

    """
    # The "list all" flag returned by _Query is not needed here
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @param job_ids: sequence of job identifiers or None for all
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    if job_ids is None:
        # Documented "all jobs" case: query without a filter instead of
        # failing while iterating over None
        qfilter = None
    else:
        # Identifiers may arrive in non-integer form; normalise them first
        job_ids = [int(jid) for jid in job_ids]
        qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)
@locking.ssynchronized(_LOCK)
def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there
    are any jobs currently running. If the latter is the case, the job queue
    is not yet ready for shutdown. Once this function returns C{False}
    L{Shutdown} can be called without interfering with any job. Queued and
    unfinished jobs will be resumed next time.

    Once this function has been called no new job submissions will be
    accepted (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
        # First call: stop accepting jobs; later calls only report status
        self._accepting_jobs = False

        # Tell worker pool to stop processing pending tasks
        self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()
def AcceptingJobsUnlocked(self):
    """Returns whether jobs are accepted.

    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
    queue is shutting down.

    @rtype: bool
    @return: Current value of the queue's "accepting jobs" flag

    """
    return self._accepting_jobs
@locking.ssynchronized(_LOCK)
def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    # Close the queue file lock and drop the reference to it
    self._queue_filelock.Close()
    self._queue_filelock = None