# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import logging
import errno
import time
import weakref
import threading
import itertools
import operator

try:
  # pylint: disable=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import opcodes_base
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang
from ganeti import pathutils
from ganeti import vcluster


JOBQUEUE_THREADS = 25


# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"

#: Retrieves "id" attribute
_GetIdAttr = operator.attrgetter("id")


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


class QueueShutdown(Exception):
  """Special exception to abort a job when the job queue is shutting down.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())


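# Editorial sketch of the timestamp format used throughout this module:
# timestamps are kept as a (seconds, microseconds) pair of integers, e.g.
#
#   utils.SplitTime(1234567890.5)  # -> (1234567890, 500000)
#
# Two integers instead of one float avoid precision loss when job files
# are serialized to JSON.

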
def _CallJqUpdate(runner, names, file_name, content):
  """Updates job queue file after virtualizing filename.

  """
  virt_file_name = vcluster.MakeVirtualPath(file_name)
  return runner.call_jobqueue_update(names, virt_file_name, content)


class _SimpleJobQuery:
  """Wrapper for job queries.

  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using cached field list.

    """
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]


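# Hedged usage sketch for _SimpleJobQuery (variable names are illustrative
# only): the field list is compiled once, then the query object can be
# applied to many jobs, which is what L{_JobChangesChecker} relies on:
#
#   squery = _SimpleJobQuery(["id", "status"])
#   (job_id, job_status) = squery(job)  # one old-style result row
#
# The field names are assumed to be valid keys of query.JOB_FIELDS.

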
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }


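# A minimal sketch of the intended Serialize()/Restore() round-trip (the
# variable names are illustrative only):
#
#   state = qop.Serialize()               # plain dict, JSON-serializable
#   clone = _QueuedOpCode.Restore(state)  # equivalent in-memory object
#
# Missing timestamps and priority default to None/OP_PRIO_DEFAULT, which
# keeps job files written by older versions loadable.

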
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable", "archived"]

  def _AddReasons(self):
    """Extend the reason trail.

    Add the reason for all the opcodes of this job to be executed.

    """
    count = 0
    for queued_op in self.ops:
      op = queued_op.input
      reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__)
      reason_text = "job=%d;index=%d" % (self.id, count)
      reason = getattr(op, "reason", [])
      reason.append((reason_src, reason_text, utils.EpochNano()))
      op.reason = reason
      count = count + 1

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: int
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    self._AddReasons()
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
    self.archived = False

    self._InitInMemory(self, writable)

    assert not self.archived, "New jobs can not be marked as archived"

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable, archived):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @type archived: bool
    @param archived: Whether job was already archived
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.archived = archived

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

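  # Worked example (an editorial sketch, not original code): with opcode
  # statuses [success, running, queued] CalcStatus() returns "running";
  # with [success, error, queued] it returns "error" because the loop
  # breaks at the first failed opcode; only when every opcode succeeded
  # does it return "success".
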
  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in reversed(self.ops):
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)

  def ChangePriority(self, priority):
    """Changes the job priority.

    @type priority: int
    @param priority: New priority
    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job's priority was successfully changed
      and a text message

    """
    status = self.CalcStatus()

    if status in constants.JOBS_FINALIZED:
      return (False, "Job %s is finished" % self.id)
    elif status == constants.JOB_STATUS_CANCELING:
      return (False, "Job %s is cancelling" % self.id)
    else:
      assert status in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_RUNNING)

      changed = False
      for op in self.ops:
        if (op.status == constants.OP_STATUS_RUNNING or
            op.status in constants.OPS_FINALIZED):
          assert not changed, \
            ("Found opcode for which priority should not be changed after"
             " priority has been changed for previous opcodes")
          continue

        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        changed = True

        # Set new priority (doesn't modify opcode input)
        op.priority = priority

      if changed:
        return (True, ("Priorities of pending opcodes for job %s have been"
                       " changed to %s" % (self.id, priority)))
      else:
        return (False, "Job %s had no pending opcodes" % self.id)


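# Editorial note with a short sketch of the cancellation semantics
# implemented by Cancel() above: a still-queued job is canceled outright
# and finalized, a job waiting for locks is only marked as canceling (the
# worker completes the transition), and a running job can no longer be
# canceled:
#
#   (success, msg) = job.Cancel()
#   # success is True for queued/waiting jobs, False otherwise

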
class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

    # See if queue is shutting down
    if not self._queue.AcceptingJobsUnlocked():
      logging.debug("Queue is shutting down")
      raise QueueShutdown()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)

  def CurrentPriority(self):
    """Returns current priority for opcode.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    return self._op.priority

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


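# A hedged sketch of how Feedback() entries end up in the job log: each
# entry is a tuple (log_serial, (sec, usec), log_type, log_msg), e.g.:
#
#   cb.Feedback("step 1 done")                     # defaults to ELOG_MESSAGE
#   cb.Feedback(constants.ELOG_MESSAGE, "step 2")  # explicit type
#
# The per-job serial lets clients resume log streaming from the last
# serial they have already seen.

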
class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


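# The JSON round-trip above relies on the following normalization effect
# (illustrative sketch):
#
#   serializer.LoadJson(serializer.DumpJson((1, 2)))  # -> [1, 2]
#
# Without it, freshly computed tuples would never compare equal to the
# lists a LUXI client sends back as its previous job information.

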
class _JobFileChangesWaiter(object):
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = _inotify_wm_cls()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename
    self._waiter_cls = _waiter_cls

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = self._waiter_cls(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes, in comparison to blocking IO.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout,
               _waiter_cls=_JobChangesWaiter):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _waiter_cls(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except errors.JobLost:
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


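# Hedged usage sketch for _WaitForJobChangesHelper (argument values are
# illustrative only):
#
#   helper = _WaitForJobChangesHelper()
#   result = helper(filename, job_load_fn, ["id", "status"],
#                   prev_job_info, prev_log_serial, 30.0)
#   # result is (job_info, log_entries), None if the job was lost, or
#   # constants.JOB_NOTCHANGED when the timeout expired without changes

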
def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()


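# Priority mechanics in short (an editorial sketch): opcode priorities are
# nice-like values where numerically smaller means more important, bounded
# by constants.OP_PRIO_HIGHEST and constants.OP_PRIO_LOWEST. Whenever the
# lock timeout strategy is exhausted (Peek() returns None before a blocking
# acquire), CheckPriorityIncrease() decrements op.priority by one step, so
# a lock-starved opcode gradually gains priority until it may block on the
# locks it needs.

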
class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @rtype: bool
    @return: Whether job was modified

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITING

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Queue is shutting down, return to queued
      if not self.queue.AcceptingJobsUnlocked():
        return (constants.OP_STATUS_QUEUED, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)

    except QueueShutdown:
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)

      assert op.status == constants.OP_STATUS_WAITING

      # Job hadn't been started yet, so it should return to the queue
      return (constants.OP_STATUS_QUEUED, None)

    except Exception, err: # pylint: disable=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

      if op.status in (constants.OP_STATUS_WAITING,
                       constants.OP_STATUS_QUEUED):
        # waiting: Couldn't get locks in time
        # queued: Queue is shutting down
        assert not op.end_timestamp
      else:
        # Finalize opcode
        op.end_timestamp = TimeStampNow()

        if op.status == constants.OP_STATUS_CANCELING:
          assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                for i in job.ops[opctx.index:])
        else:
          assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_QUEUED:
        # Queue is shutting down
        assert not waitjob

        finalize = False

        # Reset context
        job.cur_opctx = None

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED

      elif op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        assert not waitjob

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()


def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Sets the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}
    @rtype: string

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up as
    # one item under "pending".
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: int
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TJobId(job.id)
    assert ht.TJobId(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))

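  # Summary of the possible CheckAndRegister() outcomes (a sketch):
  #
  #   (WAIT, ...)         dependency not finalized yet, job registered for
  #                       notification
  #   (CONTINUE, ...)     dependency finished with an acceptable status
  #   (CANCEL, ...)       dependency canceled and that was not acceptable
  #   (WRONGSTATUS, ...)  dependency finalized with an unwanted status
  #   (ERROR, ...)        self-dependency or dependency job lost
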
  def _RemoveEmptyWaitersUnlocked(self):
    """Remove all jobs without actual waiters.

    """
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
                   if not waiters]:
      del self._waiters[job_id]

  def NotifyWaiters(self, job_id):
    """Notifies all jobs waiting for a certain job ID.

    @attention: Do not call until L{CheckAndRegister} returned a status other
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
    @type job_id: int
    @param job_id: Job ID

    """
    assert ht.TJobId(job_id)

    self._lock.acquire()
    try:
      self._RemoveEmptyWaitersUnlocked()

      jobs = self._waiters.pop(job_id, None)
    finally:
      self._lock.release()

    if jobs:
      # Re-add jobs to workerpool
      logging.debug("Re-adding %s jobs which were waiting for job %s",
                    len(jobs), job_id)
      self._enqueue_fn(jobs)


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' argument.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


def _RequireNonDrainedQueue(fn):
  """Decorator checking for a non-drained queue.

  To be used with functions submitting new jobs.

  """
  def wrapper(self, *args, **kwargs):
    """Wrapper function.

    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    # Needs access to protected member, pylint: disable=W0212
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if not self._accepting_jobs:
      raise errors.JobQueueError("Job queue is shutting down, refusing job")

    return fn(self, *args, **kwargs)
  return wrapper


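# The decorators above are combined with the queue lock on the public
# submission entry points, outermost first, mirroring SubmitJob() below:
#
#   @locking.ssynchronized(_LOCK)
#   @_RequireOpenQueue
#   @_RequireNonDrainedQueue
#   def SubmitJob(self, ops):
#     ...

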
class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    finished).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          to_encode = errors.OpExecError("Unclean master daemon shutdown")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    # Set queue drained flag
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
                                                       self._drained)
    msg = result[node_name].fail_msg
    if msg:
      logging.error("Failed to set queue drained flag on node %s: %s",
                    node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc fail, and especially
    log the case when more than half of the nodes fails.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.daemons_gid,
                    mode=constants.JOB_QUEUE_FILES_PERMS)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _NewSerialsUnlocked(self, count):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @return: a list of job identifiers.

    """
    assert ht.TNonNegativeInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [jstore.FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result

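  # Allocation sketch (editorial): with self._last_serial == 10,
  # _NewSerialsUnlocked(3) writes "13" to the serial file and returns the
  # identifiers formatted from serials 11, 12 and 13. The in-memory serial
  # is advanced only after the file write succeeded, so a failed write
  # cannot leak job IDs.
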
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)

  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    result = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
                        utils.ListVisibleFiles(archive_path)))

    return result

  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()

    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"

    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    # Change flag locally
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    # ... and on all nodes
    (names, addrs) = self._GetNodeIp()
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
    self._CheckRpcResult(result, self._nodes,
                         "Setting queue drain flag to %s" % drain_flag)

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    for idx, op in enumerate(job.ops):
      # Check priority
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      # Check job dependencies
      dependencies = getattr(op.input, opcodes_base.DEPEND_ATTR, None)
      if not opcodes_base.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes_base.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJobToDrainedQueue(self, ops):
    """Forcefully create and store a new job.

    Do so, even if the job queue is drained.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results
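
  # Illustrative sketch (not part of the original source; op1/op2 are
  # hypothetical opcodes): within one SubmitManyJobs call, a job may depend on
  # an earlier job of the same batch through a negative, relative job ID:
  #
  #   job_a = [op1]
  #   job_b = [op2]  # with op2.depends = [[-1, [constants.JOB_STATUS_SUCCESS]]]
  #   results = queue.SubmitManyJobs([job_a, job_b])
  #   # results is a list of (success, job ID or error message) tuples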
  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))
  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)
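
  # Worked example (not part of the original source): assume this submission
  # has already allocated job IDs [100, 101]; a relative ID of -1 then
  # resolves to the most recently allocated job, while absolute IDs pass
  # through unchanged:
  #
  #   resolve_fn = lambda rel_id: [100, 101][rel_id]
  #   deps = [(-1, [constants.JOB_STATUS_SUCCESS]), (42, [])]
  #   JobQueue._ResolveJobDependencies(resolve_fn, deps)
  #   # -> (True, [(101, [constants.JOB_STATUS_SUCCESS]), (42, [])])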
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes_base.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break

          # Use resolved dependencies
          op.depends = data
      else:
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job.id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)
  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)
  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs],
                             task_id=map(_GetIdAttr, jobs))
  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk; returns None on failure, so the assertion must
    # only run once we know a job was actually loaded
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job"  # pylint: disable=E1101
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)
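
  # Illustrative client-side sketch (not part of the original source): callers
  # poll in a loop, feeding the previously returned state back in so only real
  # changes are reported:
  #
  #   prev_info = None
  #   prev_serial = -1
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, 10.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue  # timeout expired without any change
  #     (prev_info, log_entries) = result
  #     ...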
  @locking.ssynchronized(_LOCK)
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
  @locking.ssynchronized(_LOCK)
  def ChangeJobPriority(self, job_id, priority):
    """Changes a job's priority.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    if priority not in constants.OP_PRIO_SUBMIT_VALID:
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority, allowed))

    def fn(job):
      (success, msg) = job.ChangePriority(priority)

      if success:
        try:
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
        except workerpool.NoSuchTask:
          logging.debug("Job %s is not in workerpool at this time", job.id)

      return (success, msg)

    return self._ModifyJobUnlocked(job_id, fn)
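
  # Illustrative usage sketch (not part of the original source): bump a queued
  # job to a higher priority; if it is already in the worker pool, its task
  # priority is adjusted as well:
  #
  #   (ok, msg) = queue.ChangeJobPriority(job_id, constants.OP_PRIO_HIGH)
  #   if not ok:
  #     logging.warning("Could not change priority of job %s: %s", job_id, msg)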
  def _ModifyJobUnlocked(self, job_id, mod_fn):
    """Modifies a job.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving job object as parameter,
      returning tuple of (status boolean, message string)

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (success, msg) = mod_fn(job)

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive an already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, whether we succeeded or failed in
    # renaming the files, we update the cached queue size from the filesystem.
    # When we get around to fixing the TODO above, we can use the number of
    # actually archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)
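
  # Illustrative sketch (not part of the original source): archiving is a
  # rename from the live queue directory into the archive, so a finalized job
  # vanishes from default listings while its file remains readable:
  #
  #   archived = queue._ArchiveJobsUnlocked([job])  # 1 if job was finalized
  #   assert archived in (0, 1)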
  @locking.ssynchronized(_LOCK)
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1
  @locking.ssynchronized(_LOCK)
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum amount of time to spend archiving, in seconds
    @rtype: tuple
    @return: (number of archived jobs, number of jobs not inspected)

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
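
  # Illustrative usage sketch (not part of the original source): archive
  # everything older than a week, spending at most 30 seconds per invocation:
  #
  #   (archived, unchecked) = queue.AutoArchiveJobs(7 * 24 * 3600, 30)
  #   # "unchecked" jobs were not inspected before the timeout expired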
  def _Query(self, fields, qfilter):
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)
  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
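
  # Illustrative usage sketch (not part of the original source): fetch ID and
  # status of selected jobs using the query2 filter language (an "|"-term is a
  # logical OR over its sub-filters):
  #
  #   qfilter = ["|"] + [["=", "id", jid] for jid in (17, 18)]
  #   response = queue.QueryJobs(["id", "status"], qfilter)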
  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list, one element per job, each element being a list with
      the requested fields

    """
    # backwards compatibility: job IDs may arrive as strings
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)
  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished jobs
    will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()
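
  # Illustrative shutdown sequence (not part of the original source): poll
  # until no job is running anymore, then shut down for real:
  #
  #   while queue.PrepareShutdown():
  #     time.sleep(1)   # jobs still running; new submissions already refused
  #   queue.Shutdown()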
  def AcceptingJobsUnlocked(self):
    """Returns whether jobs are accepted.

    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
    queue is shutting down.

    @rtype: bool
    @return: Whether jobs are accepted

    """
    return self._accepting_jobs
  @locking.ssynchronized(_LOCK)
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None