# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
import logging
import errno
import time
import weakref
import threading
import itertools

try:
  # pylint: disable=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify
from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang
JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"
class CancelJob(Exception):
  """Special exception to cancel a job.

  """


class QueueShutdown(Exception):
  """Special exception to abort a job when the job queue is shutting down.

  """
85 """Returns the current timestamp.
88 @return: the current time in the (seconds, microseconds) format
91 return utils.SplitTime(time.time())
class _SimpleJobQuery:
  """Wrapper for job queries.

  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using cached field list.

    """
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
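
# Usage sketch (illustrative, not from the original module): the query object
# is built once and can be applied to many jobs; "id" and "status" are assumed
# to be valid keys of query.JOB_FIELDS:
#
#   jq = _SimpleJobQuery(["id", "status"])
#   (job_id, job_status) = jq(job)  # `job` being some _QueuedJob instance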
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]
  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj
170 """Serializes this _QueuedOpCode.
173 @return: the dictionary holding the serialized state
177 "input": self.input.__getstate__(),
178 "status": self.status,
179 "result": self.result,
181 "start_timestamp": self.start_timestamp,
182 "exec_timestamp": self.exec_timestamp,
183 "end_timestamp": self.end_timestamp,
184 "priority": self.priority,
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable"]
  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job ID
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self._InitInMemory(self, writable)
  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None
  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))
  @classmethod
  def Restore(cls, queue, state, writable):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj
295 """Serialize the _JobQueue instance.
298 @return: the serialized state
303 "ops": [op.Serialize() for op in self.ops],
304 "start_timestamp": self.start_timestamp,
305 "end_timestamp": self.end_timestamp,
306 "received_timestamp": self.received_timestamp,
  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running
        will determine the job status
      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
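
  # Worked example (illustrative): for a job whose opcodes are in states
  # (success, running, queued) the loop above ends with JOB_STATUS_RUNNING,
  # while (success, error, queued) stops at the failed opcode and the whole
  # job reports JOB_STATUS_ERROR.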
  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)
  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
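
  # Polling sketch (illustrative): a client that has already seen entries up
  # to serial 5 asks only for newer ones on the next poll:
  #
  #   new_entries = job.GetLogEntries(5)    # entries with serial > 5
  #   all_entries = job.GetLogEntries(None) # the complete log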
  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        is defined

    """
    return _SimpleJobQuery(fields)(self)
  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False
436 """Marks the job as finalized.
439 self.end_timestamp = TimeStampNow()
442 """Marks job as canceled/-ing if possible.
444 @rtype: tuple; (bool, string)
445 @return: Boolean describing whether job was successfully canceled or marked
446 as canceling and a text message
449 status = self.CalcStatus()
451 if status == constants.JOB_STATUS_QUEUED:
452 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
453 "Job canceled by request")
455 return (True, "Job %s canceled" % self.id)
457 elif status == constants.JOB_STATUS_WAITING:
458 # The worker will notice the new status and cancel the job
459 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
460 return (True, "Job %s will be canceled" % self.id)
463 logging.debug("Job %s is no longer waiting in the queue", self.id)
464 return (False, "Job %s is no longer waiting in the queue" % self.id)
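
# Cancellation sketch (illustrative): a still-queued job is canceled and
# finalized on the spot, while a lock-waiting job is only marked as canceling
# and the worker completes the cancellation later:
#
#   (success, msg) = job.Cancel()
#   # success is True both for "canceled now" and for "will be canceled"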
class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op
  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

    # See if queue is shutting down
    if not self._queue.AcceptingJobsUnlocked():
      logging.debug("Queue is shutting down")
      raise QueueShutdown()
  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)
  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)
  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()
  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)
class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: list or None
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: int or None
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial
  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None
class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise
  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()
  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()
class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()
class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result
  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)
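
# Encoding sketch (illustrative): exceptions that are not Ganeti errors get
# wrapped in OpExecError first, so a client always receives an encoded Ganeti
# exception:
#
#   encoded = _EncodeOpError(ValueError("bad input"))
#   # behaves like errors.EncodeException(errors.OpExecError("bad input"))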
class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result
class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()
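
# Priority sketch (illustrative): opcode priorities are numeric and lower
# values are more important, so "increasing" the priority decrements
# op.priority by one per exhausted retry round until it reaches
# constants.OP_PRIO_HIGHEST.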
class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory
  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)
  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @rtype: bool
    @return: Whether the job should be updated on disk

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update
  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result
  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITING

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout, priority=op.priority)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Queue is shutting down, return to queued
      if not self.queue.AcceptingJobsUnlocked():
        return (constants.OP_STATUS_QUEUED, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)

    except QueueShutdown:
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)

      assert op.status == constants.OP_STATUS_WAITING

      # Job hadn't been started yet, so it should return to the queue
      return (constants.OP_STATUS_QUEUED, None)

    except Exception, err: # pylint: disable=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)
  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

      if op.status in (constants.OP_STATUS_WAITING,
                       constants.OP_STATUS_QUEUED):
        # waiting: Couldn't get locks in time
        # queued: Queue is shutting down
        assert not op.end_timestamp
      else:
        # Finalize opcode
        op.end_timestamp = TimeStampNow()

        if op.status == constants.OP_STATUS_CANCELING:
          assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                for i in job.ops[opctx.index:])
        else:
          assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_QUEUED:
        # Queue is shutting down
        assert not waitjob

        # Reset context
        job.cur_opctx = None

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED

      elif op.status == constants.OP_STATUS_WAITING or waitjob:
        # Opcode is still waiting for locks or for a dependency job
        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()
def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))
class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()
  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())
  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
        L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)
  @staticmethod
  def _GetWorkerName(job, op):
    """Sets the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}
    @rtype: string

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue
class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")
  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up as
    # one item under "pending".
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]
  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())
  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: string
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TString(job.id)
    assert ht.TString(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))
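
  # Dependency sketch (illustrative): an opcode that depends on job "42" with
  # a required end status of [JOB_STATUS_SUCCESS] may see, over successive
  # calls:
  #
  #   (WAIT, ...)      while job 42 is still running
  #   (CONTINUE, ...)  once job 42 finished successfully
  #   (CANCEL/WRONGSTATUS/ERROR, ...) on cancellation, wrong end status or
  #                    lookup errors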
  def _RemoveEmptyWaitersUnlocked(self):
    """Remove all jobs without actual waiters.

    """
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
                   if not waiters]:
      del self._waiters[job_id]
  def NotifyWaiters(self, job_id):
    """Notifies all jobs waiting for a certain job ID.

    @attention: Do not call until L{CheckAndRegister} returned a status other
        than C{WAITDEP} for C{job_id}, or behaviour is undefined
    @type job_id: string
    @param job_id: Job ID

    """
    assert ht.TString(job_id)

    self._lock.acquire()
    try:
      self._RemoveEmptyWaitersUnlocked()

      jobs = self._waiters.pop(job_id, None)
    finally:
      self._lock.release()

    if jobs:
      # Re-add jobs to workerpool
      logging.debug("Re-adding %s jobs which were waiting for job %s",
                    len(jobs), job_id)
      self._enqueue_fn(jobs)
def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper
def _RequireNonDrainedQueue(fn):
  """Decorator checking for a non-drained queue.

  To be used with functions submitting new jobs.

  """
  def wrapper(self, *args, **kwargs):
    """Wrapper function.

    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    # Needs access to protected member, pylint: disable=W0212
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if not self._accepting_jobs:
      raise errors.JobQueueError("Job queue is shutting down, refusing job")

    return fn(self, *args, **kwargs)
  return wrapper
class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    finished).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. To not acquire it at all, concurrency
    # must be guaranteed with all code acquiring it in shared mode and all
    # code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")
  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      content = utils.ReadFile(file_name)

      result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name,
                                                        content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)
  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we log errors if our RPC calls fail, and especially
    the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")
  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list
  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicate the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)
  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
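
  # Layout sketch (illustrative): with JOBS_PER_ARCHIVE_DIRECTORY = 10000,
  # job 12345 is archived under directory "1":
  #
  #   JobQueue._GetArchiveDirectory("12345")  # -> "1"
  #   # i.e. <archive dir>/1/job-12345 after archiving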
  def _NewSerialsUnlocked(self, count):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of formatted job identifiers.

    """
    assert ht.TPositiveInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result
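
  # Allocation sketch (illustrative): if the last serial on disk is 7, then
  # _NewSerialsUnlocked(3) persists "10\n" and hands out ["8", "9", "10"]
  # (formatted through _FormatJobID).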
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
  @staticmethod
  def _GetJobIDsUnlocked(sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = constants.JOB_FILE_RE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist
  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: str
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job
  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, True)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, False))

    raw_data = None
    writable_default = None

    for (fn, writable_default) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = writable_default

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None
  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    for idx, op in enumerate(job.ops):
      # Check priority
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      # Check job dependencies
      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results
  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))
  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: Resolved dependencies

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)
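
  # Resolution sketch (illustrative): inside a SubmitManyJobs batch, a
  # relative dependency of -1 refers to the job submitted just before the
  # current one; `previous_ids`, `batch_ids` and `idx` are hypothetical:
  #
  #   resolve = lambda reljobid: (previous_ids + batch_ids[:idx])[reljobid]
  #   JobQueue._ResolveJobDependencies(resolve, [(-1, [])])
  #   # -> (True, [(<ID of the previous job>, [])])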
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)
  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])
  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: string
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    if not isinstance(job_id, basestring):
      job_id = self._FormatJobID(job_id)

    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    assert not job.writable, "Got writable job" # pylint: disable=E1101

    if job:
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        If the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients.

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)
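
  # Minimal client-side polling sketch (assumed caller; "queue" is a
  # JobQueue instance): keep waiting until something other than
  # constants.JOB_NOTCHANGED comes back, carrying the previous values
  # forward between calls.
  #
  #   prev_info = None
  #   prev_log_serial = 0
  #   while True:
  #     result = queue.WaitForJobChanges("17", ["status"], prev_info,
  #                                      prev_log_serial, 10.0)
  #     if result != constants.JOB_NOTCHANGED:
  #       break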

  @locking.ssynchronized(_LOCK)
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't cancel read-only job"

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)
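
  # Callers always receive a (success, message) pair; a hypothetical
  # caller could surface failures like this:
  #
  #   (success, msg) = queue.CancelJob("17")
  #   if not success:
  #     logging.error("Cancelling job failed: %s", msg)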

  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't checked above whether the renames succeeded or failed,
    # we update the cached queue size from the filesystem. Once the TODO above
    # is fixed, the number of actually archived jobs can be used instead.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum amount of time (in seconds) to spend archiving

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
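
  # Worked example (hypothetical numbers): job timestamps are
  # (seconds, microseconds) tuples as produced by utils.SplitTime, so
  # job_age[0] is the wall-clock second. With age=3600, a job whose
  # end_timestamp is (now - 7200, 0) gets archived, while one that
  # finished 600 seconds ago is kept; age=-1 archives every finalized
  # job regardless of its age.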

  def _Query(self, fields, qfilter):
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, sort_by_name) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=sort_by_name)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list with
        the requested fields

    """
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, sort_by_name) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=sort_by_name)
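
  # For example, job_ids=["1", "3"] makes qlang.MakeSimpleFilter produce
  # the query2 filter ["|", ["=", "id", "1"], ["=", "id", "3"]], while
  # job_ids=None produces no filter at all (i.e. all jobs are returned).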

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished jobs
    will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()
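
  # Sketch of the intended shutdown sequence (caller-side, assumed):
  # poll until no jobs are running anymore, then shut down for real.
  #
  #   while queue.PrepareShutdown():
  #     time.sleep(1)  # jobs still running, check again shortly
  #   queue.Shutdown()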

  def AcceptingJobsUnlocked(self):
    """Returns whether jobs are accepted.

    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
    queue is shutting down.

    @rtype: bool
    @return: Whether jobs are accepted

    """
    return self._accepting_jobs

  @locking.ssynchronized(_LOCK)
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None