# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
import logging
import errno
import time
import weakref
import threading
import itertools

# pylint: disable=E0611
from pyinotify import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


77 """Returns the current timestamp.
80 @return: the current time in the (seconds, microseconds) format
83 return utils.SplitTime(time.time())
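# A minimal sketch of the timestamp format used throughout this module. It
# assumes utils.SplitTime splits a float timestamp into whole seconds and
# microseconds (this mirrors the real helper, it is not a replacement):
#
#   def _split_time_sketch(value):
#     seconds = int(value)
#     return (seconds, int((value - seconds) * 1000000))
#
#   _split_time_sketch(1325376000.25) == (1325376000, 250000)

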
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

143 """Serializes this _QueuedOpCode.
146 @return: the dictionary holding the serialized state
150 "input": self.input.__getstate__(),
151 "status": self.status,
152 "result": self.result,
154 "start_timestamp": self.start_timestamp,
155 "exec_timestamp": self.exec_timestamp,
156 "end_timestamp": self.end_timestamp,
157 "priority": self.priority,
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable"]

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job ID
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self._InitInMemory(self, writable)

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

228 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
230 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
232 return "<%s at %#x>" % (" ".join(status), id(self))
  @classmethod
  def Restore(cls, queue, state, writable):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

268 """Serialize the _JobQueue instance.
271 @return: the serialized state
276 "ops": [op.Serialize() for op in self.ops],
277 "start_timestamp": self.start_timestamp,
278 "end_timestamp": self.end_timestamp,
279 "received_timestamp": self.received_timestamp,
  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running
        will determine the job status
      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

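  # Worked example of the rules above: for opcode statuses
  # (success, running, queued) the loop yields JOB_STATUS_RUNNING; a single
  # error or cancellation anywhere breaks out immediately with
  # JOB_STATUS_ERROR/JOB_STATUS_CANCELED; all-queued stays queued and
  # all-success becomes JOB_STATUS_SUCCESS.
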
  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

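  # Sketch: priorities are niceness-style, lower value == more urgent, so
  # min() picks the most urgent pending opcode. Assuming the usual values
  # OP_PRIO_DEFAULT == 0 and OP_PRIO_HIGH == -10:
  #
  #   min([constants.OP_PRIO_DEFAULT, constants.OP_PRIO_HIGH]) == -10
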
  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        is passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "priority":
        row.append(self.CalcPriority())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "oppriority":
        row.append([op.priority for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

449 """Marks job as canceled/-ing if possible.
451 @rtype: tuple; (bool, string)
452 @return: Boolean describing whether job was successfully canceled or marked
453 as canceling and a text message
456 status = self.CalcStatus()
458 if status == constants.JOB_STATUS_QUEUED:
459 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
460 "Job canceled by request")
462 return (True, "Job %s canceled" % self.id)
464 elif status == constants.JOB_STATUS_WAITING:
465 # The worker will notice the new status and cancel the job
466 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
467 return (True, "Job %s will be canceled" % self.id)
470 logging.debug("Job %s is no longer waiting in the queue", self.id)
471 return (False, "Job %s is no longer waiting in the queue" % self.id)
class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = job.GetInfo(self._fields)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


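# Why the JSON round-trip above matters, in one line: tuples become lists,
# so a freshly-restored (1, 2) would otherwise never compare equal to the
# [1, 2] previously sent back over LUXI:
#
#   serializer.LoadJson(serializer.DumpJson((1, 2))) == [1, 2]

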
class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

709 """Closes underlying waiter.
713 self._filewaiter.Close()
class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes, in comparison to blocking updates.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


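# Usage sketch (mirrors JobQueue.WaitForJobChanges below; the path is
# illustrative only and load_fn is a job-loading callable as built there):
#
#   helper = _WaitForJobChangesHelper()
#   result = helper("/var/lib/ganeti/queue/job-1", load_fn,
#                   ["status"], None, 0, 30.0)
#   # result is (job_info, log_entries), None if the job was lost, or
#   # constants.JOB_NOTCHANGED on timeout.

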
def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()


class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @rtype: bool
    @return: Whether job was modified

    """
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

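  # How dependencies are declared on submission (sketch; "depends" is the
  # opcode attribute behind opcodes.DEPEND_ATTR, and negative job IDs are
  # relative to jobs submitted in the same SubmitManyJobs call):
  #
  #   op = opcodes.OpTestDelay(duration=0,
  #                            depends=[(-1, [constants.JOB_STATUS_SUCCESS])])
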
  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITING

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout, priority=op.priority)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)
    except Exception, err: # pylint: disable=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

      if op.status == constants.OP_STATUS_WAITING:
        # Couldn't get locks in time
        assert not op.end_timestamp
      else:
        # Finalize opcode
        op.end_timestamp = TimeStampNow()

        if op.status == constants.OP_STATUS_CANCELING:
          assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                for i in job.ops[opctx.index:])
        else:
          assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()


def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Sets the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up as
    # one item under "pending".
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: string
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TString(job.id)
    assert ht.TString(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))

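  # Outcome sketch (mgr being a _JobDependencyManager instance): a
  # dependency on a still-running job returns WAIT and registers the caller
  # for notification; a later call, after the dependency finished
  # successfully, returns CONTINUE:
  #
  #   (res, msg) = mgr.CheckAndRegister(job, "1234",
  #                                     [constants.JOB_STATUS_SUCCESS])
  #   # res == mgr.WAIT while job 1234 runs, mgr.CONTINUE once it succeeded
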
  def _RemoveEmptyWaitersUnlocked(self):
    """Remove all jobs without actual waiters.

    """
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
                   if not waiters]:
      del self._waiters[job_id]

  def NotifyWaiters(self, job_id):
    """Notifies all jobs waiting for a certain job ID.

    @attention: Do not call until L{CheckAndRegister} returned a status other
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
    @type job_id: string
    @param job_id: Job ID

    """
    assert ht.TString(job_id)

    self._lock.acquire()
    try:
      self._RemoveEmptyWaitersUnlocked()

      jobs = self._waiters.pop(job_id, None)
    finally:
      self._lock.release()

    if jobs:
      # Re-add jobs to workerpool
      logging.debug("Re-adding %s jobs which were waiting for job %s",
                    len(jobs), job_id)
      self._enqueue_fn(jobs)


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


def _RequireNonDrainedQueue(fn):
  """Decorator checking for a non-drained queue.

  To be used with functions submitting new jobs.

  """
  def wrapper(self, *args, **kwargs):
    """Wrapper function.

    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    # Needs access to protected member, pylint: disable=W0212
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if not self._accepting_jobs:
      raise errors.JobQueueError("Job queue is shutting down, refusing job")

    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name,
                                                        content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our RPC calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

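  # Sketch: with JOBS_PER_ARCHIVE_DIRECTORY == 10000, jobs 0..9999 land in
  # directory "0", job 12345 in "1", and so on (Python 2 integer division):
  #
  #   str(int("12345") / JOBS_PER_ARCHIVE_DIRECTORY) == "1"
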
  def _NewSerialsUnlocked(self, count):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers.

    """
    assert ht.TPositiveInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result

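  # Allocation sketch: with _last_serial == 41, _NewSerialsUnlocked(3)
  # writes "44\n" to the serial file and returns ["42", "43", "44"];
  # _last_serial is only advanced after the write succeeded, so a failed
  # replication does not burn job IDs.
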
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  @staticmethod
  def _GetJobIDsUnlocked(sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = constants.JOB_FILE_RE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, True)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, False))

    raw_data = None
    writable_default = None

    for (fn, writable_default) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = writable_default

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    # Change flag locally
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: Resolved dependencies

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)

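  # Resolution sketch: relative IDs index backwards into the job IDs
  # created earlier in the same submission. With previous_job_ids == []
  # and job_ids == ["10", "11", "12"], the job at index 2 resolves a
  # dependency of -1 to "11":
  #
  #   ([] + ["10", "11"])[-1] == "11"
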
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job.id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: string
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    if not isinstance(job_id, basestring):
      job_id = self._FormatJobID(job_id)

    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job" # pylint: disable=E1101
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't cancel read-only job"

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

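  # Age-selection sketch: timestamps are (seconds, microseconds) tuples,
  # so job_age[0] is the whole-seconds part. With age == 3600, a job whose
  # relevant timestamp is older than an hour is archived; age == -1
  # archives every finalized job regardless of its timestamps.
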
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False} L{Shutdown} can
    be called without interfering with any job. Queued and unfinished jobs will
    be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None