# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
41 # pylint: disable-msg=E0611
42 from pyinotify import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht

JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """

78 """Returns the current timestamp.
81 @return: the current time in the (seconds, microseconds) format
84 return utils.SplitTime(time.time())
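
# Illustrative sketch (not part of the original module): all timestamps in
# this file are (seconds, microseconds) tuples as produced by
# utils.SplitTime. A rough pure-Python equivalent of that helper:
def _ExampleSplitTime(value):
  """Splits a float timestamp into (seconds, microseconds); sketch only."""
  seconds = int(value)
  microseconds = int(round((value - seconds) * 1000000))
  return (seconds, microseconds)

# e.g. _ExampleSplitTime(1234567890.25) == (1234567890, 250000)
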

class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

144 """Serializes this _QueuedOpCode.
147 @return: the dictionary holding the serialized state
151 "input": self.input.__getstate__(),
152 "status": self.status,
153 "result": self.result,
155 "start_timestamp": self.start_timestamp,
156 "exec_timestamp": self.exec_timestamp,
157 "end_timestamp": self.end_timestamp,
158 "priority": self.priority,

class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable"]

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job ID
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self._InitInMemory(self, writable)

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

269 """Serialize the _JobQueue instance.
272 @return: the serialized state
277 "ops": [op.Serialize() for op in self.ops],
278 "start_timestamp": self.start_timestamp,
279 "end_timestamp": self.end_timestamp,
280 "received_timestamp": self.received_timestamp,
  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, opcode, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waiting
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        is passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "priority":
        row.append(self.CalcPriority())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "oppriority":
        row.append([op.priority for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

444 """Marks the job as finalized.
447 self.end_timestamp = TimeStampNow()
450 """Marks job as canceled/-ing if possible.
452 @rtype: tuple; (bool, string)
453 @return: Boolean describing whether job was successfully canceled or marked
454 as canceling and a text message
457 status = self.CalcStatus()
459 if status == constants.JOB_STATUS_QUEUED:
460 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
461 "Job canceled by request")
463 return (True, "Job %s canceled" % self.id)
465 elif status == constants.JOB_STATUS_WAITING:
466 # The worker will notice the new status and cancel the job
467 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
468 return (True, "Job %s will be canceled" % self.id)
471 logging.debug("Job %s is no longer waiting in the queue", self.id)
472 return (False, "Job %s is no longer waiting in the queue" % self.id)
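
# Illustrative sketch (not part of the original module): how job status and
# priority fall out of the opcode states. opcodes.OpTestDelay is a stand-in
# opcode; the queue argument may be None since neither method touches it.
def _ExampleJobStatusAndPriority():
  """Shows CalcStatus/CalcPriority on a fresh job (sketch only)."""
  ops = [opcodes.OpTestDelay(duration=0) for _ in range(2)]
  job = _QueuedJob(None, "1", ops, True)
  assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
  # Lower values mean higher priority; the most urgent opcode wins
  job.ops[1].priority = constants.OP_PRIO_HIGHEST
  assert job.CalcPriority() == constants.OP_PRIO_HIGHEST
  return job
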

class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)

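
# Illustrative note (not part of the original module): mcpu passes an
# instance of the callbacks above into the LU. Feedback() accepts either a
# bare message or a (log_type, message) pair, e.g.:
#
#   cbs.Feedback("step 1/2 done")                         # ELOG_MESSAGE
#   cbs.Feedback(constants.ELOG_MESSAGE, "step 2/2 done")
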

class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = job.GetInfo(self._fields)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None

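
# Illustrative sketch (not part of the original module): the JSON round-trip
# used above normalizes Python types (e.g. tuples become lists) so that a
# freshly-loaded job compares equal to data previously sent to a client.
def _ExampleNormalize(value):
  """Round-trips a value through JSON to unify container types (sketch)."""
  return serializer.LoadJson(serializer.DumpJson(value))

# e.g. _ExampleNormalize((1, 2)) == [1, 2]
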

class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

673 """Closes underlying notifier and its file descriptor.
676 self._notifier.stop()

class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

710 """Closes underlying waiter.
714 self._filewaiter.Close()
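
# Illustrative sketch (not part of the original module): how a caller is
# expected to drive _JobChangesWaiter. The first Wait() returns True without
# inotify, prompting an immediate re-check; later calls actually block.
# check_fn is a hypothetical callable returning None while nothing changed.
def _ExampleWaitLoop(filename, check_fn, timeout):
  """Polls check_fn until it returns a value or the wait times out (sketch)."""
  waiter = _JobChangesWaiter(filename)
  try:
    while True:
      result = check_fn()
      if result is not None:
        return result
      if not waiter.Wait(timeout):
        # Timed out without file changes
        return None
  finally:
    waiter.Close()
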

class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED

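
# Illustrative usage (not part of the original module): this is essentially
# what JobQueue.WaitForJobChanges does further down, with a job-specific
# loader function. The variables shown here are hypothetical:
#
#   helper = _WaitForJobChangesHelper()
#   result = helper(filename, load_fn, ["status"], prev_info, prev_serial, 10)
#   # result is (job_info, log_entries), None if the job was lost, or
#   # constants.JOB_NOTCHANGED if the timeout expired without changes.
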

def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)

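
# Illustrative sketch (not part of the original module): non-Ganeti
# exceptions are wrapped in errors.OpExecError first, so the encoded form
# can always be recognized and re-raised as a Ganeti error.
def _ExampleEncodeOpError():
  """Encodes a plain exception into the serializable form (sketch only)."""
  encoded = _EncodeOpError(ValueError("boom"))
  assert errors.GetEncodedError(encoded)
  return encoded
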

class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()

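
# Illustrative note (not part of the original module): every time the lock
# timeouts of a strategy are exhausted (Peek() returns None), the opcode's
# priority is raised one step (numerically decreased) until it reaches
# constants.OP_PRIO_HIGHEST, at which point the final acquire blocks
# without a timeout.
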

class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @rtype: bool
    @return: Whether job should be updated

    """
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITING

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout, priority=op.priority)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)
    except Exception, err: # pylint: disable-msg=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check: all opcodes after the current one must still be
      # queued or in the process of being cancelled
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

        if op.status == constants.OP_STATUS_WAITING:
          # Couldn't get locks in time
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()

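
# Illustrative sketch (not part of the original module): callers drive
# _JobProcessor according to its tri-state result, as _JobQueueWorker does
# below. queue, opexec_fn and job are hypothetical here:
#
#   result = _JobProcessor(queue, opexec_fn, job)()
#   if result == _JobProcessor.FINISHED:
#     ...  # job finalized, notify jobs waiting on it
#   elif result == _JobProcessor.DEFER:
#     ...  # re-queue the job at its current priority
#   else:  # _JobProcessor.WAITDEP
#     ...  # the dependency manager will re-add the job when notified
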

class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    result = _JobProcessor(queue, wrap_execop_fn, job)()

    if result == _JobProcessor.FINISHED:
      # Notify waiting jobs
      queue.depmgr.NotifyWaiters(job.id)

    elif result == _JobProcessor.DEFER:
      # Schedule again
      raise workerpool.DeferTask(priority=job.CalcPriority())

    elif result == _JobProcessor.WAITDEP:
      # No-op, dependency manager will re-schedule
      pass

    else:
      raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                   (result, ))

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Sets the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable-msg=W0613
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up as
    # one item under "pending".
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: string
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TString(job.id)
    assert ht.TString(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))

  @locking.ssynchronized(_LOCK)
  def NotifyWaiters(self, job_id):
    """Notifies all jobs waiting for a certain job ID.

    @type job_id: string
    @param job_id: Job ID

    """
    assert ht.TString(job_id)

    jobs = self._waiters.pop(job_id, None)
    if jobs:
      # Re-add jobs to workerpool
      logging.debug("Re-adding %s jobs which were waiting for job %s",
                    len(jobs), job_id)
      self._enqueue_fn(jobs)

    # Remove all jobs without actual waiters
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
                   if not waiters]:
      del self._waiters[job_id]

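
# Illustrative sketch (not part of the original module): the protocol between
# _JobProcessor._CheckDependencies and this manager, with hypothetical
# objects:
#
#   (result, msg) = depmgr.CheckAndRegister(job, dep_job_id, dep_status)
#   if result == _JobDependencyManager.CONTINUE:
#     ...  # dependency finalized with a wanted status, keep processing
#   elif result == _JobDependencyManager.WAIT:
#     ...  # registered; NotifyWaiters(dep_job_id) re-queues the job later
#   elif result == _JobDependencyManager.CANCEL:
#     ...  # the dependency was cancelled, cancel this job as well
#   else:  # WRONGSTATUS or ERROR
#     ...  # mark this job as failed
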

def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    manipulated).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

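
  # Illustrative example (not part of the original module): with
  # JOBS_PER_ARCHIVE_DIRECTORY at 10000, jobs 0..9999 land in archive
  # directory "0", job 12345 in directory "1", and so on:
  #
  #   JobQueue._GetArchiveDirectory("12345") == "1"
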
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, True)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, False))

    raw_data = None
    writable_default = None

    for (fn, writable_default) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = writable_default

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple
    @return: Resolved dependencies

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)

  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break

          # Use resolved dependencies
          op.depends = data
      else:
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job.id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: string
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    if not isinstance(job_id, basestring):
      job_id = self._FormatJobID(job_id)

    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job"
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't cancel read-only job"

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a
        list with the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None
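
# Illustrative end-to-end sketch (not part of the original module): a master
# daemon holding a GanetiContext would use the queue roughly as follows;
# `context` is hypothetical here and opcodes.OpTestDelay a stand-in opcode.
def _ExampleQueueUsage(context):
  """Submits a job, queries it and shuts the queue down (sketch only)."""
  queue = JobQueue(context)
  try:
    job_id = queue.SubmitJob([opcodes.OpTestDelay(duration=1)])
    # QueryJobs returns one row of the requested fields per job
    (status_row, ) = queue.QueryJobs([job_id], ["id", "status"])
    return (job_id, status_row)
  finally:
    queue.Shutdown()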