4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs
40 # pylint: disable=E0611
41 from pyinotify import pyinotify
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import runtime
57 from ganeti import netutils
58 from ganeti import compat
60 from ganeti import query
61 from ganeti import qlang
62 from ganeti import pathutils
63 from ganeti import vcluster
# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"
73 class CancelJob(Exception):
74 """Special exception to cancel a job.
79 class QueueShutdown(Exception):
80 """Special exception to abort a job when the job queue is shutting down.
86 """Returns the current timestamp.
89 @return: the current time in the (seconds, microseconds) format
92 return utils.SplitTime(time.time())
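# Illustrative example (values assumed, not from the original module):
# the result mirrors utils.SplitTime, i.e. (whole_seconds, microseconds).
#
#   TimeStampNow()  # -> e.g. (1230768000, 500000)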
95 def _CallJqUpdate(runner, names, file_name, content):
96 """Updates job queue file after virtualizing filename.
99 virt_file_name = vcluster.MakeVirtualPath(file_name)
100 return runner.call_jobqueue_update(names, virt_file_name, content)
103 class _SimpleJobQuery:
104 """Wrapper for job queries.
The instance keeps the list of fields cached, which is useful e.g. in
L{_JobChangesChecker}.
109 def __init__(self, fields):
110 """Initializes this class.
113 self._query = query.Query(query.JOB_FIELDS, fields)
115 def __call__(self, job):
116 """Executes a job query using cached field list.
119 return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
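# Illustrative usage (a sketch; "id" and "status" are assumed to be
# valid entries in query.JOB_FIELDS):
#
#   job_query = _SimpleJobQuery(["id", "status"])
#   (job_id, job_status) = job_query(job)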
122 class _QueuedOpCode(object):
123 """Encapsulates an opcode object.
125 @ivar log: holds the execution log and consists of tuples
126 of the form C{(log_serial, timestamp, level, message)}
127 @ivar input: the OpCode we encapsulate
128 @ivar status: the current status
129 @ivar result: the result of the LU execution
130 @ivar start_timestamp: timestamp for the start of the execution
131 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
132 @ivar stop_timestamp: timestamp for the end of the execution
135 __slots__ = ["input", "status", "result", "log", "priority",
136 "start_timestamp", "exec_timestamp", "end_timestamp",
139 def __init__(self, op):
140 """Initializes instances of this class.
142 @type op: L{opcodes.OpCode}
143 @param op: the opcode we encapsulate
self.input = op
self.status = constants.OP_STATUS_QUEUED
self.result = None
self.log = []
150 self.start_timestamp = None
151 self.exec_timestamp = None
152 self.end_timestamp = None
154 # Get initial priority (it might change during the lifetime of this opcode)
155 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
@classmethod
def Restore(cls, state):
159 """Restore the _QueuedOpCode from the serialized form.
162 @param state: the serialized state
163 @rtype: _QueuedOpCode
164 @return: a new _QueuedOpCode instance
167 obj = _QueuedOpCode.__new__(cls)
168 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
169 obj.status = state["status"]
170 obj.result = state["result"]
171 obj.log = state["log"]
172 obj.start_timestamp = state.get("start_timestamp", None)
173 obj.exec_timestamp = state.get("exec_timestamp", None)
174 obj.end_timestamp = state.get("end_timestamp", None)
obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
return obj

def Serialize(self):
  """Serializes this _QueuedOpCode.
182 @return: the dictionary holding the serialized state
186 "input": self.input.__getstate__(),
187 "status": self.status,
188 "result": self.result,
190 "start_timestamp": self.start_timestamp,
191 "exec_timestamp": self.exec_timestamp,
192 "end_timestamp": self.end_timestamp,
193 "priority": self.priority,
197 class _QueuedJob(object):
198 """In-memory job representation.
200 This is what we use to track the user-submitted jobs. Locking must
201 be taken care of by users of this class.
203 @type queue: L{JobQueue}
204 @ivar queue: the parent queue
207 @ivar ops: the list of _QueuedOpCode that constitute the job
208 @type log_serial: int
209 @ivar log_serial: holds the index for the next log entry
210 @ivar received_timestamp: the timestamp for when the job was received
@ivar start_timestamp: the timestamp for start of execution
212 @ivar end_timestamp: the timestamp for end of execution
213 @ivar writable: Whether the job is allowed to be modified
216 # pylint: disable=W0212
217 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
218 "received_timestamp", "start_timestamp", "end_timestamp",
219 "__weakref__", "processor_lock", "writable", "archived"]
221 def __init__(self, queue, job_id, ops, writable):
222 """Constructor for the _QueuedJob.
224 @type queue: L{JobQueue}
225 @param queue: our parent queue
227 @param job_id: our job id
229 @param ops: the list of opcodes we hold, which will be encapsulated
232 @param writable: Whether job can be modified
236 raise errors.GenericError("A job needs at least one opcode")
239 self.id = int(job_id)
240 self.ops = [_QueuedOpCode(op) for op in ops]
242 self.received_timestamp = TimeStampNow()
243 self.start_timestamp = None
244 self.end_timestamp = None
245 self.archived = False
247 self._InitInMemory(self, writable)
249 assert not self.archived, "New jobs can not be marked as archived"
@staticmethod
def _InitInMemory(obj, writable):
253 """Initializes in-memory variables.
256 obj.writable = writable
# Read-only jobs are not processed and therefore don't need a lock
if writable:
  obj.processor_lock = threading.Lock()
else:
  obj.processor_lock = None
267 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
269 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
271 return "<%s at %#x>" % (" ".join(status), id(self))
@classmethod
def Restore(cls, queue, state, writable, archived):
  """Restore a _QueuedJob from serialized state.
277 @type queue: L{JobQueue}
278 @param queue: to which queue the restored job belongs
280 @param state: the serialized state
282 @param writable: Whether job can be modified
284 @param archived: Whether job was already archived
@return: the restored _QueuedJob instance
289 obj = _QueuedJob.__new__(cls)
291 obj.id = int(state["id"])
292 obj.received_timestamp = state.get("received_timestamp", None)
293 obj.start_timestamp = state.get("start_timestamp", None)
294 obj.end_timestamp = state.get("end_timestamp", None)
295 obj.archived = archived
299 for op_state in state["ops"]:
300 op = _QueuedOpCode.Restore(op_state)
301 for log_entry in op.log:
302 obj.log_serial = max(obj.log_serial, log_entry[0])
305 cls._InitInMemory(obj, writable)
310 """Serialize the _JobQueue instance.
313 @return: the serialized state
318 "ops": [op.Serialize() for op in self.ops],
319 "start_timestamp": self.start_timestamp,
320 "end_timestamp": self.end_timestamp,
321 "received_timestamp": self.received_timestamp,
324 def CalcStatus(self):
325 """Compute the status of this job.
327 This function iterates over all the _QueuedOpCodes in the job and
328 based on their status, computes the job status.
- if we find a canceled opcode, or one finished with an error, the
  job status will be the same
- otherwise, the last opcode with the status one of:
    - waiting
    - canceling
    - running
  will determine the job status
- otherwise, it means either all opcodes are queued, or success,
  and the job status will be the same
343 @return: the job status
status = constants.JOB_STATUS_QUEUED

all_success = True
for op in self.ops:
  if op.status == constants.OP_STATUS_SUCCESS:
    continue

  all_success = False

  if op.status == constants.OP_STATUS_QUEUED:
    pass
  elif op.status == constants.OP_STATUS_WAITING:
    status = constants.JOB_STATUS_WAITING
  elif op.status == constants.OP_STATUS_RUNNING:
    status = constants.JOB_STATUS_RUNNING
  elif op.status == constants.OP_STATUS_CANCELING:
    status = constants.JOB_STATUS_CANCELING
    break
  elif op.status == constants.OP_STATUS_ERROR:
    status = constants.JOB_STATUS_ERROR
    # The whole job fails if one opcode failed
    break
  elif op.status == constants.OP_STATUS_CANCELED:
    status = constants.JOB_STATUS_CANCELED
    break

if all_success:
  status = constants.JOB_STATUS_SUCCESS

return status
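# Illustrative status derivations (assumed opcode status lists, not
# taken from the original module):
#   [success, running, queued]  -> JOB_STATUS_RUNNING
#   [success, error, queued]    -> JOB_STATUS_ERROR (one failure fails the job)
#   [success, success, success] -> JOB_STATUS_SUCCESS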
377 def CalcPriority(self):
378 """Gets the current priority for this job.
Only unfinished opcodes are considered. When all are done, the default
priority is used.
386 priorities = [op.priority for op in self.ops
387 if op.status not in constants.OPS_FINALIZED]
if not priorities:
  # All opcodes are done, assume default priority
  return constants.OP_PRIO_DEFAULT
393 return min(priorities)
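# Illustrative example (assumed priorities; numerically lower means
# higher priority): with pending opcodes at priorities 0 and -20,
# min() yields -20, so the job runs at the highest pending priority.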
395 def GetLogEntries(self, newer_than):
396 """Selectively returns the log entries.
398 @type newer_than: None or int
@param newer_than: if this is None, return all log entries,
    otherwise return only the log entries with a serial higher
    than this value
403 @return: the list of the log entries selected
if newer_than is None:
  serial = -1
else:
  serial = newer_than

entries = []
for op in self.ops:
  entries.extend(filter(lambda entry: entry[0] > serial, op.log))

return entries
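# Illustrative usage (hypothetical serials): if a client already saw
# entries up to serial 5, GetLogEntries(5) returns only entries whose
# serial is 6 or higher; GetLogEntries(None) returns all of them.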
417 def GetInfo(self, fields):
418 """Returns information about a job.
421 @param fields: names of fields to return
423 @return: list with one element for each field
@raise errors.OpExecError: when an invalid field is passed
428 return _SimpleJobQuery(fields)(self)
430 def MarkUnfinishedOps(self, status, result):
431 """Mark unfinished opcodes with a given status and result.
This is a utility function for marking all running or
waiting-to-be-run opcodes with a given status. Opcodes which are
already finalised are not changed.
437 @param status: a given opcode status
438 @param result: the opcode result
443 if op.status in constants.OPS_FINALIZED:
444 assert not_marked, "Finalized opcodes found after non-finalized ones"
451 """Marks the job as finalized.
454 self.end_timestamp = TimeStampNow()
457 """Marks job as canceled/-ing if possible.
459 @rtype: tuple; (bool, string)
460 @return: Boolean describing whether job was successfully canceled or marked
461 as canceling and a text message
464 status = self.CalcStatus()
466 if status == constants.JOB_STATUS_QUEUED:
467 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
468 "Job canceled by request")
470 return (True, "Job %s canceled" % self.id)
472 elif status == constants.JOB_STATUS_WAITING:
473 # The worker will notice the new status and cancel the job
474 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
475 return (True, "Job %s will be canceled" % self.id)
478 logging.debug("Job %s is no longer waiting in the queue", self.id)
479 return (False, "Job %s is no longer waiting in the queue" % self.id)
482 class _OpExecCallbacks(mcpu.OpExecCbBase):
483 def __init__(self, queue, job, op):
484 """Initializes this class.
486 @type queue: L{JobQueue}
487 @param queue: Job queue
488 @type job: L{_QueuedJob}
489 @param job: Job object
490 @type op: L{_QueuedOpCode}
494 assert queue, "Queue is missing"
495 assert job, "Job is missing"
496 assert op, "Opcode is missing"
502 def _CheckCancel(self):
503 """Raises an exception to cancel the job if asked to.
506 # Cancel here if we were asked to
507 if self._op.status == constants.OP_STATUS_CANCELING:
508 logging.debug("Canceling opcode")
511 # See if queue is shutting down
512 if not self._queue.AcceptingJobsUnlocked():
513 logging.debug("Queue is shutting down")
514 raise QueueShutdown()
516 @locking.ssynchronized(_QUEUE, shared=1)
517 def NotifyStart(self):
518 """Mark the opcode as running, not lock-waiting.
520 This is called from the mcpu code as a notifier function, when the LU is
521 finally about to start the Exec() method. Of course, to have end-user
522 visible results, the opcode must be initially (before calling into
523 Processor.ExecOpCode) set to OP_STATUS_WAITING.
526 assert self._op in self._job.ops
527 assert self._op.status in (constants.OP_STATUS_WAITING,
528 constants.OP_STATUS_CANCELING)
# Cancel here if we were asked to
self._CheckCancel()
533 logging.debug("Opcode is now running")
535 self._op.status = constants.OP_STATUS_RUNNING
536 self._op.exec_timestamp = TimeStampNow()
538 # And finally replicate the job status
539 self._queue.UpdateJobUnlocked(self._job)
541 @locking.ssynchronized(_QUEUE, shared=1)
542 def _AppendFeedback(self, timestamp, log_type, log_msg):
543 """Internal feedback append function, with locks
546 self._job.log_serial += 1
547 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
548 self._queue.UpdateJobUnlocked(self._job, replicate=False)
550 def Feedback(self, *args):
551 """Append a log entry.
if len(args) == 1:
  log_type = constants.ELOG_MESSAGE
  log_msg = args[0]
else:
  (log_type, log_msg) = args
# The time is split to make serialization easier and not lose
# precision.
564 timestamp = utils.SplitTime(time.time())
565 self._AppendFeedback(timestamp, log_type, log_msg)
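# Illustrative log entry as appended to op.log (hypothetical values):
#   (42, (1230768000, 500000), constants.ELOG_MESSAGE, "creating disks")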
567 def CurrentPriority(self):
568 """Returns current priority for opcode.
571 assert self._op.status in (constants.OP_STATUS_WAITING,
572 constants.OP_STATUS_CANCELING)
# Cancel here if we were asked to
self._CheckCancel()
577 return self._op.priority
579 def SubmitManyJobs(self, jobs):
580 """Submits jobs for processing.
582 See L{JobQueue.SubmitManyJobs}.
585 # Locking is done in job queue
586 return self._queue.SubmitManyJobs(jobs)
589 class _JobChangesChecker(object):
590 def __init__(self, fields, prev_job_info, prev_log_serial):
591 """Initializes this class.
593 @type fields: list of strings
594 @param fields: Fields requested by LUXI client
@type prev_job_info: list or None
@param prev_job_info: previous job info, as passed by the LUXI client
@type prev_log_serial: int or None
@param prev_log_serial: previous job log serial, as passed by the LUXI client
601 self._squery = _SimpleJobQuery(fields)
602 self._prev_job_info = prev_job_info
603 self._prev_log_serial = prev_log_serial
605 def __call__(self, job):
606 """Checks whether job has changed.
608 @type job: L{_QueuedJob}
609 @param job: Job object
612 assert not job.writable, "Expected read-only job"
614 status = job.CalcStatus()
615 job_info = self._squery(job)
616 log_entries = job.GetLogEntries(self._prev_log_serial)
618 # Serializing and deserializing data can cause type changes (e.g. from
619 # tuple to list) or precision loss. We're doing it here so that we get
620 # the same modifications as the data received from the client. Without
621 # this, the comparison afterwards might fail without the data being
622 # significantly different.
623 # TODO: we just deserialized from disk, investigate how to make sure that
624 # the job info and log entries are compatible to avoid this further step.
625 # TODO: Doing something like in testutils.py:UnifyValueType might be more
626 # efficient, though floats will be tricky
627 job_info = serializer.LoadJson(serializer.DumpJson(job_info))
628 log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
# Don't even try to wait if the job is no longer running, there will be
# no changes.
632 if (status not in (constants.JOB_STATUS_QUEUED,
633 constants.JOB_STATUS_RUNNING,
634 constants.JOB_STATUS_WAITING) or
635 job_info != self._prev_job_info or
636 (log_entries and self._prev_log_serial != log_entries[0][0])):
637 logging.debug("Job %s changed", job.id)
638 return (job_info, log_entries)
643 class _JobFileChangesWaiter(object):
644 def __init__(self, filename):
645 """Initializes this class.
647 @type filename: string
648 @param filename: Path to job file
@raises errors.InotifyError: if the notifier cannot be set up
652 self._wm = pyinotify.WatchManager()
653 self._inotify_handler = \
654 asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
self._notifier = \
  pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
658 self._inotify_handler.enable()
660 # pyinotify doesn't close file descriptors automatically
661 self._notifier.stop()
664 def _OnInotify(self, notifier_enabled):
665 """Callback for inotify.
668 if not notifier_enabled:
669 self._inotify_handler.enable()
671 def Wait(self, timeout):
672 """Waits for the job file to change.
675 @param timeout: Timeout in seconds
676 @return: Whether there have been events
680 have_events = self._notifier.check_events(timeout * 1000)
if have_events:
  self._notifier.read_events()
self._notifier.process_events()
return have_events
687 """Closes underlying notifier and its file descriptor.
690 self._notifier.stop()
693 class _JobChangesWaiter(object):
694 def __init__(self, filename):
695 """Initializes this class.
697 @type filename: string
698 @param filename: Path to job file
701 self._filewaiter = None
702 self._filename = filename
704 def Wait(self, timeout):
705 """Waits for a job to change.
708 @param timeout: Timeout in seconds
709 @return: Whether there have been events
if self._filewaiter:
  return self._filewaiter.Wait(timeout)
715 # Lazy setup: Avoid inotify setup cost when job file has already changed.
716 # If this point is reached, return immediately and let caller check the job
# file again in case there were changes since the last check. This avoids a
# race condition.
self._filewaiter = _JobFileChangesWaiter(self._filename)

return True
724 """Closes underlying waiter.
if self._filewaiter:
  self._filewaiter.Close()
731 class _WaitForJobChangesHelper(object):
732 """Helper class using inotify to wait for changes in a job file.
734 This class takes a previous job status and serial, and alerts the client when
735 the current job status has changed.
@staticmethod
def _CheckForChanges(counter, job_load_fn, check_fn):
740 if counter.next() > 0:
741 # If this isn't the first check the job is given some more time to change
# again. This gives better performance for jobs generating many
# changes/messages.
748 raise errors.JobLost()
750 result = check_fn(job)
752 raise utils.RetryAgain()
756 def __call__(self, filename, job_load_fn,
757 fields, prev_job_info, prev_log_serial, timeout):
758 """Waits for changes on a job.
760 @type filename: string
761 @param filename: File on which to wait for changes
762 @type job_load_fn: callable
763 @param job_load_fn: Function to load job
764 @type fields: list of strings
765 @param fields: Which fields to check for changes
766 @type prev_job_info: list or None
767 @param prev_job_info: Last job information returned
768 @type prev_log_serial: int
769 @param prev_log_serial: Last job message serial number
771 @param timeout: maximum time to wait in seconds
774 counter = itertools.count()
776 check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
777 waiter = _JobChangesWaiter(filename)
779 return utils.Retry(compat.partial(self._CheckForChanges,
780 counter, job_load_fn, check_fn),
781 utils.RETRY_REMAINING_TIME, timeout,
785 except (errors.InotifyError, errors.JobLost):
787 except utils.RetryTimeout:
788 return constants.JOB_NOTCHANGED
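# Illustrative call (a sketch with hypothetical arguments): wait up to
# 10 seconds for job 123 to change relative to the last seen state.
#
#   helper = _WaitForJobChangesHelper()
#   result = helper("/var/lib/ganeti/queue/job-123", load_fn,
#                   ["status"], prev_job_info, prev_log_serial, 10.0)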
791 def _EncodeOpError(err):
792 """Encodes an error which occurred while processing an opcode.
if isinstance(err, errors.GenericError):
  to_encode = err
else:
  to_encode = errors.OpExecError(str(err))
800 return errors.EncodeException(to_encode)
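# Illustrative behaviour: a plain exception such as ValueError("x") is
# wrapped in errors.OpExecError before encoding, while instances of
# errors.GenericError (and subclasses) are encoded as-is.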
803 class _TimeoutStrategyWrapper:
804 def __init__(self, fn):
805 """Initializes this class.
812 """Gets the next timeout if necessary.
815 if self._next is None:
816 self._next = self._fn()
819 """Returns the next timeout.
826 """Returns the current timeout and advances the internal state.
835 class _OpExecContext:
836 def __init__(self, op, index, log_prefix, timeout_strategy_factory):
837 """Initializes this class.
842 self.log_prefix = log_prefix
843 self.summary = op.input.Summary()
845 # Create local copy to modify
846 if getattr(op.input, opcodes.DEPEND_ATTR, None):
847 self.jobdeps = op.input.depends[:]
851 self._timeout_strategy_factory = timeout_strategy_factory
852 self._ResetTimeoutStrategy()
854 def _ResetTimeoutStrategy(self):
855 """Creates a new timeout strategy.
858 self._timeout_strategy = \
859 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
861 def CheckPriorityIncrease(self):
862 """Checks whether priority can and should be increased.
864 Called when locks couldn't be acquired.
869 # Exhausted all retries and next round should not use blocking acquire
871 if (self._timeout_strategy.Peek() is None and
872 op.priority > constants.OP_PRIO_HIGHEST):
873 logging.debug("Increasing priority")
875 self._ResetTimeoutStrategy()
880 def GetNextLockTimeout(self):
881 """Returns the next lock acquire timeout.
884 return self._timeout_strategy.Next()
887 class _JobProcessor(object):
(DEFER,
 WAITDEP,
 FINISHED) = range(1, 4)
892 def __init__(self, queue, opexec_fn, job,
893 _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
894 """Initializes this class.
898 self.opexec_fn = opexec_fn
900 self._timeout_strategy_factory = _timeout_strategy_factory
@staticmethod
def _FindNextOpcode(job, timeout_strategy_factory):
904 """Locates the next opcode to run.
906 @type job: L{_QueuedJob}
907 @param job: Job object
908 @param timeout_strategy_factory: Callable to create new timeout strategy
# Create some sort of a cache to speed up locating the next opcode for
# future lookups.
913 # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
914 # pending and one for processed ops.
915 if job.ops_iter is None:
916 job.ops_iter = enumerate(job.ops)
918 # Find next opcode to run
while True:
  try:
    (idx, op) = job.ops_iter.next()
922 except StopIteration:
923 raise errors.ProgrammerError("Called for a finished job")
925 if op.status == constants.OP_STATUS_RUNNING:
926 # Found an opcode already marked as running
927 raise errors.ProgrammerError("Called for job marked as running")
929 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
930 timeout_strategy_factory)
932 if op.status not in constants.OPS_FINALIZED:
935 # This is a job that was partially completed before master daemon
936 # shutdown, so it can be expected that some opcodes are already
937 # completed successfully (if any did error out, then the whole job
938 # should have been aborted and not resubmitted for processing).
939 logging.info("%s: opcode %s already processed, skipping",
940 opctx.log_prefix, opctx.summary)
@staticmethod
def _MarkWaitlock(job, op):
944 """Marks an opcode as waiting for locks.
946 The job's start timestamp is also set if necessary.
948 @type job: L{_QueuedJob}
949 @param job: Job object
950 @type op: L{_QueuedOpCode}
951 @param op: Opcode object
955 assert op.status in (constants.OP_STATUS_QUEUED,
956 constants.OP_STATUS_WAITING)
962 if op.status == constants.OP_STATUS_QUEUED:
963 op.status = constants.OP_STATUS_WAITING
966 if op.start_timestamp is None:
967 op.start_timestamp = TimeStampNow()
970 if job.start_timestamp is None:
971 job.start_timestamp = op.start_timestamp
974 assert op.status == constants.OP_STATUS_WAITING
@staticmethod
def _CheckDependencies(queue, job, opctx):
980 """Checks if an opcode has dependencies and if so, processes them.
982 @type queue: L{JobQueue}
983 @param queue: Queue object
984 @type job: L{_QueuedJob}
985 @param job: Job object
986 @type opctx: L{_OpExecContext}
987 @param opctx: Opcode execution context
989 @return: Whether opcode will be re-scheduled by dependency tracker
997 (dep_job_id, dep_status) = opctx.jobdeps[0]
999 (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1001 assert ht.TNonEmptyString(depmsg), "No dependency message"
1003 logging.info("%s: %s", opctx.log_prefix, depmsg)
1005 if depresult == _JobDependencyManager.CONTINUE:
1006 # Remove dependency and continue
1007 opctx.jobdeps.pop(0)
1009 elif depresult == _JobDependencyManager.WAIT:
1010 # Need to wait for notification, dependency tracker will re-add job
1015 elif depresult == _JobDependencyManager.CANCEL:
1016 # Job was cancelled, cancel this job as well
1018 assert op.status == constants.OP_STATUS_CANCELING
1021 elif depresult in (_JobDependencyManager.WRONGSTATUS,
1022 _JobDependencyManager.ERROR):
1023 # Job failed or there was an error, this job must fail
1024 op.status = constants.OP_STATUS_ERROR
1025 op.result = _EncodeOpError(errors.OpExecError(depmsg))
1029 raise errors.ProgrammerError("Unknown dependency result '%s'" %
1034 def _ExecOpCodeUnlocked(self, opctx):
1035 """Processes one opcode and returns the result.
1040 assert op.status == constants.OP_STATUS_WAITING
1042 timeout = opctx.GetNextLockTimeout()
try:
  # Make sure not to hold queue lock while calling ExecOpCode
  result = self.opexec_fn(op.input,
                          _OpExecCallbacks(self.queue, self.job, op),
                          timeout=timeout)
1049 except mcpu.LockAcquireTimeout:
1050 assert timeout is not None, "Received timeout for blocking acquire"
1051 logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1053 assert op.status in (constants.OP_STATUS_WAITING,
1054 constants.OP_STATUS_CANCELING)
1056 # Was job cancelled while we were waiting for the lock?
1057 if op.status == constants.OP_STATUS_CANCELING:
1058 return (constants.OP_STATUS_CANCELING, None)
1060 # Queue is shutting down, return to queued
1061 if not self.queue.AcceptingJobsUnlocked():
1062 return (constants.OP_STATUS_QUEUED, None)
1064 # Stay in waitlock while trying to re-acquire lock
1065 return (constants.OP_STATUS_WAITING, None)
1067 logging.exception("%s: Canceling job", opctx.log_prefix)
1068 assert op.status == constants.OP_STATUS_CANCELING
1069 return (constants.OP_STATUS_CANCELING, None)
1071 except QueueShutdown:
1072 logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1074 assert op.status == constants.OP_STATUS_WAITING
1076 # Job hadn't been started yet, so it should return to the queue
1077 return (constants.OP_STATUS_QUEUED, None)
1079 except Exception, err: # pylint: disable=W0703
1080 logging.exception("%s: Caught exception in %s",
1081 opctx.log_prefix, opctx.summary)
1082 return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1084 logging.debug("%s: %s successful",
1085 opctx.log_prefix, opctx.summary)
1086 return (constants.OP_STATUS_SUCCESS, result)
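# Summary of the (status, result) pairs returned above (illustrative):
#   lock timeout, queue shutting down -> (OP_STATUS_QUEUED, None)
#   lock timeout, job being canceled  -> (OP_STATUS_CANCELING, None)
#   lock timeout, retrying            -> (OP_STATUS_WAITING, None)
#   opcode raised an exception        -> (OP_STATUS_ERROR, encoded error)
#   successful execution              -> (OP_STATUS_SUCCESS, result)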
1088 def __call__(self, _nextop_fn=None):
1089 """Continues execution of a job.
1091 @param _nextop_fn: Callback function for tests
1092 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1093 be deferred and C{WAITDEP} if the dependency manager
1094 (L{_JobDependencyManager}) will re-schedule the job when appropriate
1100 logging.debug("Processing job %s", job.id)
1102 queue.acquire(shared=1)
1104 opcount = len(job.ops)
1106 assert job.writable, "Expected writable job"
1108 # Don't do anything for finalized jobs
1109 if job.CalcStatus() in constants.JOBS_FINALIZED:
1110 return self.FINISHED
1112 # Is a previous opcode still pending?
1114 opctx = job.cur_opctx
1115 job.cur_opctx = None
1117 if __debug__ and _nextop_fn:
1119 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1124 assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1125 constants.OP_STATUS_CANCELING)
1126 for i in job.ops[opctx.index + 1:])
1128 assert op.status in (constants.OP_STATUS_QUEUED,
1129 constants.OP_STATUS_WAITING,
1130 constants.OP_STATUS_CANCELING)
1132 assert (op.priority <= constants.OP_PRIO_LOWEST and
1133 op.priority >= constants.OP_PRIO_HIGHEST)
1137 if op.status != constants.OP_STATUS_CANCELING:
1138 assert op.status in (constants.OP_STATUS_QUEUED,
1139 constants.OP_STATUS_WAITING)
1141 # Prepare to start opcode
1142 if self._MarkWaitlock(job, op):
1144 queue.UpdateJobUnlocked(job)
1146 assert op.status == constants.OP_STATUS_WAITING
1147 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1148 assert job.start_timestamp and op.start_timestamp
1149 assert waitjob is None
1151 # Check if waiting for a job is necessary
1152 waitjob = self._CheckDependencies(queue, job, opctx)
1154 assert op.status in (constants.OP_STATUS_WAITING,
1155 constants.OP_STATUS_CANCELING,
1156 constants.OP_STATUS_ERROR)
1158 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1159 constants.OP_STATUS_ERROR)):
1160 logging.info("%s: opcode %s waiting for locks",
1161 opctx.log_prefix, opctx.summary)
1163 assert not opctx.jobdeps, "Not all dependencies were removed"
1167 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1169 queue.acquire(shared=1)
1171 op.status = op_status
1172 op.result = op_result
1176 if op.status in (constants.OP_STATUS_WAITING,
1177 constants.OP_STATUS_QUEUED):
1178 # waiting: Couldn't get locks in time
1179 # queued: Queue is shutting down
1180 assert not op.end_timestamp
1183 op.end_timestamp = TimeStampNow()
1185 if op.status == constants.OP_STATUS_CANCELING:
1186 assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1187 for i in job.ops[opctx.index:])
1189 assert op.status in constants.OPS_FINALIZED
1191 if op.status == constants.OP_STATUS_QUEUED:
1192 # Queue is shutting down
1198 job.cur_opctx = None
1200 # In no case must the status be finalized here
1201 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1203 elif op.status == constants.OP_STATUS_WAITING or waitjob:
1206 if not waitjob and opctx.CheckPriorityIncrease():
1207 # Priority was changed, need to update on-disk file
1208 queue.UpdateJobUnlocked(job)
1210 # Keep around for another round
1211 job.cur_opctx = opctx
1213 assert (op.priority <= constants.OP_PRIO_LOWEST and
1214 op.priority >= constants.OP_PRIO_HIGHEST)
1216 # In no case must the status be finalized here
1217 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1220 # Ensure all opcodes so far have been successful
1221 assert (opctx.index == 0 or
1222 compat.all(i.status == constants.OP_STATUS_SUCCESS
1223 for i in job.ops[:opctx.index]))
1226 job.cur_opctx = None
1228 if op.status == constants.OP_STATUS_SUCCESS:
1231 elif op.status == constants.OP_STATUS_ERROR:
1232 # Ensure failed opcode has an exception as its result
1233 assert errors.GetEncodedError(job.ops[opctx.index].result)
1235 to_encode = errors.OpExecError("Preceding opcode failed")
1236 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1237 _EncodeOpError(to_encode))
1241 assert compat.all(i.status == constants.OP_STATUS_ERROR and
1242 errors.GetEncodedError(i.result)
1243 for i in job.ops[opctx.index:])
1245 elif op.status == constants.OP_STATUS_CANCELING:
1246 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1247 "Job canceled by request")
1251 raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1253 if opctx.index == (opcount - 1):
1254 # Finalize on last opcode
1258 # All opcodes have been run, finalize job
1261 # Write to disk. If the job status is final, this is the final write
1262 # allowed. Once the file has been written, it can be archived anytime.
1263 queue.UpdateJobUnlocked(job)
1268 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1269 return self.FINISHED
1271 assert not waitjob or queue.depmgr.JobWaiting(job)
1278 assert job.writable, "Job became read-only while being processed"
1282 def _EvaluateJobProcessorResult(depmgr, job, result):
1283 """Looks at a result from L{_JobProcessor} for a job.
1285 To be used in a L{_JobQueueWorker}.
1288 if result == _JobProcessor.FINISHED:
1289 # Notify waiting jobs
1290 depmgr.NotifyWaiters(job.id)
1292 elif result == _JobProcessor.DEFER:
1294 raise workerpool.DeferTask(priority=job.CalcPriority())
1296 elif result == _JobProcessor.WAITDEP:
1297 # No-op, dependency manager will re-schedule
1301 raise errors.ProgrammerError("Job processor returned unknown status %s" %
1305 class _JobQueueWorker(workerpool.BaseWorker):
1306 """The actual job workers.
1309 def RunTask(self, job): # pylint: disable=W0221
1312 @type job: L{_QueuedJob}
1313 @param job: the job to be processed
1316 assert job.writable, "Expected writable job"
1318 # Ensure only one worker is active on a single job. If a job registers for
1319 # a dependency job, and the other job notifies before the first worker is
1320 # done, the job can end up in the tasklist more than once.
1321 job.processor_lock.acquire()
1323 return self._RunTaskInner(job)
1325 job.processor_lock.release()
1327 def _RunTaskInner(self, job):
1330 Must be called with per-job lock acquired.
1334 assert queue == self.pool.queue
1336 setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1339 proc = mcpu.Processor(queue.context, job.id)
1341 # Create wrapper for setting thread name
1342 wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1345 _EvaluateJobProcessorResult(queue.depmgr, job,
1346 _JobProcessor(queue, wrap_execop_fn, job)())
@staticmethod
def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1350 """Updates the worker thread name to include a short summary of the opcode.
1352 @param setname_fn: Callable setting worker thread name
1353 @param execop_fn: Callable for executing opcode (usually
1354 L{mcpu.Processor.ExecOpCode})
setname_fn(op)
try:
  return execop_fn(op, *args, **kwargs)
finally:
  setname_fn(None)

@staticmethod
def _GetWorkerName(job, op):
  """Returns the worker thread name.
1367 @type job: L{_QueuedJob}
1368 @type op: L{opcodes.OpCode}
1371 parts = ["Job%s" % job.id]
1374 parts.append(op.TinySummary())
1376 return "/".join(parts)
1379 class _JobQueueWorkerPool(workerpool.WorkerPool):
1380 """Simple class implementing a job-processing workerpool.
1383 def __init__(self, queue):
1384 super(_JobQueueWorkerPool, self).__init__("Jq",
1390 class _JobDependencyManager:
1391 """Keeps track of job dependencies.
(WAIT,
 ERROR,
 CANCEL,
 CONTINUE,
 WRONGSTATUS) = range(1, 6)
1400 def __init__(self, getstatus_fn, enqueue_fn):
1401 """Initializes this class.
1404 self._getstatus_fn = getstatus_fn
1405 self._enqueue_fn = enqueue_fn
1408 self._lock = locking.SharedLock("JobDepMgr")
1410 @locking.ssynchronized(_LOCK, shared=1)
1411 def GetLockInfo(self, requested): # pylint: disable=W0613
1412 """Retrieves information about waiting jobs.
1414 @type requested: set
1415 @param requested: Requested information, see C{query.LQ_*}
1418 # No need to sort here, that's being done by the lock manager and query
1419 # library. There are no priorities for notifying jobs, hence all show up as
1420 # one item under "pending".
1421 return [("job/%s" % job_id, None, None,
1422 [("job", [job.id for job in waiters])])
1423 for job_id, waiters in self._waiters.items()
1426 @locking.ssynchronized(_LOCK, shared=1)
1427 def JobWaiting(self, job):
1428 """Checks if a job is waiting.
1431 return compat.any(job in jobs
1432 for jobs in self._waiters.values())
1434 @locking.ssynchronized(_LOCK)
1435 def CheckAndRegister(self, job, dep_job_id, dep_status):
1436 """Checks if a dependency job has the requested status.
1438 If the other job is not yet in a finalized status, the calling job will be
1439 notified (re-added to the workerpool) at a later point.
1441 @type job: L{_QueuedJob}
1442 @param job: Job object
1443 @type dep_job_id: int
1444 @param dep_job_id: ID of dependency job
1445 @type dep_status: list
1446 @param dep_status: Required status
1449 assert ht.TJobId(job.id)
1450 assert ht.TJobId(dep_job_id)
1451 assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1453 if job.id == dep_job_id:
1454 return (self.ERROR, "Job can't depend on itself")
1456 # Get status of dependency job
1458 status = self._getstatus_fn(dep_job_id)
1459 except errors.JobLost, err:
1460 return (self.ERROR, "Dependency error: %s" % err)
1462 assert status in constants.JOB_STATUS_ALL
1464 job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1466 if status not in constants.JOBS_FINALIZED:
1467 # Register for notification and wait for job to finish
1468 job_id_waiters.add(job)
1470 "Need to wait for job %s, wanted status '%s'" %
1471 (dep_job_id, dep_status))
1473 # Remove from waiters list
1474 if job in job_id_waiters:
1475 job_id_waiters.remove(job)
1477 if (status == constants.JOB_STATUS_CANCELED and
1478 constants.JOB_STATUS_CANCELED not in dep_status):
1479 return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1481 elif not dep_status or status in dep_status:
1482 return (self.CONTINUE,
1483 "Dependency job %s finished with status '%s'" %
1484 (dep_job_id, status))
1487 return (self.WRONGSTATUS,
1488 "Dependency job %s finished with status '%s',"
1489 " not one of '%s' as required" %
1490 (dep_job_id, status, utils.CommaJoin(dep_status)))
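# Illustrative outcomes of CheckAndRegister(job, 123,
# [constants.JOB_STATUS_SUCCESS]) for assumed states of job 123:
#   still running         -> (WAIT, "Need to wait for job 123, ...")
#   finished successfully -> (CONTINUE, "Dependency job 123 finished ...")
#   canceled              -> (CANCEL, "Dependency job 123 was cancelled")
#   failed                -> (WRONGSTATUS, "... not one of ... as required")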
1492 def _RemoveEmptyWaitersUnlocked(self):
1493 """Remove all jobs without actual waiters.
for job_id in [job_id for (job_id, waiters) in self._waiters.items()
               if not waiters]:
  del self._waiters[job_id]
1500 def NotifyWaiters(self, job_id):
1501 """Notifies all jobs waiting for a certain job ID.
1503 @attention: Do not call until L{CheckAndRegister} returned a status other
1504 than C{WAITDEP} for C{job_id}, or behaviour is undefined
1506 @param job_id: Job ID
1509 assert ht.TJobId(job_id)
self._lock.acquire()
try:
  self._RemoveEmptyWaitersUnlocked()

  jobs = self._waiters.pop(job_id, None)
finally:
  self._lock.release()
if jobs:
  # Re-add jobs to workerpool
  logging.debug("Re-adding %s jobs which were waiting for job %s",
                len(jobs), job_id)
  self._enqueue_fn(jobs)
1526 def _RequireOpenQueue(fn):
1527 """Decorator for "public" functions.
1529 This function should be used for all 'public' functions. That is,
1530 functions usually called from other classes. Note that this should
1531 be applied only to methods (not plain functions), since it expects
1532 that the decorated function is called with a first argument that has
a '_queue_filelock' attribute.
@warning: Use this decorator only after locking.ssynchronized

Example::
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Example(self):
    pass
1544 def wrapper(self, *args, **kwargs):
1545 # pylint: disable=W0212
1546 assert self._queue_filelock is not None, "Queue should be open"
1547 return fn(self, *args, **kwargs)
1551 def _RequireNonDrainedQueue(fn):
1552 """Decorator checking for a non-drained queue.
1554 To be used with functions submitting new jobs.
1557 def wrapper(self, *args, **kwargs):
1558 """Wrapper function.
1560 @raise errors.JobQueueDrainError: if the job queue is marked for draining
1563 # Ok when sharing the big job queue lock, as the drain file is created when
1564 # the lock is exclusive.
1565 # Needs access to protected member, pylint: disable=W0212
if self._drained:
  raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1569 if not self._accepting_jobs:
1570 raise errors.JobQueueError("Job queue is shutting down, refusing job")
1572 return fn(self, *args, **kwargs)
1576 class JobQueue(object):
1577 """Queue used to manage the jobs.
1580 def __init__(self, context):
1581 """Constructor for JobQueue.
1583 The constructor will initialize the job queue object and then
start loading the current jobs from disk, either for starting them
(if they were queued) or for aborting them (if they were already
dead).
1588 @type context: GanetiContext
1589 @param context: the context object for access to the configuration
1590 data and other ganeti objects
1593 self.context = context
1594 self._memcache = weakref.WeakValueDictionary()
1595 self._my_hostname = netutils.Hostname.GetSysName()
# The Big JobQueue lock. If a code block or method acquires it in shared
# mode, it must guarantee concurrency with all the code acquiring it in
# shared mode, including itself. In order not to acquire it at all,
# concurrency must be guaranteed with all code acquiring it in shared mode
# and all code acquiring it exclusively.
1602 self._lock = locking.SharedLock("JobQueue")
1604 self.acquire = self._lock.acquire
1605 self.release = self._lock.release
1607 # Accept jobs by default
1608 self._accepting_jobs = True
1610 # Initialize the queue, and acquire the filelock.
1611 # This ensures no other process is working on the job queue.
1612 self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1615 self._last_serial = jstore.ReadSerial()
1616 assert self._last_serial is not None, ("Serial file was modified between"
1617 " check in jstore and here")
1619 # Get initial list of nodes
1620 self._nodes = dict((n.name, n.primary_ip)
1621 for n in self.context.cfg.GetAllNodesInfo().values()
1622 if n.master_candidate)
1624 # Remove master node
1625 self._nodes.pop(self._my_hostname, None)
1627 # TODO: Check consistency across nodes
1629 self._queue_size = None
1630 self._UpdateQueueSizeUnlocked()
1631 assert ht.TInt(self._queue_size)
1632 self._drained = jstore.CheckDrainFlag()
1635 self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1637 self.context.glm.AddToLockMonitor(self.depmgr)
1640 self._wpool = _JobQueueWorkerPool(self)
try:
  self._InspectQueue()
except:
  self._wpool.TerminateWorkers()
  raise
1647 @locking.ssynchronized(_LOCK)
1649 def _InspectQueue(self):
1650 """Loads the whole job queue and resumes unfinished jobs.
1652 This function needs the lock here because WorkerPool.AddTask() may start a
1653 job while we're still doing our work.
1656 logging.info("Inspecting job queue")
1660 all_job_ids = self._GetJobIDsUnlocked()
1661 jobs_count = len(all_job_ids)
1662 lastinfo = time.time()
1663 for idx, job_id in enumerate(all_job_ids):
1664 # Give an update every 1000 jobs or 10 seconds
1665 if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1666 idx == (jobs_count - 1)):
1667 logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1668 idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1669 lastinfo = time.time()
1671 job = self._LoadJobUnlocked(job_id)
1673 # a failure in loading the job can cause 'None' to be returned
1677 status = job.CalcStatus()
1679 if status == constants.JOB_STATUS_QUEUED:
1680 restartjobs.append(job)
1682 elif status in (constants.JOB_STATUS_RUNNING,
1683 constants.JOB_STATUS_WAITING,
1684 constants.JOB_STATUS_CANCELING):
1685 logging.warning("Unfinished job %s found: %s", job.id, job)
1687 if status == constants.JOB_STATUS_WAITING:
1689 job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1690 restartjobs.append(job)
else:
  job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                        "Unclean master daemon shutdown")
1696 self.UpdateJobUnlocked(job)
1699 logging.info("Restarting %s jobs", len(restartjobs))
1700 self._EnqueueJobsUnlocked(restartjobs)
1702 logging.info("Job queue inspection finished")
1704 def _GetRpc(self, address_list):
1705 """Gets RPC runner with context.
1708 return rpc.JobQueueRunner(self.context, address_list)
1710 @locking.ssynchronized(_LOCK)
1712 def AddNode(self, node):
1713 """Register a new node with the queue.
1715 @type node: L{objects.Node}
1716 @param node: the node object to be added
1719 node_name = node.name
1720 assert node_name != self._my_hostname
1722 # Clean queue directory on added node
1723 result = self._GetRpc(None).call_jobqueue_purge(node_name)
msg = result.fail_msg
if msg:
  logging.warning("Cannot cleanup queue directory on node %s: %s",
                  node_name, msg)
1729 if not node.master_candidate:
1730 # remove if existing, ignoring errors
1731 self._nodes.pop(node_name, None)
1732 # and skip the replication of the job ids
1735 # Upload the whole queue excluding archived jobs
1736 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1738 # Upload current serial file
1739 files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1741 # Static address list
1742 addrs = [node.primary_ip]
1744 for file_name in files:
1746 content = utils.ReadFile(file_name)
1748 result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
msg = result[node_name].fail_msg
if msg:
  logging.error("Failed to upload file %s to node %s: %s",
                file_name, node_name, msg)
1755 self._nodes[node_name] = node.primary_ip
1757 @locking.ssynchronized(_LOCK)
1759 def RemoveNode(self, node_name):
1760 """Callback called when removing nodes from the cluster.
1762 @type node_name: str
1763 @param node_name: the name of the node to remove
1766 self._nodes.pop(node_name, None)
@staticmethod
def _CheckRpcResult(result, nodes, failmsg):
1770 """Verifies the status of an RPC call.
Since we aim to keep consistency should this node (the current
master) fail, we will log errors if our rpc calls fail, and
especially log the case when more than half of the nodes fail.
1776 @param result: the data as returned from the rpc call
1778 @param nodes: the list of nodes we made the call to
1780 @param failmsg: the identifier to be used for logging
failed = []
success = []

for node in nodes:
  msg = result[node].fail_msg
  if msg:
    failed.append(node)
    logging.error("RPC call %s (%s) failed on node %s: %s",
                  result[node].call, failmsg, node, msg)
  else:
    success.append(node)
1795 # +1 for the master node
1796 if (len(success) + 1) < len(failed):
1797 # TODO: Handle failing nodes
1798 logging.error("More than half of the nodes failed")
1800 def _GetNodeIp(self):
1801 """Helper for returning the node name/ip list.
1803 @rtype: (list, list)
1804 @return: a tuple of two lists, the first one with the node
1805 names and the second one with the node addresses
1808 # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1809 name_list = self._nodes.keys()
1810 addr_list = [self._nodes[name] for name in name_list]
1811 return name_list, addr_list
1813 def _UpdateJobQueueFile(self, file_name, data, replicate):
1814 """Writes a file locally and then replicates it to all nodes.
1816 This function will replace the contents of a file on the local
1817 node and then replicate it to all the other nodes we have.
1819 @type file_name: str
1820 @param file_name: the path of the file to be replicated
1822 @param data: the new contents of the file
1823 @type replicate: boolean
1824 @param replicate: whether to spread the changes to the remote nodes
1827 getents = runtime.GetEnts()
1828 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1829 gid=getents.masterd_gid)
1832 names, addrs = self._GetNodeIp()
1833 result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1834 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1836 def _RenameFilesUnlocked(self, rename):
1837 """Renames a file locally and then replicate the change.
1839 This function will rename a file in the local queue directory
1840 and then replicate this rename to all the other nodes we have.
1842 @type rename: list of (old, new)
1843 @param rename: List containing tuples mapping old to new names
1846 # Rename them locally
1847 for old, new in rename:
1848 utils.RenameFile(old, new, mkdir=True)
1850 # ... and on all nodes
1851 names, addrs = self._GetNodeIp()
1852 result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1853 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1855 def _NewSerialsUnlocked(self, count):
1856 """Generates a new job identifier.
1858 Job identifiers are unique during the lifetime of a cluster.
1860 @type count: integer
1861 @param count: how many serials to return
1863 @return: a list of job identifiers.
1866 assert ht.TPositiveInt(count)
1869 serial = self._last_serial + count
1872 self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
1873 "%s\n" % serial, True)
1875 result = [jstore.FormatJobID(v)
1876 for v in range(self._last_serial + 1, serial + 1)]
1878 # Keep it only if we were able to write the file
1879 self._last_serial = serial
1881 assert len(result) == count
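# Illustrative example (hypothetical serial): with _last_serial == 10,
# _NewSerialsUnlocked(3) writes "13\n" to the serial file, sets
# _last_serial to 13 and returns the job IDs for serials 11, 12 and 13.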
return result

@staticmethod
def _GetJobPath(job_id):
1887 """Returns the job file for a given job id.
1890 @param job_id: the job identifier
1892 @return: the path to the job file
1895 return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
@staticmethod
def _GetArchivedJobPath(job_id):
  """Returns the archived job file for a given job id.
1902 @param job_id: the job identifier
1904 @return: the path to the archived job file
1907 return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1908 jstore.GetArchiveDirectory(job_id),
1912 def _DetermineJobDirectories(archived):
1913 """Build list of directories containing job files.
1915 @type archived: bool
1916 @param archived: Whether to include directories for archived jobs
1920 result = [pathutils.QUEUE_DIR]
1923 archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
1924 result.extend(map(compat.partial(utils.PathJoin, archive_path),
1925 utils.ListVisibleFiles(archive_path)))
return result

@classmethod
def _GetJobIDsUnlocked(cls, sort=True, archived=False):
1931 """Return all known job IDs.
1933 The method only looks at disk because it's a requirement that all
jobs are present on disk (so in the _memcache we don't have any
extra IDs).
1938 @param sort: perform sorting on the returned job ids
1940 @return: the list of job IDs
1945 for path in cls._DetermineJobDirectories(archived):
1946 for filename in utils.ListVisibleFiles(path):
1947 m = constants.JOB_FILE_RE.match(filename)
1949 jlist.append(int(m.group(1)))
1955 def _LoadJobUnlocked(self, job_id):
1956 """Loads a job from the disk or memory.
1958 Given a job id, this will return the cached job object if
1959 existing, or try to load the job from the disk. If loading from
1960 disk, it will also add the job to the cache.
1963 @param job_id: the job id
1964 @rtype: L{_QueuedJob} or None
1965 @return: either None or the job object
job = self._memcache.get(job_id, None)
if job:
  logging.debug("Found job %s in memcache", job_id)
  assert job.writable, "Found read-only job in memcache"
  return job
1975 job = self._LoadJobFromDisk(job_id, False)
1978 except errors.JobFileCorrupted:
1979 old_path = self._GetJobPath(job_id)
1980 new_path = self._GetArchivedJobPath(job_id)
1981 if old_path == new_path:
1982 # job already archived (future case)
1983 logging.exception("Can't parse job %s", job_id)
1986 logging.exception("Can't parse job %s, will archive.", job_id)
1987 self._RenameFilesUnlocked([(old_path, new_path)])
1990 assert job.writable, "Job just loaded is not writable"
1992 self._memcache[job_id] = job
1993 logging.debug("Added job %s to the cache", job_id)
1996 def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1997 """Load the given job file from disk.
1999 Given a job file, read, load and restore it in a _QueuedJob format.
2002 @param job_id: job identifier
2003 @type try_archived: bool
2004 @param try_archived: Whether to try loading an archived job
2005 @rtype: L{_QueuedJob} or None
2006 @return: either None or the job object
2009 path_functions = [(self._GetJobPath, False)]
2012 path_functions.append((self._GetArchivedJobPath, True))
2017 for (fn, archived) in path_functions:
2018 filepath = fn(job_id)
2019 logging.debug("Loading job from %s", filepath)
2021 raw_data = utils.ReadFile(filepath)
2022 except EnvironmentError, err:
2023 if err.errno != errno.ENOENT:
2031 if writable is None:
2032 writable = not archived
2035 data = serializer.LoadJson(raw_data)
2036 job = _QueuedJob.Restore(self, data, writable, archived)
2037 except Exception, err: # pylint: disable=W0703
2038 raise errors.JobFileCorrupted(err)
2042 def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2043 """Load the given job file from disk.
2045 Given a job file, read, load and restore it in a _QueuedJob format.
2046 In case of error reading the job, it gets returned as None, and the
2047 exception is logged.
2050 @param job_id: job identifier
2051 @type try_archived: bool
2052 @param try_archived: Whether to try loading an archived job
2053 @rtype: L{_QueuedJob} or None
2054 @return: either None or the job object
2058 return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2059 except (errors.JobFileCorrupted, EnvironmentError):
2060 logging.exception("Can't load/parse job %s", job_id)
2063 def _UpdateQueueSizeUnlocked(self):
2064 """Update the queue size.
2067 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2069 @locking.ssynchronized(_LOCK)
2071 def SetDrainFlag(self, drain_flag):
2072 """Sets the drain flag for the queue.
2074 @type drain_flag: boolean
2075 @param drain_flag: Whether to set or unset the drain flag
2078 jstore.SetDrainFlag(drain_flag)
2080 self._drained = drain_flag
2085 def _SubmitJobUnlocked(self, job_id, ops):
2086 """Create and store a new job.
2088 This enters the job into our job queue and also puts it on the new
2089 queue, in order for it to be picked up by the queue processors.
2091 @type job_id: job ID
2092 @param job_id: the job ID for the new job
2094 @param ops: The list of OpCodes that will become the new job.
2095 @rtype: L{_QueuedJob}
2096 @return: the job object to be queued
2097 @raise errors.JobQueueFull: if the job queue has too many jobs in it
2098 @raise errors.GenericError: If an opcode is not valid
2101 if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2102 raise errors.JobQueueFull()
2104 job = _QueuedJob(self, job_id, ops, True)
2106 for idx, op in enumerate(job.ops):
2108 if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2109 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2110 raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2111 " are %s" % (idx, op.priority, allowed))
2113 # Check job dependencies
2114 dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
2115 if not opcodes.TNoRelativeJobDependencies(dependencies):
2116 raise errors.GenericError("Opcode %s has invalid dependencies, must"
2118 (idx, opcodes.TNoRelativeJobDependencies,
2122 self.UpdateJobUnlocked(job)
2124 self._queue_size += 1
2126 logging.debug("Adding new job %s to the cache", job_id)
2127 self._memcache[job_id] = job
2131 @locking.ssynchronized(_LOCK)
2133 @_RequireNonDrainedQueue
2134 def SubmitJob(self, ops):
2135 """Create and store a new job.
2137 @see: L{_SubmitJobUnlocked}
2140 (job_id, ) = self._NewSerialsUnlocked(1)
2141 self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2144 @locking.ssynchronized(_LOCK)
2146 @_RequireNonDrainedQueue
2147 def SubmitManyJobs(self, jobs):
2148 """Create and store multiple jobs.
2150 @see: L{_SubmitJobUnlocked}
2153 all_job_ids = self._NewSerialsUnlocked(len(jobs))
2155 (results, added_jobs) = \
2156 self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2158 self._EnqueueJobsUnlocked(added_jobs)
2163 def _FormatSubmitError(msg, ops):
2164 """Formats errors which occurred while submitting a job.
2167 return ("%s; opcodes %s" %
2168 (msg, utils.CommaJoin(op.Summary() for op in ops)))
2171 def _ResolveJobDependencies(resolve_fn, deps):
2172 """Resolves relative job IDs in dependencies.
2174 @type resolve_fn: callable
2175 @param resolve_fn: Function to resolve a relative job ID
2177 @param deps: Dependencies
2178 @rtype: tuple; (boolean, string or list)
2179 @return: If successful (first tuple item), the returned list contains
2180 resolved job IDs along with the requested status; if not successful,
2181 the second element is an error message
2186 for (dep_job_id, dep_status) in deps:
2187 if ht.TRelativeJobId(dep_job_id):
2188 assert ht.TInt(dep_job_id) and dep_job_id < 0
2190 job_id = resolve_fn(dep_job_id)
2193 return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2197 result.append((job_id, dep_status))
2199 return (True, result)
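# Illustrative example (hypothetical batch): when two jobs are submitted
# together and the second depends on (-1, [constants.JOB_STATUS_SUCCESS]),
# the relative ID -1 resolves to the job ID assigned to the first job of
# the same submission.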
2201 def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2202 """Create and store multiple jobs.
2204 @see: L{_SubmitJobUnlocked}
2210 def resolve_fn(job_idx, reljobid):
2212 return (previous_job_ids + job_ids[:job_idx])[reljobid]
2214 for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2216 if getattr(op, opcodes.DEPEND_ATTR, None):
2218 self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2221 # Abort resolving dependencies
2222 assert ht.TNonEmptyString(data), "No error message"
2224 # Use resolved dependencies
2228 job = self._SubmitJobUnlocked(job_id, ops)
2229 except errors.GenericError, err:
2231 data = self._FormatSubmitError(str(err), ops)
2235 added_jobs.append(job)
2237 results.append((status, data))
2239 return (results, added_jobs)
2241 @locking.ssynchronized(_LOCK)
2242 def _EnqueueJobs(self, jobs):
2243 """Helper function to add jobs to worker pool's queue.
2246 @param jobs: List of all jobs
2249 return self._EnqueueJobsUnlocked(jobs)
2251 def _EnqueueJobsUnlocked(self, jobs):
2252 """Helper function to add jobs to worker pool's queue.
2255 @param jobs: List of all jobs
2258 assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2259 self._wpool.AddManyTasks([(job, ) for job in jobs],
2260 priority=[job.CalcPriority() for job in jobs])
2262 def _GetJobStatusForDependencies(self, job_id):
2263 """Gets the status of a job for dependencies.
2266 @param job_id: Job ID
2267 @raise errors.JobLost: If job can't be found
2270 # Not using in-memory cache as doing so would require an exclusive lock
2272 # Try to load from disk
2273 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2275 assert not job.writable, "Got writable job" # pylint: disable=E1101
2278 return job.CalcStatus()
2280 raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)
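
  # Client-side polling sketch (illustrative; assumes a JobQueue instance
  # named "queue" and interest in the "status" field only):
  #
  #   result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                    prev_log_serial, 30.0)
  #   if result == constants.JOB_NOTCHANGED:
  #     pass  # timeout expired with no change; poll again
  #   else:
  #     (prev_info, log_entries) = result
  #     # each log entry starts with its serial number, so the highest one
  #     # seen becomes the next prev_log_serial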

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
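
  # The (success, message) contract, illustrated:
  #
  #   (success, msg) = queue.CancelJob(job_id)
  #
  # As per the docstring above, success is only guaranteed for jobs which
  # have not started running; msg carries the reason when success is False.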

  def _ModifyJobUnlocked(self, job_id, mod_fn):
    """Modifies a job.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving job object as parameter,
      returning tuple of (status boolean, message string)

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (success, msg) = mod_fn(job)

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive an already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, whether the renames succeeded or
    # failed, we update the cached queue size from the filesystem. When we get
    # around to fixing the TODO above, we can use the number of actually
    # archived jobs instead.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time in seconds to spend archiving

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
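
  # Worked example: AutoArchiveJobs(3600 * 24 * 7, 20) spends at most about
  # 20 seconds archiving every finalized job whose relevant timestamp (end,
  # else start, else received) is older than one week. The (archived_count,
  # remaining) return value reports how many jobs were archived and how many
  # job IDs were never even considered before the timeout hit.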

  def _Query(self, fields, qfilter):
    """Returns the query object and job list for the given fields and filter.

    """
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    # Backwards compatibility: job IDs may arrive as strings; None means
    # "all jobs" and must not be iterated over
    if job_ids is not None:
      job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)
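
  # Illustrative call, assuming a JobQueue instance named "queue": asking
  # for two fields of two specific jobs yields one row per job with the
  # field values in the requested order, along the lines of
  #
  #   rows = queue.OldStyleQueryJobs([17, 18], ["id", "status"])
  #   # e.g. [[17, "success"], [18, "running"]]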

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished jobs
    will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()
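
  # Sketch of the intended shutdown sequence (illustrative; the actual
  # caller is the master daemon's shutdown path):
  #
  #   while queue.PrepareShutdown():  # True while jobs are still running
  #     time.sleep(1)                 # wait for the worker pool to drain
  #   queue.Shutdown()                # terminate workers, close the queue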

  def AcceptingJobsUnlocked(self):
    """Returns whether jobs are accepted.

    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
    queue is shutting down.

    @rtype: bool

    """
    return self._accepting_jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None