# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for

import logging
import errno
import time
import weakref
import threading
import itertools

try:
  # pylint: disable=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang
from ganeti import pathutils
from ganeti import vcluster


JOBQUEUE_THREADS = 25

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue_filelock"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """
80 """Returns the current timestamp.
83 @return: the current time in the (seconds, microseconds) format
86 return utils.SplitTime(time.time())
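
# A small illustration (not part of the original module): utils.SplitTime
# splits a float timestamp into whole seconds and microseconds, which
# serializes to JSON without floating-point precision issues, e.g.
#
#   >>> utils.SplitTime(1234567890.25)
#   (1234567890, 250000)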


def _CallJqUpdate(runner, names, file_name, content):
  """Updates job queue file after virtualizing filename.

  """
  virt_file_name = vcluster.MakeVirtualPath(file_name)
  return runner.call_jobqueue_update(names, virt_file_name, content)


class _SimpleJobQuery:
  """Wrapper for job queries.

  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using cached field list.

    """
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
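
# Usage sketch (hypothetical field names, for illustration only):
#
#   get_row = _SimpleJobQuery(["id", "status"])
#   (job_id, job_status) = get_row(job)
#
# Compiling the query once and reusing the callable avoids re-parsing the
# field list for every job, e.g. when polling in L{_JobChangesChecker}.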


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj
173 """Serializes this _QueuedOpCode.
176 @return: the dictionary holding the serialized state
180 "input": self.input.__getstate__(),
181 "status": self.status,
182 "result": self.result,
184 "start_timestamp": self.start_timestamp,
185 "exec_timestamp": self.exec_timestamp,
186 "end_timestamp": self.end_timestamp,
187 "priority": self.priority,


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable", "archived"]

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: int
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
    self.archived = False

    self._InitInMemory(self, writable)

    assert not self.archived, "New jobs can not be marked as archived"

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable, archived):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @type archived: bool
    @param archived: Whether job was already archived
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.archived = archived

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an error,
        the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
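
  # Illustrative applications of the rules above to a three-opcode job
  # (opcode statuses -> job status):
  #
  #   [SUCCESS, SUCCESS, SUCCESS] -> JOB_STATUS_SUCCESS
  #   [SUCCESS, RUNNING, QUEUED]  -> JOB_STATUS_RUNNING
  #   [SUCCESS, ERROR, QUEUED]    -> JOB_STATUS_ERROR (one failure fails all)
  #   [CANCELED, QUEUED, QUEUED]  -> JOB_STATUS_CANCELED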

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)
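
  # Example: priorities follow the convention of lower values meaning higher
  # priority (OP_PRIO_HIGHEST == -20 .. OP_PRIO_LOWEST == +19, like "nice"
  # levels), so unfinished opcode priorities [0, -10, 19] yield min() == -10.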

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        is passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in reversed(self.ops):
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()
451 """Marks job as canceled/-ing if possible.
453 @rtype: tuple; (bool, string)
454 @return: Boolean describing whether job was successfully canceled or marked
455 as canceling and a text message
458 status = self.CalcStatus()
460 if status == constants.JOB_STATUS_QUEUED:
461 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
462 "Job canceled by request")
464 return (True, "Job %s canceled" % self.id)
466 elif status == constants.JOB_STATUS_WAITING:
467 # The worker will notice the new status and cancel the job
468 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
469 return (True, "Job %s will be canceled" % self.id)
472 logging.debug("Job %s is no longer waiting in the queue", self.id)
473 return (False, "Job %s is no longer waiting in the queue" % self.id)
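
  # Outcome sketch for Cancel() (illustrative):
  #
  #   queued job  -> ops marked CANCELED, job finalized:
  #                  (True, "Job 123 canceled")
  #   waiting job -> ops marked CANCELING, worker cancels on next check:
  #                  (True, "Job 123 will be canceled")
  #   other       -> (False, "Job 123 is no longer waiting in the queue")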


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes.
      time.sleep(1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)
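
# Behaviour sketch (illustrative): Ganeti-specific errors are encoded as-is,
# anything else is wrapped first, so the result always round-trips through
# JSON serialization:
#
#   _EncodeOpError(errors.OpPrereqError("oops"))  # encoded directly
#   _EncodeOpError(ValueError("oops"))            # wrapped in OpExecError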


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()
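
  # Illustrative flow (assuming the default mcpu.LockAttemptTimeoutStrategy):
  #
  #   opctx = _OpExecContext(op, 0, "Op 1/1", mcpu.LockAttemptTimeoutStrategy)
  #   timeout = opctx.GetNextLockTimeout()  # growing timeouts per attempt
  #   ...lock acquisition times out...
  #   if opctx.CheckPriorityIncrease():
  #     # op.priority was raised one step and the strategy was reset; the
  #     # job should be re-queued at the new priority
  #     pass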


class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @rtype: bool
    @return: Whether job was modified

    """
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITING

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout, priority=op.priority)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)
    except Exception, err: # pylint: disable=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

        if op.status == constants.OP_STATUS_WAITING:
          # Couldn't get locks in time
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()


def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Returns the worker thread name for a job and opcode.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up as
    # one item under "pending".
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: int
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TJobId(job.id)
    assert ht.TJobId(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))
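
  # Result sketch (illustrative): for job 10 depending on job 9 with
  # dep_status == [constants.JOB_STATUS_SUCCESS]:
  #
  #   job 9 unfinished        -> (WAIT, ...), job 10 registered as a waiter
  #   job 9 succeeded         -> (CONTINUE, ...)
  #   job 9 canceled          -> (CANCEL, ...)
  #   job 9 failed            -> (WRONGSTATUS, ...)
  #   job 9's status unknown  -> (ERROR, ...)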

  def _RemoveEmptyWaitersUnlocked(self):
    """Remove all jobs without actual waiters.

    """
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
                   if not waiters]:
      del self._waiters[job_id]

  def NotifyWaiters(self, job_id):
    """Notifies all jobs waiting for a certain job ID.

    @attention: Do not call until L{CheckAndRegister} returned a status other
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
    @type job_id: int
    @param job_id: Job ID

    """
    assert ht.TJobId(job_id)

    self._lock.acquire()
    try:
      self._RemoveEmptyWaitersUnlocked()

      jobs = self._waiters.pop(job_id, None)
    finally:
      self._lock.release()

    if jobs:
      # Re-add jobs to workerpool
      logging.debug("Re-adding %s jobs which were waiting for job %s",
                    len(jobs), job_id)
      self._enqueue_fn(jobs)


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


def _RequireNonDrainedQueue(fn):
  """Decorator checking for a non-drained queue.

  To be used with functions submitting new jobs.

  """
  def wrapper(self, *args, **kwargs):
    """Wrapper function.

    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    # Needs access to protected member, pylint: disable=W0212
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if not self._accepting_jobs:
      raise errors.JobQueueError("Job queue is shutting down, refusing job")

    return fn(self, *args, **kwargs)

  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared
    # mode and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list
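
  # Sketch of the TODO above (illustrative): with a non-empty node dict,
  #
  #   (name_list, addr_list) = tuple(map(list, zip(*self._nodes.items())))
  #
  # is equivalent, but raises ValueError for an empty dict, which is one
  # reason to keep the explicit two-step version.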

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _NewSerialsUnlocked(self, count):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of int
    @return: a list of job identifiers.

    """
    assert ht.TPositiveInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [jstore.FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)

  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    result = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
                        utils.ListVisibleFiles(archive_path)))

    return result

  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()

    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    for idx, op in enumerate(job.ops):
      # Check priority
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      # Check job dependencies
      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)
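
  # Worked example (illustrative): with previous_job_ids == [10] and this
  # submission getting IDs [11, 12, 13], an opcode in the third job (index 2)
  # depending on [(-1, []), (-3, [])] resolves against [10, 11, 12] to
  # [(12, []), (10, [])] -- relative IDs count backwards from the job itself.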

  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job.id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    assert job is None or not job.writable, "Got writable job"

    if job:
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

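  # Illustrative sketch (hypothetical caller "queue"; not part of the
  # original module): poll for changes and treat the special
  # L{constants.JOB_NOTCHANGED} value as "nothing new yet":
  #   result = queue.WaitForJobChanges(job_id, ["status"], None, 0, 10.0)
  #   if result == constants.JOB_NOTCHANGED:
  #     pass  # timed out without changes; poll again
  #   else:
  #     (job_info, log_entries) = result
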
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())

  def _ModifyJobUnlocked(self, job_id, mod_fn):
    """Modifies a job.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving job object as parameter,
      returning tuple of (status boolean, message string)

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (success, msg) = mod_fn(job)

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive an already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, whether we succeeded or failed in
    # renaming the files, we update the cached queue size from the filesystem.
    # When we get around to fixing the TODO above, we can use the number of
    # actually archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum amount of time to spend archiving, in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

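  # Illustrative sketch (hypothetical caller "queue"; not part of the
  # original module): archive all finalized jobs older than a week, spending
  # at most 30 seconds on the operation:
  #   (archived, remaining) = queue.AutoArchiveJobs(7 * 24 * 3600, 30)
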
  def _Query(self, fields, qfilter):
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced,
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

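  # Illustrative sketch (hypothetical caller "queue"; not part of the
  # original module): request the ID and status of two specific jobs using
  # a query2 filter:
  #   response = queue.QueryJobs(["id", "status"],
  #                              qlang.MakeSimpleFilter("id", [17, 18]))
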
  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
      the requested fields

    """
    # Guard against None, which the docstring allows for "all jobs"; job IDs
    # may arrive as strings and are normalized to integers.
    if job_ids is not None:
      job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)

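  # Illustrative sketch (hypothetical caller "queue"; not part of the
  # original module): old-style query returning one row per job, each row
  # listing the requested fields in order:
  #   rows = queue.OldStyleQueryJobs([17, 18], ["id", "status"])
  #   # e.g. rows == [[17, "success"], [18, "running"]]
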
  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there
    are any jobs currently running. If the latter is the case, the job queue
    is not yet ready for shutdown. Once this function returns C{False},
    L{Shutdown} can be called without interfering with any job. Queued and
    unfinished jobs will be resumed the next time the queue is started.

    Once this function has been called no new job submissions will be
    accepted (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

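  # Illustrative sketch (hypothetical caller; not part of the original
  # module): a daemon would typically poll until no jobs are running before
  # the final shutdown:
  #   while queue.PrepareShutdown():
  #     time.sleep(1)  # jobs still running, check again shortly
  #   queue.Shutdown()
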
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None