# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import logging
import errno
import re
import time
import weakref
import threading

# pylint: disable-msg=E0611
from pyinotify import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
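
# Illustrative note (not part of the original module): every timestamp in
# this file is a (seconds, microseconds) tuple as produced by
# utils.SplitTime(); e.g. a time.time() of 1325462283.415219 would become
# (1325462283, 415219).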


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable"]

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self._InitInMemory(self, writable)

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None
    obj.processor_lock = threading.Lock()

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an
        error, the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status
      - otherwise, it means either all opcodes are queued, or all
        succeeded, and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)
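
  # Note (illustrative, not from the original source): priorities are
  # numeric and lower values mean "more important", so the min() above picks
  # the most urgent pending opcode. E.g. with unfinished opcode priorities
  # [0, -10, 10], CalcPriority() returns -10.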
  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field is passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "priority":
        row.append(self.CalcPriority())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "oppriority":
        row.append([op.priority for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalized are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: a boolean describing whether the job was successfully
        canceled or marked as canceling, and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)
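
  # Hedged usage sketch (mirrors how JobQueue.CancelJob below consumes this
  # method); "job" is a hypothetical writable _QueuedJob:
  #   (success, msg) = job.Cancel()
  #   if success:
  #     logging.info(msg)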


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: the opcode we're executing

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks held.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)
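
  # Illustrative call forms accepted above, inferred from the argument
  # handling ("cb" is a hypothetical _OpExecCallbacks instance):
  #   cb.Feedback("a plain message")                     # log type defaults
  #   cb.Feedback(constants.ELOG_MESSAGE, "a message")   # explicit log type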
  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: list or None
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: int
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = job.GetInfo(self._fields)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be set up

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client
  when the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(job_load_fn, check_fn):
    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
    waiter = _JobChangesWaiter(filename)
    try:
      return utils.Retry(compat.partial(self._CheckForChanges,
                                        job_load_fn, check_fn),
                         utils.RETRY_REMAINING_TIME, timeout,
                         wait_fn=waiter.Wait)
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
    finally:
      waiter.Close()
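
  # Hedged usage sketch (mirrors JobQueue.WaitForJobChanges further below):
  #   helper = _WaitForJobChangesHelper()
  #   result = helper(filename, load_fn, fields, prev_info, prev_serial, 10.0)
  # On changes this returns (job_info, log_entries); if nothing changes
  # before the timeout expires it returns constants.JOB_NOTCHANGED.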


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)
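
# Hedged example of the wrapping above: a plain ValueError("bad") is not a
# GenericError, so it is re-wrapped as errors.OpExecError("bad") before being
# encoded with errors.EncodeException; a GenericError subclass is encoded
# as-is.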


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()


class _JobProcessor(object):
  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @rtype: bool
    @return: Whether the job should be updated on disk

    """
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITLOCK)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITLOCK
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITLOCK

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITLOCK

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout, priority=op.priority)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITLOCK,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITLOCK, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)
    except Exception, err: # pylint: disable-msg=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)
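
  # The method above always returns an (op_status, op_result) pair; a short
  # summary of the possible outcomes (taken from the code above, not
  # exhaustive documentation):
  #   (OP_STATUS_SUCCESS,   result)       - opcode finished successfully
  #   (OP_STATUS_ERROR,     encoded err)  - opcode raised an exception
  #   (OP_STATUS_CANCELING, None)         - job cancelled while waiting
  #   (OP_STATUS_WAITLOCK,  None)         - lock timeout, caller retries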
  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @rtype: bool
    @return: True if job is finished, False if processor needs to be called
             again

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return True

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITLOCK,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITLOCK)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITLOCK
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITLOCK,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

      if op.status == constants.OP_STATUS_WAITLOCK:
        # Couldn't get locks in time
        assert not op.end_timestamp
      else:
        # Finalize opcode
        op.end_timestamp = TimeStampNow()

        if op.status == constants.OP_STATUS_CANCELING:
          assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                for i in job.ops[opctx.index:])
        else:
          assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_WAITLOCK or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          # TODO: Check locking
          queue.depmgr.NotifyWaiters(job.id)
          return True

      assert not waitjob or queue.depmgr.JobWaiting(job)

      return bool(waitjob)
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    if not _JobProcessor(queue, wrap_execop_fn, job)():
      # Schedule again
      raise workerpool.DeferTask(priority=job.CalcPriority())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Returns the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}
    @rtype: string

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  # TODO: Export waiter information to lock monitor

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: string
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TString(job.id)
    assert ht.TString(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))
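
  # Hedged usage sketch (mirrors _JobProcessor._CheckDependencies above):
  #   (depresult, depmsg) = depmgr.CheckAndRegister(job, dep_job_id, [])
  #   if depresult == _JobDependencyManager.CONTINUE:
  #     pass  # dependency finished with an acceptable status
  #   elif depresult == _JobDependencyManager.WAIT:
  #     pass  # job was registered as a waiter and will be re-queued later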
  @locking.ssynchronized(_LOCK)
  def NotifyWaiters(self, job_id):
    """Notifies all jobs waiting for a certain job ID.

    @type job_id: string
    @param job_id: Job ID

    """
    assert ht.TString(job_id)

    jobs = self._waiters.pop(job_id, None)
    if jobs:
      # Re-add jobs to workerpool
      logging.debug("Re-adding %s jobs which were waiting for job %s",
                    len(jobs), job_id)
      self._enqueue_fn(jobs)

    # Remove all jobs without actual waiters
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
                   if not waiters]:
      del self._waiters[job_id]


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared
    # mode and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITLOCK,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITLOCK:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobs(restartjobs)

    logging.info("Job queue inspection finished")

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot clean up queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list
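
  # Hedged example: with self._nodes == {"node2": "192.0.2.2",
  # "node3": "192.0.2.3"}, this returns (["node2", "node3"],
  # ["192.0.2.2", "192.0.2.3"]), both lists in the same (dict) order.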
  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicate the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract the change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
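
  # Worked example of the arithmetic above (Python 2 integer division,
  # JOBS_PER_ARCHIVE_DIRECTORY = 10000): job "42" maps to directory "0",
  # job "12345" to "1" and job "123456" to "12".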
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
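
  # Illustrative result, assuming the usual QUEUE_DIR of
  # /var/lib/ganeti/queue:
  #   _GetJobPath("42") -> "/var/lib/ganeti/queue/job-42"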
  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return None
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, True)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, False))

    raw_data = None
    writable_default = None

    for (fn, writable_default) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = writable_default

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = ("%s; opcodes %s" %
                (err, utils.CommaJoin(op.Summary() for op in ops)))
        status = False
      results.append((status, data))

    self._EnqueueJobs(added_jobs)

    return results

  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: string
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    if not isinstance(job_id, basestring):
      job_id = self._FormatJobID(job_id)

    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job"
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't cancel read-only job"

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
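
  # Hedged example: AutoArchiveJobs(6 * 3600, 30.0) archives finalized jobs
  # older than six hours, stops looking at new jobs after roughly 30 seconds,
  # and returns (number_archived, number_of_jobs_not_examined).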
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None