# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""
import logging
import errno
import time
import weakref
import threading
import itertools

# pylint: disable=E0611
from pyinotify import pyinotify
from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang
from ganeti import pathutils
from ganeti import vcluster
JOBQUEUE_THREADS = 25

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"
class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
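# Editorial example (not part of the original module): all timestamps in
# this module are (seconds, microseconds) pairs, e.g.:
#
#   >>> utils.SplitTime(1234567890.25)
#   (1234567890, 250000)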
def _CallJqUpdate(runner, names, file_name, content):
  """Updates job queue file after virtualizing filename.

  """
  virt_file_name = vcluster.MakeVirtualPath(file_name)
  return runner.call_jobqueue_update(names, virt_file_name, content)
class _SimpleJobQuery:
  """Wrapper for job queries.

  Instances keep the list of fields cached, which is useful e.g. in
  L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using cached field list.

    """
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
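# Editorial example (not part of the original module): the compiled query is
# reusable across many jobs without re-parsing the field list, e.g.:
#
#   get_row = _SimpleJobQuery(["id", "status"])
#   row = get_row(job)  # old-style result row, e.g. [42, "running"]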
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
      of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }
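# Editorial note (not part of the original module): Serialize() and Restore()
# are intended as inverses over the JSON-friendly dictionary form, e.g.:
#
#   state = qop.Serialize()               # plain dict, safe for DumpJson()
#   clone = _QueuedOpCode.Restore(state)  # same status/result/timestamps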
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable"]
  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @param job_id: our job id
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self._InitInMemory(self, writable)

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))
  @classmethod
  def Restore(cls, queue, state, writable):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @param state: the serialized state
    @param writable: Whether job can be modified
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }
  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with error,
        the job status will be the same
      - otherwise, the last opcode with the status one of waiting,
        canceling or running will determine the job status
      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
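  # Editorial example (not part of the original module): with opcode statuses
  # [success, running, queued] CalcStatus() returns JOB_STATUS_RUNNING, while
  # a single error or canceled opcode short-circuits the loop and the whole
  # job reports the error/canceled state.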
  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)
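  # Editorial example (not part of the original module): priorities are
  # numeric and *lower* means *more urgent*, so a job whose pending opcodes
  # have priorities [40, 10, 100] runs at min([40, 10, 100]) == 10.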
  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        is passed

    """
    return _SimpleJobQuery(fields)(self)
  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalized are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()
  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)
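# Editorial example (not part of the original module): cancellation is
# two-phase, e.g.:
#
#   job.Cancel()  # queued job: (True, "Job 42 canceled"), finalized at once
#   job.Cancel()  # lock-waiting job: (True, "Job 42 will be canceled"),
#                 # the worker completes the cancellation later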
class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()
  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)
  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()
  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)
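# Editorial example (not part of the original module): LUs report progress
# through Feedback(), either as a bare message or as a (type, message) pair:
#
#   cbs.Feedback("copying disk 0")                       # defaults to
#                                                        # ELOG_MESSAGE
#   cbs.Feedback(constants.ELOG_MESSAGE, "step 2 done")  # explicit type
#
# Each call appends a (serial, timestamp, type, message) tuple to the log.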
class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None
class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()
class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()
class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result
  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
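# Editorial example (not part of the original module): callers interpret the
# three possible outcomes of the helper as follows:
#
#   rv = helper(path, load_fn, ["status"], prev_info, prev_serial, 30.0)
#   if rv is None:                        # job lost (file gone or corrupted)
#     ...
#   elif rv == constants.JOB_NOTCHANGED:  # timeout expired, nothing new
#     ...
#   else:
#     (job_info, log_entries) = rv        # fresh data for the client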
def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)
class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result
class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False
  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()
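# Editorial note (not part of the original module): each failed lock
# acquisition consumes one timeout from the strategy; once Peek() returns
# None (meaning a blocking acquire would be next), CheckPriorityIncrease()
# bumps the opcode one priority step (numerically decreasing it) and
# restarts the timeout sequence.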
class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory
  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)
  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @rtype: bool
    @return: Whether job should be updated on disk

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False
    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update
  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result
  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITING

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout, priority=op.priority)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)
    except Exception, err: # pylint: disable=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)
  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

        if op.status == constants.OP_STATUS_WAITING:
          # Couldn't get locks in time
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()
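# Editorial note (not part of the original module): __call__ processes at
# most one opcode per invocation and reports how to continue, e.g.:
#
#   result = _JobProcessor(queue, opexec_fn, job)()
#   # FINISHED: job finalized; DEFER: re-queue for the next opcode;
#   # WAITDEP: the dependency manager will re-schedule the job itself.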
def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))
class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())
  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Sets the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)
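# Editorial example (not part of the original module): the resulting task
# name is "Job<id>", optionally followed by the opcode's tiny summary, e.g.
# something like "Job42/I_STARTUP" for an OP_INSTANCE_STARTUP opcode (the
# worker pool prepends its own "Jq" worker prefix; the exact summary text
# depends on opcodes.OpCode.TinySummary()).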
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue
class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")
  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up as
    # one item under "pending".
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())
  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: int
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TJobId(job.id)
    assert ht.TJobId(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))
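  # Editorial example (not part of the original module): for an opcode with
  # depends=[(42, [constants.JOB_STATUS_SUCCESS])] this returns WAIT while
  # job 42 is unfinished, CONTINUE once it succeeds, WRONGSTATUS if it ends
  # in another state, and CANCEL if it was cancelled without "canceled"
  # being listed as an acceptable status.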
  def _RemoveEmptyWaitersUnlocked(self):
    """Remove all jobs without actual waiters.

    """
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
                   if not waiters]:
      del self._waiters[job_id]

  def NotifyWaiters(self, job_id):
    """Notifies all jobs waiting for a certain job ID.

    @attention: Do not call until L{CheckAndRegister} returned a status other
      than C{WAIT} for C{job_id}, or behaviour is undefined
    @param job_id: Job ID

    """
    assert ht.TJobId(job_id)

    self._lock.acquire()
    try:
      self._RemoveEmptyWaitersUnlocked()

      jobs = self._waiters.pop(job_id, None)
    finally:
      self._lock.release()

    if jobs:
      # Re-add jobs to workerpool
      logging.debug("Re-adding %s jobs which were waiting for job %s",
                    len(jobs), job_id)
      self._enqueue_fn(jobs)
def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


def _RequireNonDrainedQueue(fn):
  """Decorator checking for a non-drained queue.

  To be used with functions submitting new jobs.

  """
  def wrapper(self, *args, **kwargs):
    """Wrapper function.

    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    # Needs access to protected member, pylint: disable=W0212
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if not self._accepting_jobs:
      raise errors.JobQueueError("Job queue is shutting down, refusing job")

    return fn(self, *args, **kwargs)
  return wrapper
class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")
  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)
  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our RPC calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list
  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
  def _NewSerialsUnlocked(self, count):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list
    @return: a list of job identifiers.

    """
    assert ht.TPositiveInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [jstore.FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)
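  # Editorial example (not part of the original module): job 42 lives in
  # <QUEUE_DIR>/job-42 while queued and is moved to
  # <JOB_QUEUE_ARCHIVE_DIR>/<bucket>/job-42 on archival, with the bucket
  # chosen by jstore.GetArchiveDirectory() to keep directories small.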
  @staticmethod
  def _GetJobIDsUnlocked(sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(pathutils.QUEUE_DIR):
      m = constants.JOB_FILE_RE.match(filename)
      if m:
        jlist.append(int(m.group(1)))
    if sort:
      jlist.sort()
    return jlist
  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job
  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, True)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, False))

    raw_data = None
    writable_default = None

    for (fn, writable_default) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = writable_default

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None
  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    # Change flag locally
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    for idx, op in enumerate(job.ops):
      # Check priority
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      # Check job dependencies
      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results
  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @param deps: Dependencies
    @return: Resolved dependencies

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)
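  # Editorial example (not part of the original module): negative IDs refer
  # to jobs from the same (or an earlier) submission; with previous job IDs
  # [10, 11], a dependency of -1 resolves to 11 and -2 to 10, while an
  # out-of-range value aborts the submission of that job.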
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break

          # Use resolved dependencies
          op.depends = data
      else:
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job.id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)
  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])
  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job" # pylint: disable=E1101
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't cancel read-only job"

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)
  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0
    pending = []

    all_job_ids = self._GetJobIDsUnlocked()

    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
  def _Query(self, fields, qfilter):
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)
  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @param job_ids: sequence of job identifiers or None for all
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list
        with the requested fields

    """
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)
  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished jobs
    will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None