# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for

import logging
import errno
import time
import weakref
import threading
import itertools

# pylint: disable=E0611
from pyinotify import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
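

# Illustrative sketch (not part of the original module), assuming
# utils.SplitTime decomposes a float timestamp into whole seconds and
# microseconds:
#
#   >>> utils.SplitTime(1234567890.123456)
#   (1234567890, 123456)
#
# which is the (seconds, microseconds) format used for all job timestamps.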


class _SimpleJobQuery:
  """Wrapper for job queries.

  Instances keep the list of fields cached, which is useful e.g. in
  L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using cached field list.

    """
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
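

# Usage sketch (hypothetical field names, not in the original source):
#
#   jquery = _SimpleJobQuery(["id", "status"])
#   row = jquery(job)  # e.g. ["1", constants.JOB_STATUS_QUEUED]
#
# The Query object is built once and reused for every job passed in.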


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.status = constants.OP_STATUS_QUEUED
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable"]

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @param job_id: our job id
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.ops = [_QueuedOpCode(op) for op in ops]
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self._InitInMemory(self, writable)

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @param state: the serialized state
    @param writable: Whether job can be modified
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])

    cls._InitInMemory(obj, writable)

    return obj
289 """Serialize the _JobQueue instance.
292 @return: the serialized state
297 "ops": [op.Serialize() for op in self.ops],
298 "start_timestamp": self.start_timestamp,
299 "end_timestamp": self.end_timestamp,
300 "received_timestamp": self.received_timestamp,

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a canceled opcode, or one finished with an error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running
        will determine the job status
      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)
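
  # Note (editor illustration, not in the original source): numerically
  # *lower* values mean *higher* priority in Ganeti
  # (constants.OP_PRIO_HIGHEST <= ... <= constants.OP_PRIO_LOWEST), so min()
  # picks the most urgent unfinished opcode. E.g. unfinished opcode
  # priorities [40, -20, 100] give the job a priority of -20.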

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
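
  # Usage sketch (hypothetical serials, not in the original source): with
  # opcode logs holding entries with serials 1..5, GetLogEntries(3) returns
  # only the tuples whose serial is 4 or 5, while GetLogEntries(None)
  # returns all five.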

  def GetInfo(self, fields):
    """Returns information about a job.

    @param fields: names of fields to return
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        is passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all opcodes that are running or
    waiting to run with a given status. Opcodes which are already finalised
    are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in reversed(self.ops):
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False
430 """Marks the job as finalized.
433 self.end_timestamp = TimeStampNow()
436 """Marks job as canceled/-ing if possible.
438 @rtype: tuple; (bool, string)
439 @return: Boolean describing whether job was successfully canceled or marked
440 as canceling and a text message
443 status = self.CalcStatus()
445 if status == constants.JOB_STATUS_QUEUED:
446 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
447 "Job canceled by request")
449 return (True, "Job %s canceled" % self.id)
451 elif status == constants.JOB_STATUS_WAITING:
452 # The worker will notice the new status and cancel the job
453 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
454 return (True, "Job %s will be canceled" % self.id)
457 logging.debug("Job %s is no longer waiting in the queue", self.id)
458 return (False, "Job %s is no longer waiting in the queue" % self.id)
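
  # Usage sketch: callers treat the returned tuple as (done, message), e.g.
  #
  #   (success, msg) = job.Cancel()
  #
  # where success is True both for queued jobs canceled immediately and for
  # jobs merely marked as canceling (following the two True branches above).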


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)
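
  # Usage sketch: mcpu invokes Feedback() with one or two positional
  # arguments; both of the following are accepted:
  #
  #   cbs.Feedback("copying disk data")
  #   cbs.Feedback(constants.ELOG_MESSAGE, "copying disk data")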

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client
  when the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes, in comparison to splitting the current second.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)
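

# Illustration (an assumption about errors.EncodeException, which serializes
# an exception into a client-readable form): a plain ValueError("boom") is
# first wrapped, so
#
#   _EncodeOpError(ValueError("boom"))
#   # -> encoded form of errors.OpExecError("boom")
#
# while Ganeti's own GenericError subclasses are encoded unchanged.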


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()
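

# Illustration (hypothetical numbers, not in the original source): a
# strategy yielding lock timeouts of 1.0s and 4.0s and then None makes the
# opcode attempt a 1s and then a 4s timed lock acquire; once Peek() returns
# None (meaning "acquire blockingly next time"), CheckPriorityIncrease()
# bumps the opcode priority by one step (priority -= 1, as lower numbers are
# more urgent) and restarts the timeout sequence.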


class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @return: Whether job should be updated on disk

    """
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITING

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout, priority=op.priority)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)
    except Exception, err: # pylint: disable=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)
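
  # Contract sketch: _ExecOpCodeUnlocked always returns an
  # (opcode status, result) pair, e.g.
  #
  #   (constants.OP_STATUS_WAITING, None)       # lock timeout, retry later
  #   (constants.OP_STATUS_ERROR, encoded_err)  # execution failed
  #   (constants.OP_STATUS_SUCCESS, result)     # LU finished
  #
  # which __call__ below copies verbatim into op.status and op.result.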

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

        if op.status == constants.OP_STATUS_WAITING:
          # Couldn't get locks in time
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        if finalize:
          logging.info("Finished job %s, status = %s",
                       job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()


def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Returns the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}
    @rtype: string

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up as
    # one item under "pending".
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: string
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TString(job.id)
    assert ht.TString(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))
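
  # Usage sketch (hypothetical IDs, not in the original source): an opcode
  # carrying depends=[("123", [constants.JOB_STATUS_SUCCESS])] leads the job
  # processor to call CheckAndRegister(job, "123",
  # [constants.JOB_STATUS_SUCCESS]); while job 123 is unfinished this
  # returns (WAIT, ...) and parks the calling job, and once job 123 succeeds
  # NotifyWaiters("123") re-enqueues it, after which the check returns
  # (CONTINUE, ...).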

  def _RemoveEmptyWaitersUnlocked(self):
    """Remove all jobs without actual waiters.

    """
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
                   if not waiters]:
      del self._waiters[job_id]

  def NotifyWaiters(self, job_id):
    """Notifies all jobs waiting for a certain job ID.

    @attention: Do not call until L{CheckAndRegister} returned a status other
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
    @type job_id: string
    @param job_id: Job ID

    """
    assert ht.TString(job_id)

    self._lock.acquire()
    try:
      self._RemoveEmptyWaitersUnlocked()
      jobs = self._waiters.pop(job_id, None)
    finally:
      self._lock.release()

    if jobs:
      # Re-add jobs to workerpool
      logging.debug("Re-adding %s jobs which were waiting for job %s",
                    len(jobs), job_id)
      self._enqueue_fn(jobs)


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


def _RequireNonDrainedQueue(fn):
  """Decorator checking for a non-drained queue.

  To be used with functions submitting new jobs.

  """
  def wrapper(self, *args, **kwargs):
    """Wrapper function.

    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    # Needs access to protected member, pylint: disable=W0212
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if not self._accepting_jobs:
      raise errors.JobQueueError("Job queue is shutting down, refusing job")

    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name,
                                                        content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our RPC calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @param nodes: the list of nodes we made the call to
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicate the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @param job_id: Job identifier
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

  def _NewSerialsUnlocked(self, count):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers

    """
    assert ht.TPositiveInt(count)

    serial = self._last_serial + count

    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @param job_id: the job identifier
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @param job_id: the job identifier
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
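
  # Illustration: with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job "12345" lands
  # in archive directory "1" (Python 2 integer division, 12345 / 10000 == 1),
  # so its archived path is roughly
  # "<constants.JOB_QUEUE_ARCHIVE_DIR>/1/job-12345".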

  @staticmethod
  def _GetJobIDsUnlocked(sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @param sort: perform sorting on the returned job ids
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = constants.JOB_FILE_RE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, True)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, False))

    raw_data = None
    writable_default = None

    for (fn, writable_default) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = writable_default

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @param deps: Dependencies
    @return: Resolved dependencies

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)
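
  # Illustration (hypothetical IDs, not in the original source): when
  # submitting a batch whose earlier jobs got IDs ["10", "11"], a dependency
  # of -1 in the next job resolves to "11" and -2 to "10", so
  #
  #   _ResolveJobDependencies(resolve_fn, [(-1, [])])
  #   # -> (True, [("11", [])])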

  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break

          # Use resolved dependencies
          op.depends = data
      else:
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job.id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: string
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    if not isinstance(job_id, basestring):
      job_id = self._FormatJobID(job_id)

    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    assert not job.writable, "Got writable job" # pylint: disable=E1101

    if job:
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    finalized = job.CalcStatus() in constants.JOBS_FINALIZED
    assert (finalized ^ (job.end_timestamp is None))
    assert job.writable, "Can't update read-only job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't cancel read-only job"

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
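
  # Illustration (hypothetical timestamps): job_age is a
  # (seconds, microseconds) pair, so with age=3600 a job whose relevant
  # timestamp satisfies job_age[0] == now - 7200 gets archived, while
  # age=-1 archives every finalized job regardless of timestamps.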

  def _Query(self, fields, qfilter):
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, sort_by_name) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=sort_by_name)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @param job_ids: sequence of job identifiers or None for all
    @param fields: names of fields to return
    @return: list one element per job, each element being list with
        the requested fields

    """
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, sort_by_name) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=sort_by_name)

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
    be called without interfering with any job. Queued and unfinished jobs will
    be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

  @locking.ssynchronized(_LOCK)
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None