# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import logging
import errno
import time
import weakref
import threading
import itertools
import operator

try:
  # pylint: disable=E0611
  from pyinotify import pyinotify
except ImportError:
  # Fall back to the flat module layout used by some distributions
  import pyinotify
from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang
from ganeti import pathutils
from ganeti import vcluster


#: The number of worker threads we start for processing jobs
JOBQUEUE_THREADS = 25

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"

#: Retrieves "id" attribute
_GetIdAttr = operator.attrgetter("id")


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


class QueueShutdown(Exception):
  """Special exception to abort a job when the job queue is shutting down.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
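
# Example (illustrative values only): utils.SplitTime(1234567890.5) would
# yield (1234567890, 500000), i.e. whole seconds plus the microsecond
# remainder.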


def _CallJqUpdate(runner, names, file_name, content):
  """Updates job queue file after virtualizing filename.

  """
  virt_file_name = vcluster.MakeVirtualPath(file_name)
  return runner.call_jobqueue_update(names, virt_file_name, content)


class _SimpleJobQuery:
  """Wrapper for job queries.

  An instance keeps the list of fields cached; useful e.g. in
  L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using the cached field list.

    """
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
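
# A minimal usage sketch (assuming "job" is a loaded _QueuedJob and that
# "id" and "status" are valid job query fields):
#   squery = _SimpleJobQuery(["id", "status"])
#   (job_id, job_status) = squery(job)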


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }
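
  # The serialized form is a plain dict, e.g. (illustrative values only):
  #   {"input": {...}, "status": "queued", "result": None, "log": [],
  #    "start_timestamp": None, "exec_timestamp": None, "end_timestamp": None,
  #    "priority": 0}
  # which is exactly what Restore() above consumes.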


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable", "archived"]

  def _AddReasons(self):
    """Extends the reason trail.

    Adds the reason for the execution of all opcodes of this job.

    """
    for count, queued_op in enumerate(self.ops):
      op = queued_op.input
      reason_src = opcodes.NameToReasonSrc(op.__class__.__name__)
      reason_text = "job=%d;index=%d" % (self.id, count)
      reason = getattr(op, "reason", [])
      reason.append((reason_src, reason_text, utils.EpochNano()))
      op.reason = reason
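
    # Each appended trail entry is a 3-tuple of (reason source, reason text,
    # nanosecond timestamp), e.g. (illustrative; the exact source string
    # produced by opcodes.NameToReasonSrc is an assumption):
    #   ("gnt:opcode:instancecreate", "job=42;index=0", 1234567890000000000)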

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: int
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    self._AddReasons()
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
    self.archived = False

    self._InitInMemory(self, writable)

    assert not self.archived, "New jobs cannot be marked as archived"

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable, archived):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @type archived: bool
    @param archived: Whether job was already archived
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.archived = archived

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a canceled opcode, or one finished with an error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - running
          - canceling

        will determine the job status
      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)
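
  # Note: priorities are nice-like values where a numerically smaller value
  # means a more important opcode (cf. the OP_PRIO_HIGHEST/OP_PRIO_LOWEST
  # asserts elsewhere in this module), hence the min() above; e.g. pending
  # priorities [0, -5, 10] yield a job priority of -5.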

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        is passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in reversed(self.ops):
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
        as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)
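
  # A hedged usage sketch; callers act on the returned tuple:
  #   (success, msg) = job.Cancel()
  #   if not success:
  #     logging.error("Could not cancel job %s: %s", job.id, msg)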

  def ChangePriority(self, priority):
    """Changes the job priority.

    @type priority: int
    @param priority: New priority
    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job's priority was successfully changed
        and a text message

    """
    status = self.CalcStatus()

    if status in constants.JOBS_FINALIZED:
      return (False, "Job %s is finished" % self.id)
    elif status == constants.JOB_STATUS_CANCELING:
      return (False, "Job %s is canceling" % self.id)

    assert status in (constants.JOB_STATUS_QUEUED,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_RUNNING)

    changed = False
    for op in self.ops:
      if (op.status == constants.OP_STATUS_RUNNING or
          op.status in constants.OPS_FINALIZED):
        assert not changed, \
          ("Found opcode for which priority should not be changed after"
           " priority has been changed for previous opcodes")
      else:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        changed = True

        # Set new priority (doesn't modify opcode input)
        op.priority = priority

    if changed:
      return (True, ("Priorities of pending opcodes for job %s have been"
                     " changed to %s" % (self.id, priority)))
    else:
      return (False, "Job %s had no pending opcodes" % self.id)


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

    # See if queue is shutting down
    if not self._queue.AcceptingJobsUnlocked():
      logging.debug("Queue is shutting down")
      raise QueueShutdown()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)

  def CurrentPriority(self):
    """Returns current priority for opcode.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    return self._op.priority

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = _inotify_wm_cls()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename
    self._waiter_cls = _waiter_cls

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = self._waiter_cls(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout,
               _waiter_cls=_JobChangesWaiter):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _waiter_cls(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except errors.JobLost:
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)
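
# Sketch of the behaviour above: a bare ValueError("x") is first wrapped as
# errors.OpExecError("x"), so clients decoding the result (e.g. via
# errors.GetEncodedError) always see a Ganeti error type; Ganeti errors are
# encoded unchanged.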


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()
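
  # Note: a timeout of None means the next acquire should block indefinitely;
  # Next() yields None once the strategy has exhausted its timed attempts (cf.
  # the corresponding assertion in _JobProcessor._ExecOpCodeUnlocked below).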


class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @rtype: bool
    @return: Whether the job was updated

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITING

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Queue is shutting down, return to queued
      if not self.queue.AcceptingJobsUnlocked():
        return (constants.OP_STATUS_QUEUED, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)
    except QueueShutdown:
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)

      assert op.status == constants.OP_STATUS_WAITING

      # Job hadn't been started yet, so it should return to the queue
      return (constants.OP_STATUS_QUEUED, None)
    except Exception, err: # pylint: disable=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)
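
  # Summary of the (status, result) protocol implemented above:
  #   (OP_STATUS_SUCCESS, result)  - opcode finished successfully
  #   (OP_STATUS_ERROR, encoded)   - opcode raised, the error is encoded
  #   (OP_STATUS_CANCELING, None)  - job was cancelled in the meantime
  #   (OP_STATUS_QUEUED, None)     - queue shutting down, back to queued
  #   (OP_STATUS_WAITING, None)    - lock timeout, stay in waiting and retry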

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

      if op.status in (constants.OP_STATUS_WAITING,
                       constants.OP_STATUS_QUEUED):
        # waiting: Couldn't get locks in time
        # queued: Queue is shutting down
        assert not op.end_timestamp
      else:
        # Finalize opcode
        op.end_timestamp = TimeStampNow()

        if op.status == constants.OP_STATUS_CANCELING:
          assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                for i in job.ops[opctx.index:])
        else:
          assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_QUEUED:
        # Queue is shutting down
        assert not waitjob

        finalize = False

        # Reset context
        job.cur_opctx = None

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED

      elif op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()


def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      # Revert worker thread name
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Sets the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)
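
  # Example result (hedged; the exact TinySummary output format is an
  # assumption): for job 1234 executing an instance-creation opcode this
  # yields something like "Job1234/INSTANCE_CREATE"; with op=None, "Job1234".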


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up as
    # one item under "pending".
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: int
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TJobId(job.id)
    assert ht.TJobId(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))
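
  # Result protocol of CheckAndRegister, as implemented above:
  #   (WAIT, msg)        - caller must wait; it is re-queued on job change
  #   (CONTINUE, msg)    - dependency finalized with an acceptable status
  #   (CANCEL, msg)      - dependency was cancelled, caller cancels as well
  #   (WRONGSTATUS, msg) - dependency finalized with a non-requested status
  #   (ERROR, msg)       - self-dependency or dependency could not be found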

  def _RemoveEmptyWaitersUnlocked(self):
    """Remove all jobs without actual waiters.

    """
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
                   if not waiters]:
      del self._waiters[job_id]

  def NotifyWaiters(self, job_id):
    """Notifies all jobs waiting for a certain job ID.

    @attention: Do not call until L{CheckAndRegister} returned a status other
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
    @type job_id: int
    @param job_id: Job ID

    """
    assert ht.TJobId(job_id)

    self._lock.acquire()
    try:
      self._RemoveEmptyWaitersUnlocked()

      jobs = self._waiters.pop(job_id, None)
    finally:
      self._lock.release()

    if jobs:
      # Re-add jobs to workerpool
      logging.debug("Re-adding %s jobs which were waiting for job %s",
                    len(jobs), job_id)
      self._enqueue_fn(jobs)


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


def _RequireNonDrainedQueue(fn):
  """Decorator checking for a non-drained queue.

  To be used with functions submitting new jobs.

  """
  def wrapper(self, *args, **kwargs):
    """Wrapper function.

    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    # Needs access to protected member, pylint: disable=W0212
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if not self._accepting_jobs:
      raise errors.JobQueueError("Job queue is shutting down, refusing job")

    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all code acquiring it in shared
    # mode, including itself. To skip acquiring it altogether, concurrency must
    # be guaranteed with all code acquiring it in shared mode and with all
    # code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    # Set queue drained flag
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
                                                       self._drained)
    msg = result[node_name].fail_msg
    if msg:
      logging.error("Failed to set queue drained flag on node %s: %s",
                    node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and especially
    log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list
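
  # E.g. with self._nodes == {"node1": "192.0.2.1", "node2": "192.0.2.2"}
  # this returns (["node1", "node2"], ["192.0.2.1", "192.0.2.2"]), with the
  # two lists in matching (but otherwise unspecified) dictionary order.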

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.daemons_gid,
                    mode=constants.JOB_QUEUE_FILES_PERMS)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicate the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _NewSerialsUnlocked(self, count):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list
    @return: a list of job identifiers.

    """
    assert ht.TNonNegativeInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [jstore.FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result
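
  # E.g. with _last_serial == 10 and count == 3 this writes "13\n" to the
  # serial file and returns the job IDs for serials 11, 12 and 13 (as
  # formatted by jstore.FormatJobID).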

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
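
  # Example (hedged; the queue directory depends on the installation's
  # configured paths): _GetJobPath(4711) would return something like
  # "/var/lib/ganeti/queue/job-4711".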

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)

  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    result = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
                        utils.ListVisibleFiles(archive_path)))

    return result

  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()

    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    # Change flag locally
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    # ... and on all nodes
    (names, addrs) = self._GetNodeIp()
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
    self._CheckRpcResult(result, self._nodes,
                         "Setting queue drain flag to %s" % drain_flag)

    return True

  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    for idx, op in enumerate(job.ops):
      # Check priority
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      # Check job dependencies
      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))
  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)

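  # Worked example (values are made up): assume this submission reserved job
  # IDs [10, 11, 12] and resolve_fn indexes into that list. Then the
  # dependency list [(-1, []), (7, [])] resolves to [(12, []), (7, [])]:
  # the relative ID -1 becomes the most recently reserved job ID, while the
  # absolute ID 7 is passed through unchanged.
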
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break

          # Use resolved dependencies
          op.depends = data
      else:
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job.id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

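  # Usage sketch (illustrative; the opcode choice is an assumption):
  # submitting two jobs where the second waits for the first via the
  # relative job ID -1, which the code above resolves against the job IDs
  # reserved for this very submission:
  #
  #   queue.SubmitManyJobs([
  #     [opcodes.OpTestDelay(duration=1)],
  #     [opcodes.OpTestDelay(duration=1, depends=[(-1, [])])],
  #   ])
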
  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs],
                             task_id=map(_GetIdAttr, jobs))

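  # Shape of the data handed to the worker pool (illustrative): three jobs
  # become the task list [(job1, ), (job2, ), (job3, )] with parallel
  # priority and task_id lists, so ChangeTaskPriority can later address a
  # single job by its ID alone.
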
  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job"  # pylint: disable=E1101
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

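  # Usage sketch (hypothetical caller): replicate=False can batch a series
  # of local updates, replicating only the final state to the other master
  # candidates:
  #
  #   for update in intermediate_updates:  # hypothetical iterable
  #     ...
  #     queue.UpdateJobUnlocked(job, replicate=False)
  #   queue.UpdateJobUnlocked(job)  # final write, replicated
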
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list; if the
        job has not changed and the timeout has expired, the special
        value L{constants.JOB_NOTCHANGED} is returned instead, which
        clients should interpret as such

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

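  # Client-side sketch (illustrative, not part of this class): callers loop,
  # feeding back the previously returned values, until the job is finalized:
  #
  #   prev_info = None
  #   prev_serial = -1
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, 10.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue  # timeout expired without changes, poll again
  #     ...  # process new job info and log entries, update prev_* values
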
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ChangeJobPriority(self, job_id, priority):
    """Changes a job's priority.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    if priority not in constants.OP_PRIO_SUBMIT_VALID:
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority, allowed))

    def fn(job):
      (success, msg) = job.ChangePriority(priority)

      if success:
        try:
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
        except workerpool.NoSuchTask:
          logging.debug("Job %s is not in workerpool at this time", job.id)

      return (success, msg)

    return self._ModifyJobUnlocked(job_id, fn)

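  # Usage sketch (illustrative): bumping a pending job to high priority;
  # invalid values are rejected with the same check used at submit time:
  #
  #   (success, msg) = queue.ChangeJobPriority(job_id,
  #                                            constants.OP_PRIO_HIGH)
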
  def _ModifyJobUnlocked(self, job_id, mod_fn):
    """Modifies a job.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving job object as parameter,
      returning tuple of (status boolean, message string)

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (success, msg) = mod_fn(job)

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive an already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, whether we succeeded or failed
    # renaming the files, we update the cached queue size from the
    # filesystem. When we get around to fixing the TODO above, we can use
    # the number of actually archived jobs instead.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: maximum time in seconds to spend archiving

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

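  # Usage sketch (values are made up): archive every job finalized more than
  # a day ago, spending at most five seconds on it, and report how many job
  # IDs were left unexamined:
  #
  #   (archived, remaining) = queue.AutoArchiveJobs(24 * 60 * 60, 5.0)
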
  def _Query(self, fields, qfilter):
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

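  # Illustrative query (a sketch, not a prescribed call): archived jobs only
  # show up when the "archived" field is referenced, for instance by
  # requesting it explicitly among the fields:
  #
  #   queue.QueryJobs(["id", "status", "archived"], None)
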
  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list of lists, one per job, each containing the requested fields

    """
    # backwards compat:
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there
    are any jobs currently running. If the latter is the case, the job queue
    is not yet ready for shutdown. Once this function returns C{False},
    L{Shutdown} can be called without interfering with any job. Queued and
    unfinished jobs will be resumed next time.

    Once this function has been called no new job submissions will be
    accepted (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

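  # Shutdown sequence sketch (illustrative caller, e.g. a daemon's exit
  # path): keep asking until no jobs are running, then shut down for real:
  #
  #   while queue.PrepareShutdown():
  #     time.sleep(1)  # jobs still running, check again shortly
  #   queue.Shutdown()
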
  def AcceptingJobsUnlocked(self):
    """Returns whether jobs are accepted.

    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
    queue is shutting down.

    @rtype: bool
    @return: Whether jobs are accepted

    """
    return self._accepting_jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None