X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/7b5c4a693b48f52db43f835f6740201c5e23a251..090377807b5214b3ae4a6bfe294d94df3eb5d6df:/lib/jqueue.py?ds=sidebyside diff --git a/lib/jqueue.py b/lib/jqueue.py index c2d1a2b..34ac6a7 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc. +# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -29,15 +29,15 @@ used by all other classes in this module. """ -import os import logging import errno -import re import time import weakref +import threading +import itertools try: - # pylint: disable-msg=E0611 + # pylint: disable=E0611 from pyinotify import pyinotify except ImportError: import pyinotify @@ -56,6 +56,7 @@ from ganeti import rpc from ganeti import runtime from ganeti import netutils from ganeti import compat +from ganeti import ht JOBQUEUE_THREADS = 25 @@ -173,14 +174,15 @@ class _QueuedJob(object): @ivar received_timestamp: the timestamp for when the job was received @ivar start_timestmap: the timestamp for start of execution @ivar end_timestamp: the timestamp for end of execution + @ivar writable: Whether the job is allowed to be modified """ - # pylint: disable-msg=W0212 - __slots__ = ["queue", "id", "ops", "log_serial", + # pylint: disable=W0212 + __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx", "received_timestamp", "start_timestamp", "end_timestamp", - "__weakref__"] + "__weakref__", "processor_lock", "writable"] - def __init__(self, queue, job_id, ops): + def __init__(self, queue, job_id, ops, writable): """Constructor for the _QueuedJob. @type queue: L{JobQueue} @@ -190,6 +192,8 @@ class _QueuedJob(object): @type ops: list @param ops: the list of opcodes we hold, which will be encapsulated in _QueuedOpCodes + @type writable: bool + @param writable: Whether job can be modified """ if not ops: @@ -203,6 +207,23 @@ class _QueuedJob(object): self.start_timestamp = None self.end_timestamp = None + self._InitInMemory(self, writable) + + @staticmethod + def _InitInMemory(obj, writable): + """Initializes in-memory variables. 
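As an editorial aside (not part of this patch, all names below are hypothetical), the pattern introduced by _QueuedJob._InitInMemory above is easier to see in a tiny self-contained sketch: attributes that exist only in memory, such as the per-job processor lock, are initialized in one shared helper so that both the constructor and the Restore() deserialization path stay in sync, and read-only copies skip the lock entirely.

import threading

class SketchJob(object):
  """Hypothetical stand-in for _QueuedJob."""

  def __init__(self, job_id, writable):
    self.id = job_id
    self._init_in_memory(writable)

  @classmethod
  def Restore(cls, state, writable):
    # Deserialization bypasses __init__, so it must call the same helper
    obj = cls.__new__(cls)
    obj.id = state["id"]
    obj._init_in_memory(writable)
    return obj

  def _init_in_memory(self, writable):
    self.writable = writable
    self.ops_iter = None
    self.cur_opctx = None
    # Read-only jobs are never processed, hence no lock is needed
    self.processor_lock = threading.Lock() if writable else None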
+ + """ + obj.writable = writable + obj.ops_iter = None + obj.cur_opctx = None + + # Read-only jobs are not processed and therefore don't need a lock + if writable: + obj.processor_lock = threading.Lock() + else: + obj.processor_lock = None + def __repr__(self): status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), "id=%s" % self.id, @@ -211,13 +232,15 @@ class _QueuedJob(object): return "<%s at %#x>" % (" ".join(status), id(self)) @classmethod - def Restore(cls, queue, state): + def Restore(cls, queue, state, writable): """Restore a _QueuedJob from serialized state: @type queue: L{JobQueue} @param queue: to which queue the restored job belongs @type state: dict @param state: the serialized state + @type writable: bool + @param writable: Whether job can be modified @rtype: _JobQueue @return: the restored _JobQueue instance @@ -237,6 +260,8 @@ class _QueuedJob(object): obj.log_serial = max(obj.log_serial, log_entry[0]) obj.ops.append(op) + cls._InitInMemory(obj, writable) + return obj def Serialize(self): @@ -287,8 +312,8 @@ class _QueuedJob(object): if op.status == constants.OP_STATUS_QUEUED: pass - elif op.status == constants.OP_STATUS_WAITLOCK: - status = constants.JOB_STATUS_WAITLOCK + elif op.status == constants.OP_STATUS_WAITING: + status = constants.JOB_STATUS_WAITING elif op.status == constants.OP_STATUS_RUNNING: status = constants.JOB_STATUS_RUNNING elif op.status == constants.OP_STATUS_CANCELING: @@ -364,6 +389,8 @@ class _QueuedJob(object): row.append(self.id) elif fname == "status": row.append(self.CalcStatus()) + elif fname == "priority": + row.append(self.CalcPriority()) elif fname == "ops": row.append([op.input.__getstate__() for op in self.ops]) elif fname == "opresult": @@ -378,6 +405,8 @@ class _QueuedJob(object): row.append([op.exec_timestamp for op in self.ops]) elif fname == "opend": row.append([op.end_timestamp for op in self.ops]) + elif fname == "oppriority": + row.append([op.priority for op in self.ops]) elif fname == "received_ts": row.append(self.received_timestamp) elif fname == "start_ts": @@ -410,6 +439,12 @@ class _QueuedJob(object): op.result = result not_marked = False + def Finalize(self): + """Marks the job as finalized. + + """ + self.end_timestamp = TimeStampNow() + def Cancel(self): """Marks job as canceled/-ing if possible. @@ -420,22 +455,20 @@ class _QueuedJob(object): """ status = self.CalcStatus() - if status not in (constants.JOB_STATUS_QUEUED, - constants.JOB_STATUS_WAITLOCK): - logging.debug("Job %s is no longer waiting in the queue", self.id) - return (False, "Job %s is no longer waiting in the queue" % self.id) - if status == constants.JOB_STATUS_QUEUED: self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, "Job canceled by request") - msg = "Job %s canceled" % self.id + self.Finalize() + return (True, "Job %s canceled" % self.id) - elif status == constants.JOB_STATUS_WAITLOCK: + elif status == constants.JOB_STATUS_WAITING: # The worker will notice the new status and cancel the job self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) - msg = "Job %s will be canceled" % self.id + return (True, "Job %s will be canceled" % self.id) - return (True, msg) + else: + logging.debug("Job %s is no longer waiting in the queue", self.id) + return (False, "Job %s is no longer waiting in the queue" % self.id) class _OpExecCallbacks(mcpu.OpExecCbBase): @@ -474,11 +507,11 @@ class _OpExecCallbacks(mcpu.OpExecCbBase): This is called from the mcpu code as a notifier function, when the LU is finally about to start the Exec() method. 
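A quick illustration of the reworked Cancel() logic above (an editor's sketch, with plain strings standing in for the constants module): a job that is still queued is cancelled and finalized on the spot, a job waiting for locks is only marked as cancelling so the worker can finish the cancellation, and anything further along can no longer be cancelled.

def sketch_cancel(job_status):
  """Returns (success, message), mirroring _QueuedJob.Cancel."""
  if job_status == "queued":
    # Opcodes flip straight to "canceled" and the job is finalized
    return (True, "Job canceled")
  elif job_status == "waiting":
    # Opcodes become "canceling"; the worker notices and completes it
    return (True, "Job will be canceled")
  else:
    return (False, "Job is no longer waiting in the queue")

assert sketch_cancel("queued") == (True, "Job canceled")
assert sketch_cancel("running")[0] is False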
Of course, to have end-user visible results, the opcode must be initially (before calling into - Processor.ExecOpCode) set to OP_STATUS_WAITLOCK. + Processor.ExecOpCode) set to OP_STATUS_WAITING. """ assert self._op in self._job.ops - assert self._op.status in (constants.OP_STATUS_WAITLOCK, + assert self._op.status in (constants.OP_STATUS_WAITING, constants.OP_STATUS_CANCELING) # Cancel here if we were asked to @@ -522,12 +555,21 @@ class _OpExecCallbacks(mcpu.OpExecCbBase): """Check whether job has been cancelled. """ - assert self._op.status in (constants.OP_STATUS_WAITLOCK, + assert self._op.status in (constants.OP_STATUS_WAITING, constants.OP_STATUS_CANCELING) # Cancel here if we were asked to self._CheckCancel() + def SubmitManyJobs(self, jobs): + """Submits jobs for processing. + + See L{JobQueue.SubmitManyJobs}. + + """ + # Locking is done in job queue + return self._queue.SubmitManyJobs(jobs) + class _JobChangesChecker(object): def __init__(self, fields, prev_job_info, prev_log_serial): @@ -552,6 +594,8 @@ class _JobChangesChecker(object): @param job: Job object """ + assert not job.writable, "Expected read-only job" + status = job.CalcStatus() job_info = job.GetInfo(self._fields) log_entries = job.GetLogEntries(self._prev_log_serial) @@ -572,7 +616,7 @@ class _JobChangesChecker(object): # no changes. if (status not in (constants.JOB_STATUS_QUEUED, constants.JOB_STATUS_RUNNING, - constants.JOB_STATUS_WAITLOCK) or + constants.JOB_STATUS_WAITING) or job_info != self._prev_job_info or (log_entries and self._prev_log_serial != log_entries[0][0])): logging.debug("Job %s changed", job.id) @@ -677,7 +721,13 @@ class _WaitForJobChangesHelper(object): """ @staticmethod - def _CheckForChanges(job_load_fn, check_fn): + def _CheckForChanges(counter, job_load_fn, check_fn): + if counter.next() > 0: + # If this isn't the first check the job is given some more time to change + # again. This gives better performance for jobs generating many + # changes/messages. + time.sleep(0.1) + job = job_load_fn() if not job: raise errors.JobLost() @@ -706,12 +756,13 @@ class _WaitForJobChangesHelper(object): @param timeout: maximum time to wait in seconds """ + counter = itertools.count() try: check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial) waiter = _JobChangesWaiter(filename) try: return utils.Retry(compat.partial(self._CheckForChanges, - job_load_fn, check_fn), + counter, job_load_fn, check_fn), utils.RETRY_REMAINING_TIME, timeout, wait_fn=waiter.Wait) finally: @@ -734,132 +785,553 @@ def _EncodeOpError(err): return errors.EncodeException(to_encode) +class _TimeoutStrategyWrapper: + def __init__(self, fn): + """Initializes this class. + + """ + self._fn = fn + self._next = None + + def _Advance(self): + """Gets the next timeout if necessary. + + """ + if self._next is None: + self._next = self._fn() + + def Peek(self): + """Returns the next timeout. + + """ + self._Advance() + return self._next + + def Next(self): + """Returns the current timeout and advances the internal state. + + """ + self._Advance() + result = self._next + self._next = None + return result + + +class _OpExecContext: + def __init__(self, op, index, log_prefix, timeout_strategy_factory): + """Initializes this class. 
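The counter added to _CheckForChanges above is easiest to see in isolation. Below is a hedged sketch (hypothetical helper, not the Ganeti API) of the same idea: every check after the first sleeps briefly, so a job that emits many log messages is reported to waiting clients in fewer, larger batches.

import itertools
import time

def make_throttled_check(check_fn, delay=0.1):
  counter = itertools.count()
  def check(*args):
    if next(counter) > 0:
      # Not the first attempt: give the job a moment to change again
      time.sleep(delay)
    return check_fn(*args)
  return check

# Usage: wrap a _JobChangesChecker-style callable and hand the result to
# utils.Retry together with the inotify-based wait function.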
+ + """ + self.op = op + self.index = index + self.log_prefix = log_prefix + self.summary = op.input.Summary() + + # Create local copy to modify + if getattr(op.input, opcodes.DEPEND_ATTR, None): + self.jobdeps = op.input.depends[:] + else: + self.jobdeps = None + + self._timeout_strategy_factory = timeout_strategy_factory + self._ResetTimeoutStrategy() + + def _ResetTimeoutStrategy(self): + """Creates a new timeout strategy. + + """ + self._timeout_strategy = \ + _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt) + + def CheckPriorityIncrease(self): + """Checks whether priority can and should be increased. + + Called when locks couldn't be acquired. + + """ + op = self.op + + # Exhausted all retries and next round should not use blocking acquire + # for locks? + if (self._timeout_strategy.Peek() is None and + op.priority > constants.OP_PRIO_HIGHEST): + logging.debug("Increasing priority") + op.priority -= 1 + self._ResetTimeoutStrategy() + return True + + return False + + def GetNextLockTimeout(self): + """Returns the next lock acquire timeout. + + """ + return self._timeout_strategy.Next() + + +class _JobProcessor(object): + (DEFER, + WAITDEP, + FINISHED) = range(1, 4) + + def __init__(self, queue, opexec_fn, job, + _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy): + """Initializes this class. + + """ + self.queue = queue + self.opexec_fn = opexec_fn + self.job = job + self._timeout_strategy_factory = _timeout_strategy_factory + + @staticmethod + def _FindNextOpcode(job, timeout_strategy_factory): + """Locates the next opcode to run. + + @type job: L{_QueuedJob} + @param job: Job object + @param timeout_strategy_factory: Callable to create new timeout strategy + + """ + # Create some sort of a cache to speed up locating next opcode for future + # lookups + # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for + # pending and one for processed ops. + if job.ops_iter is None: + job.ops_iter = enumerate(job.ops) + + # Find next opcode to run + while True: + try: + (idx, op) = job.ops_iter.next() + except StopIteration: + raise errors.ProgrammerError("Called for a finished job") + + if op.status == constants.OP_STATUS_RUNNING: + # Found an opcode already marked as running + raise errors.ProgrammerError("Called for job marked as running") + + opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)), + timeout_strategy_factory) + + if op.status not in constants.OPS_FINALIZED: + return opctx + + # This is a job that was partially completed before master daemon + # shutdown, so it can be expected that some opcodes are already + # completed successfully (if any did error out, then the whole job + # should have been aborted and not resubmitted for processing). + logging.info("%s: opcode %s already processed, skipping", + opctx.log_prefix, opctx.summary) + + @staticmethod + def _MarkWaitlock(job, op): + """Marks an opcode as waiting for locks. + + The job's start timestamp is also set if necessary. 
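To make the effect of CheckPriorityIncrease above concrete, here is a minimal sketch (editor's illustration; the real code consults mcpu.LockAttemptTimeoutStrategy and the constants module, and the OP_PRIO values below are assumed): once every lock-acquire timeout for the current priority has been used up, the opcode's priority is raised one step (lower numbers are more important) and the timeout strategy starts over.

OP_PRIO_HIGHEST = -20   # assumed value, for illustration only
OP_PRIO_DEFAULT = 0

def next_priority(current, next_timeout):
  """Returns the priority to use for the next lock acquisition round."""
  if next_timeout is None and current > OP_PRIO_HIGHEST:
    # Timeouts exhausted: escalate one step and retry with fresh timeouts
    return current - 1
  return current

assert next_priority(OP_PRIO_DEFAULT, 10.0) == 0        # timeouts remain
assert next_priority(OP_PRIO_DEFAULT, None) == -1       # escalate one step
assert next_priority(OP_PRIO_HIGHEST, None) == OP_PRIO_HIGHEST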
+ + @type job: L{_QueuedJob} + @param job: Job object + @type op: L{_QueuedOpCode} + @param op: Opcode object + + """ + assert op in job.ops + assert op.status in (constants.OP_STATUS_QUEUED, + constants.OP_STATUS_WAITING) + + update = False + + op.result = None + + if op.status == constants.OP_STATUS_QUEUED: + op.status = constants.OP_STATUS_WAITING + update = True + + if op.start_timestamp is None: + op.start_timestamp = TimeStampNow() + update = True + + if job.start_timestamp is None: + job.start_timestamp = op.start_timestamp + update = True + + assert op.status == constants.OP_STATUS_WAITING + + return update + + @staticmethod + def _CheckDependencies(queue, job, opctx): + """Checks if an opcode has dependencies and if so, processes them. + + @type queue: L{JobQueue} + @param queue: Queue object + @type job: L{_QueuedJob} + @param job: Job object + @type opctx: L{_OpExecContext} + @param opctx: Opcode execution context + @rtype: bool + @return: Whether opcode will be re-scheduled by dependency tracker + + """ + op = opctx.op + + result = False + + while opctx.jobdeps: + (dep_job_id, dep_status) = opctx.jobdeps[0] + + (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id, + dep_status) + assert ht.TNonEmptyString(depmsg), "No dependency message" + + logging.info("%s: %s", opctx.log_prefix, depmsg) + + if depresult == _JobDependencyManager.CONTINUE: + # Remove dependency and continue + opctx.jobdeps.pop(0) + + elif depresult == _JobDependencyManager.WAIT: + # Need to wait for notification, dependency tracker will re-add job + # to workerpool + result = True + break + + elif depresult == _JobDependencyManager.CANCEL: + # Job was cancelled, cancel this job as well + job.Cancel() + assert op.status == constants.OP_STATUS_CANCELING + break + + elif depresult in (_JobDependencyManager.WRONGSTATUS, + _JobDependencyManager.ERROR): + # Job failed or there was an error, this job must fail + op.status = constants.OP_STATUS_ERROR + op.result = _EncodeOpError(errors.OpExecError(depmsg)) + break + + else: + raise errors.ProgrammerError("Unknown dependency result '%s'" % + depresult) + + return result + + def _ExecOpCodeUnlocked(self, opctx): + """Processes one opcode and returns the result. + + """ + op = opctx.op + + assert op.status == constants.OP_STATUS_WAITING + + timeout = opctx.GetNextLockTimeout() + + try: + # Make sure not to hold queue lock while calling ExecOpCode + result = self.opexec_fn(op.input, + _OpExecCallbacks(self.queue, self.job, op), + timeout=timeout, priority=op.priority) + except mcpu.LockAcquireTimeout: + assert timeout is not None, "Received timeout for blocking acquire" + logging.debug("Couldn't acquire locks in %0.6fs", timeout) + + assert op.status in (constants.OP_STATUS_WAITING, + constants.OP_STATUS_CANCELING) + + # Was job cancelled while we were waiting for the lock? 
+ if op.status == constants.OP_STATUS_CANCELING: + return (constants.OP_STATUS_CANCELING, None) + + # Stay in waitlock while trying to re-acquire lock + return (constants.OP_STATUS_WAITING, None) + except CancelJob: + logging.exception("%s: Canceling job", opctx.log_prefix) + assert op.status == constants.OP_STATUS_CANCELING + return (constants.OP_STATUS_CANCELING, None) + except Exception, err: # pylint: disable=W0703 + logging.exception("%s: Caught exception in %s", + opctx.log_prefix, opctx.summary) + return (constants.OP_STATUS_ERROR, _EncodeOpError(err)) + else: + logging.debug("%s: %s successful", + opctx.log_prefix, opctx.summary) + return (constants.OP_STATUS_SUCCESS, result) + + def __call__(self, _nextop_fn=None): + """Continues execution of a job. + + @param _nextop_fn: Callback function for tests + @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should + be deferred and C{WAITDEP} if the dependency manager + (L{_JobDependencyManager}) will re-schedule the job when appropriate + + """ + queue = self.queue + job = self.job + + logging.debug("Processing job %s", job.id) + + queue.acquire(shared=1) + try: + opcount = len(job.ops) + + assert job.writable, "Expected writable job" + + # Don't do anything for finalized jobs + if job.CalcStatus() in constants.JOBS_FINALIZED: + return self.FINISHED + + # Is a previous opcode still pending? + if job.cur_opctx: + opctx = job.cur_opctx + job.cur_opctx = None + else: + if __debug__ and _nextop_fn: + _nextop_fn() + opctx = self._FindNextOpcode(job, self._timeout_strategy_factory) + + op = opctx.op + + # Consistency check + assert compat.all(i.status in (constants.OP_STATUS_QUEUED, + constants.OP_STATUS_CANCELING) + for i in job.ops[opctx.index + 1:]) + + assert op.status in (constants.OP_STATUS_QUEUED, + constants.OP_STATUS_WAITING, + constants.OP_STATUS_CANCELING) + + assert (op.priority <= constants.OP_PRIO_LOWEST and + op.priority >= constants.OP_PRIO_HIGHEST) + + waitjob = None + + if op.status != constants.OP_STATUS_CANCELING: + assert op.status in (constants.OP_STATUS_QUEUED, + constants.OP_STATUS_WAITING) + + # Prepare to start opcode + if self._MarkWaitlock(job, op): + # Write to disk + queue.UpdateJobUnlocked(job) + + assert op.status == constants.OP_STATUS_WAITING + assert job.CalcStatus() == constants.JOB_STATUS_WAITING + assert job.start_timestamp and op.start_timestamp + assert waitjob is None + + # Check if waiting for a job is necessary + waitjob = self._CheckDependencies(queue, job, opctx) + + assert op.status in (constants.OP_STATUS_WAITING, + constants.OP_STATUS_CANCELING, + constants.OP_STATUS_ERROR) + + if not (waitjob or op.status in (constants.OP_STATUS_CANCELING, + constants.OP_STATUS_ERROR)): + logging.info("%s: opcode %s waiting for locks", + opctx.log_prefix, opctx.summary) + + assert not opctx.jobdeps, "Not all dependencies were removed" + + queue.release() + try: + (op_status, op_result) = self._ExecOpCodeUnlocked(opctx) + finally: + queue.acquire(shared=1) + + op.status = op_status + op.result = op_result + + assert not waitjob + + if op.status == constants.OP_STATUS_WAITING: + # Couldn't get locks in time + assert not op.end_timestamp + else: + # Finalize opcode + op.end_timestamp = TimeStampNow() + + if op.status == constants.OP_STATUS_CANCELING: + assert not compat.any(i.status != constants.OP_STATUS_CANCELING + for i in job.ops[opctx.index:]) + else: + assert op.status in constants.OPS_FINALIZED + + if op.status == constants.OP_STATUS_WAITING or waitjob: + finalize = False + + if not 
waitjob and opctx.CheckPriorityIncrease(): + # Priority was changed, need to update on-disk file + queue.UpdateJobUnlocked(job) + + # Keep around for another round + job.cur_opctx = opctx + + assert (op.priority <= constants.OP_PRIO_LOWEST and + op.priority >= constants.OP_PRIO_HIGHEST) + + # In no case must the status be finalized here + assert job.CalcStatus() == constants.JOB_STATUS_WAITING + + else: + # Ensure all opcodes so far have been successful + assert (opctx.index == 0 or + compat.all(i.status == constants.OP_STATUS_SUCCESS + for i in job.ops[:opctx.index])) + + # Reset context + job.cur_opctx = None + + if op.status == constants.OP_STATUS_SUCCESS: + finalize = False + + elif op.status == constants.OP_STATUS_ERROR: + # Ensure failed opcode has an exception as its result + assert errors.GetEncodedError(job.ops[opctx.index].result) + + to_encode = errors.OpExecError("Preceding opcode failed") + job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, + _EncodeOpError(to_encode)) + finalize = True + + # Consistency check + assert compat.all(i.status == constants.OP_STATUS_ERROR and + errors.GetEncodedError(i.result) + for i in job.ops[opctx.index:]) + + elif op.status == constants.OP_STATUS_CANCELING: + job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, + "Job canceled by request") + finalize = True + + else: + raise errors.ProgrammerError("Unknown status '%s'" % op.status) + + if opctx.index == (opcount - 1): + # Finalize on last opcode + finalize = True + + if finalize: + # All opcodes have been run, finalize job + job.Finalize() + + # Write to disk. If the job status is final, this is the final write + # allowed. Once the file has been written, it can be archived anytime. + queue.UpdateJobUnlocked(job) + + assert not waitjob + + if finalize: + logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) + return self.FINISHED + + assert not waitjob or queue.depmgr.JobWaiting(job) + + if waitjob: + return self.WAITDEP + else: + return self.DEFER + finally: + assert job.writable, "Job became read-only while being processed" + queue.release() + + +def _EvaluateJobProcessorResult(depmgr, job, result): + """Looks at a result from L{_JobProcessor} for a job. + + To be used in a L{_JobQueueWorker}. + + """ + if result == _JobProcessor.FINISHED: + # Notify waiting jobs + depmgr.NotifyWaiters(job.id) + + elif result == _JobProcessor.DEFER: + # Schedule again + raise workerpool.DeferTask(priority=job.CalcPriority()) + + elif result == _JobProcessor.WAITDEP: + # No-op, dependency manager will re-schedule + pass + + else: + raise errors.ProgrammerError("Job processor returned unknown status %s" % + (result, )) + + class _JobQueueWorker(workerpool.BaseWorker): """The actual job workers. """ - def RunTask(self, job): # pylint: disable-msg=W0221 + def RunTask(self, job): # pylint: disable=W0221 """Job executor. - This functions processes a job. It is closely tied to the _QueuedJob and - _QueuedOpCode classes. - @type job: L{_QueuedJob} @param job: the job to be processed """ - self.SetTaskName("Job%s" % job.id) + assert job.writable, "Expected writable job" - logging.info("Processing job %s", job.id) - proc = mcpu.Processor(self.pool.queue.context, job.id) - queue = job.queue + # Ensure only one worker is active on a single job. If a job registers for + # a dependency job, and the other job notifies before the first worker is + # done, the job can end up in the tasklist more than once. 
+ job.processor_lock.acquire() try: - try: - count = len(job.ops) - for idx, op in enumerate(job.ops): - op_summary = op.input.Summary() - if op.status == constants.OP_STATUS_SUCCESS: - # this is a job that was partially completed before master - # daemon shutdown, so it can be expected that some opcodes - # are already completed successfully (if any did error - # out, then the whole job should have been aborted and not - # resubmitted for processing) - logging.info("Op %s/%s: opcode %s already processed, skipping", - idx + 1, count, op_summary) - continue - try: - logging.info("Op %s/%s: Starting opcode %s", idx + 1, count, - op_summary) + return self._RunTaskInner(job) + finally: + job.processor_lock.release() - queue.acquire(shared=1) - try: - if op.status == constants.OP_STATUS_CANCELED: - logging.debug("Canceling opcode") - raise CancelJob() - assert op.status == constants.OP_STATUS_QUEUED - logging.debug("Opcode %s/%s waiting for locks", - idx + 1, count) - op.status = constants.OP_STATUS_WAITLOCK - op.result = None - op.start_timestamp = TimeStampNow() - if idx == 0: # first opcode - job.start_timestamp = op.start_timestamp - queue.UpdateJobUnlocked(job) - - input_opcode = op.input - finally: - queue.release() - - # Make sure not to hold queue lock while calling ExecOpCode - result = proc.ExecOpCode(input_opcode, - _OpExecCallbacks(queue, job, op)) + def _RunTaskInner(self, job): + """Executes a job. - queue.acquire(shared=1) - try: - logging.debug("Opcode %s/%s succeeded", idx + 1, count) - op.status = constants.OP_STATUS_SUCCESS - op.result = result - op.end_timestamp = TimeStampNow() - if idx == count - 1: - job.end_timestamp = TimeStampNow() - - # Consistency check - assert compat.all(i.status == constants.OP_STATUS_SUCCESS - for i in job.ops) - - queue.UpdateJobUnlocked(job) - finally: - queue.release() - - logging.info("Op %s/%s: Successfully finished opcode %s", - idx + 1, count, op_summary) - except CancelJob: - # Will be handled further up - raise - except Exception, err: - queue.acquire(shared=1) - try: - try: - logging.debug("Opcode %s/%s failed", idx + 1, count) - op.status = constants.OP_STATUS_ERROR - op.result = _EncodeOpError(err) - op.end_timestamp = TimeStampNow() - logging.info("Op %s/%s: Error in opcode %s: %s", - idx + 1, count, op_summary, err) - - to_encode = errors.OpExecError("Preceding opcode failed") - job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, - _EncodeOpError(to_encode)) - - # Consistency check - assert compat.all(i.status == constants.OP_STATUS_SUCCESS - for i in job.ops[:idx]) - assert compat.all(i.status == constants.OP_STATUS_ERROR and - errors.GetEncodedError(i.result) - for i in job.ops[idx:]) - finally: - job.end_timestamp = TimeStampNow() - queue.UpdateJobUnlocked(job) - finally: - queue.release() - raise - - except CancelJob: - queue.acquire(shared=1) - try: - job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, - "Job canceled by request") - job.end_timestamp = TimeStampNow() - queue.UpdateJobUnlocked(job) - finally: - queue.release() - except errors.GenericError, err: - logging.exception("Ganeti exception") - except: - logging.exception("Unhandled exception") + Must be called with per-job lock acquired. 
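The thread-naming helpers added to _JobQueueWorker above are easiest to demonstrate with fake objects (everything below is a hypothetical stand-in): while an opcode runs the worker is named after both the job and a short opcode summary, and the name falls back to the bare job ID in between opcodes.

class FakeOp(object):
  @staticmethod
  def TinySummary():
    return "INSTANCE_CREATE"    # made-up summary text

class FakeJob(object):
  id = "183"

def worker_name(job, op):
  # Mirrors _JobQueueWorker._GetWorkerName
  parts = ["Job%s" % job.id]
  if op:
    parts.append(op.TinySummary())
  return "/".join(parts)

assert worker_name(FakeJob, FakeOp) == "Job183/INSTANCE_CREATE"
assert worker_name(FakeJob, None) == "Job183"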
+ + """ + queue = job.queue + assert queue == self.pool.queue + + setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op)) + setname_fn(None) + + proc = mcpu.Processor(queue.context, job.id) + + # Create wrapper for setting thread name + wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn, + proc.ExecOpCode) + + _EvaluateJobProcessorResult(queue.depmgr, job, + _JobProcessor(queue, wrap_execop_fn, job)()) + + @staticmethod + def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs): + """Updates the worker thread name to include a short summary of the opcode. + + @param setname_fn: Callable setting worker thread name + @param execop_fn: Callable for executing opcode (usually + L{mcpu.Processor.ExecOpCode}) + + """ + setname_fn(op) + try: + return execop_fn(op, *args, **kwargs) finally: - status = job.CalcStatus() - logging.info("Finished job %s, status = %s", job.id, status) + setname_fn(None) + + @staticmethod + def _GetWorkerName(job, op): + """Sets the worker thread name. + + @type job: L{_QueuedJob} + @type op: L{opcodes.OpCode} + + """ + parts = ["Job%s" % job.id] + + if op: + parts.append(op.TinySummary()) + + return "/".join(parts) class _JobQueueWorkerPool(workerpool.WorkerPool): @@ -867,12 +1339,148 @@ class _JobQueueWorkerPool(workerpool.WorkerPool): """ def __init__(self, queue): - super(_JobQueueWorkerPool, self).__init__("JobQueue", + super(_JobQueueWorkerPool, self).__init__("Jq", JOBQUEUE_THREADS, _JobQueueWorker) self.queue = queue +class _JobDependencyManager: + """Keeps track of job dependencies. + + """ + (WAIT, + ERROR, + CANCEL, + CONTINUE, + WRONGSTATUS) = range(1, 6) + + def __init__(self, getstatus_fn, enqueue_fn): + """Initializes this class. + + """ + self._getstatus_fn = getstatus_fn + self._enqueue_fn = enqueue_fn + + self._waiters = {} + self._lock = locking.SharedLock("JobDepMgr") + + @locking.ssynchronized(_LOCK, shared=1) + def GetLockInfo(self, requested): # pylint: disable=W0613 + """Retrieves information about waiting jobs. + + @type requested: set + @param requested: Requested information, see C{query.LQ_*} + + """ + # No need to sort here, that's being done by the lock manager and query + # library. There are no priorities for notifying jobs, hence all show up as + # one item under "pending". + return [("job/%s" % job_id, None, None, + [("job", [job.id for job in waiters])]) + for job_id, waiters in self._waiters.items() + if waiters] + + @locking.ssynchronized(_LOCK, shared=1) + def JobWaiting(self, job): + """Checks if a job is waiting. + + """ + return compat.any(job in jobs + for jobs in self._waiters.values()) + + @locking.ssynchronized(_LOCK) + def CheckAndRegister(self, job, dep_job_id, dep_status): + """Checks if a dependency job has the requested status. + + If the other job is not yet in a finalized status, the calling job will be + notified (re-added to the workerpool) at a later point. 
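Since _JobDependencyManager is spread over several hunks, a condensed sketch of its waiter registry may help (editor's illustration with hypothetical names and simplified status strings; the real class also distinguishes CANCEL, ERROR and WRONGSTATUS): a job waiting on an unfinished dependency is parked in a per-dependency set, and notifying that dependency re-queues all of its waiters.

class SketchDepManager(object):
  def __init__(self, getstatus_fn, enqueue_fn):
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn
    self._waiters = {}          # dep_job_id -> set of waiting jobs

  def check_and_register(self, job, dep_job_id, wanted):
    status = self._getstatus_fn(dep_job_id)
    if status not in ("success", "error", "canceled"):
      # Dependency not finalized yet: park the job and wait for a notify
      self._waiters.setdefault(dep_job_id, set()).add(job)
      return "wait"
    return "continue" if (not wanted or status in wanted) else "wrongstatus"

  def notify_waiters(self, dep_job_id):
    jobs = self._waiters.pop(dep_job_id, None)
    if jobs:
      self._enqueue_fn(jobs)    # dependency finalized: back to worker pool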
+ + @type job: L{_QueuedJob} + @param job: Job object + @type dep_job_id: string + @param dep_job_id: ID of dependency job + @type dep_status: list + @param dep_status: Required status + + """ + assert ht.TString(job.id) + assert ht.TString(dep_job_id) + assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status) + + if job.id == dep_job_id: + return (self.ERROR, "Job can't depend on itself") + + # Get status of dependency job + try: + status = self._getstatus_fn(dep_job_id) + except errors.JobLost, err: + return (self.ERROR, "Dependency error: %s" % err) + + assert status in constants.JOB_STATUS_ALL + + job_id_waiters = self._waiters.setdefault(dep_job_id, set()) + + if status not in constants.JOBS_FINALIZED: + # Register for notification and wait for job to finish + job_id_waiters.add(job) + return (self.WAIT, + "Need to wait for job %s, wanted status '%s'" % + (dep_job_id, dep_status)) + + # Remove from waiters list + if job in job_id_waiters: + job_id_waiters.remove(job) + + if (status == constants.JOB_STATUS_CANCELED and + constants.JOB_STATUS_CANCELED not in dep_status): + return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id) + + elif not dep_status or status in dep_status: + return (self.CONTINUE, + "Dependency job %s finished with status '%s'" % + (dep_job_id, status)) + + else: + return (self.WRONGSTATUS, + "Dependency job %s finished with status '%s'," + " not one of '%s' as required" % + (dep_job_id, status, utils.CommaJoin(dep_status))) + + def _RemoveEmptyWaitersUnlocked(self): + """Remove all jobs without actual waiters. + + """ + for job_id in [job_id for (job_id, waiters) in self._waiters.items() + if not waiters]: + del self._waiters[job_id] + + def NotifyWaiters(self, job_id): + """Notifies all jobs waiting for a certain job ID. + + @attention: Do not call until L{CheckAndRegister} returned a status other + than C{WAITDEP} for C{job_id}, or behaviour is undefined + @type job_id: string + @param job_id: Job ID + + """ + assert ht.TString(job_id) + + self._lock.acquire() + try: + self._RemoveEmptyWaitersUnlocked() + + jobs = self._waiters.pop(job_id, None) + finally: + self._lock.release() + + if jobs: + # Re-add jobs to workerpool + logging.debug("Re-adding %s jobs which were waiting for job %s", + len(jobs), job_id) + self._enqueue_fn(jobs) + + def _RequireOpenQueue(fn): """Decorator for "public" functions. @@ -892,20 +1500,41 @@ def _RequireOpenQueue(fn): """ def wrapper(self, *args, **kwargs): - # pylint: disable-msg=W0212 + # pylint: disable=W0212 assert self._queue_filelock is not None, "Queue should be open" return fn(self, *args, **kwargs) return wrapper -class JobQueue(object): - """Queue used to manage the jobs. +def _RequireNonDrainedQueue(fn): + """Decorator checking for a non-drained queue. - @cvar _RE_JOB_FILE: regex matching the valid job file names + To be used with functions submitting new jobs. """ - _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE) + def wrapper(self, *args, **kwargs): + """Wrapper function. + + @raise errors.JobQueueDrainError: if the job queue is marked for draining + + """ + # Ok when sharing the big job queue lock, as the drain file is created when + # the lock is exclusive. 
+ # Needs access to protected member, pylint: disable=W0212 + if self._drained: + raise errors.JobQueueDrainError("Job queue is drained, refusing job") + + if not self._accepting_jobs: + raise errors.JobQueueError("Job queue is shutting down, refusing job") + + return fn(self, *args, **kwargs) + return wrapper + +class JobQueue(object): + """Queue used to manage the jobs. + + """ def __init__(self, context): """Constructor for JobQueue. @@ -933,6 +1562,9 @@ class JobQueue(object): self.acquire = self._lock.acquire self.release = self._lock.release + # Accept jobs by default + self._accepting_jobs = True + # Initialize the queue, and acquire the filelock. # This ensures no other process is working on the job queue. self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True) @@ -952,9 +1584,15 @@ class JobQueue(object): # TODO: Check consistency across nodes - self._queue_size = 0 + self._queue_size = None self._UpdateQueueSizeUnlocked() - self._drained = self._IsQueueMarkedDrain() + assert ht.TInt(self._queue_size) + self._drained = jstore.CheckDrainFlag() + + # Job dependencies + self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies, + self._EnqueueJobs) + self.context.glm.AddToLockMonitor(self.depmgr) # Setup worker pool self._wpool = _JobQueueWorkerPool(self) @@ -996,23 +1634,37 @@ class JobQueue(object): status = job.CalcStatus() - if status in (constants.JOB_STATUS_QUEUED, ): + if status == constants.JOB_STATUS_QUEUED: restartjobs.append(job) elif status in (constants.JOB_STATUS_RUNNING, - constants.JOB_STATUS_WAITLOCK, + constants.JOB_STATUS_WAITING, constants.JOB_STATUS_CANCELING): logging.warning("Unfinished job %s found: %s", job.id, job) - job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, - "Unclean master daemon shutdown") + + if status == constants.JOB_STATUS_WAITING: + # Restart job + job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None) + restartjobs.append(job) + else: + job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, + "Unclean master daemon shutdown") + job.Finalize() + self.UpdateJobUnlocked(job) if restartjobs: logging.info("Restarting %s jobs", len(restartjobs)) - self._EnqueueJobs(restartjobs) + self._EnqueueJobsUnlocked(restartjobs) logging.info("Job queue inspection finished") + def _GetRpc(self, address_list): + """Gets RPC runner with context. 
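A stripped-down version of the _RequireNonDrainedQueue decorator above may clarify the ordering (editor's sketch; the Ganeti-specific error classes are replaced by ValueError): both the drain flag and the shutting-down flag are checked on the instance before the wrapped submission method runs.

import functools

def require_open_for_submission(fn):
  @functools.wraps(fn)
  def wrapper(self, *args, **kwargs):
    if self._drained:
      raise ValueError("Job queue is drained, refusing job")
    if not self._accepting_jobs:
      raise ValueError("Job queue is shutting down, refusing job")
    return fn(self, *args, **kwargs)
  return wrapper

class SketchQueue(object):
  def __init__(self):
    self._drained = False
    self._accepting_jobs = True

  @require_open_for_submission
  def submit(self, ops):
    return "job-1"

assert SketchQueue().submit([]) == "job-1"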
+ + """ + return rpc.JobQueueRunner(self.context, address_list) + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def AddNode(self, node): @@ -1026,7 +1678,7 @@ class JobQueue(object): assert node_name != self._my_hostname # Clean queue directory on added node - result = rpc.RpcRunner.call_jobqueue_purge(node_name) + result = self._GetRpc(None).call_jobqueue_purge(node_name) msg = result.fail_msg if msg: logging.warning("Cannot cleanup queue directory on node %s: %s", @@ -1044,13 +1696,15 @@ class JobQueue(object): # Upload current serial file files.append(constants.JOB_QUEUE_SERIAL_FILE) + # Static address list + addrs = [node.primary_ip] + for file_name in files: # Read file content content = utils.ReadFile(file_name) - result = rpc.RpcRunner.call_jobqueue_update([node_name], - [node.primary_ip], - file_name, content) + result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name, + content) msg = result[node_name].fail_msg if msg: logging.error("Failed to upload file %s to node %s: %s", @@ -1134,7 +1788,7 @@ class JobQueue(object): if replicate: names, addrs = self._GetNodeIp() - result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data) + result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data) self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name) def _RenameFilesUnlocked(self, rename): @@ -1153,7 +1807,7 @@ class JobQueue(object): # ... and on all nodes names, addrs = self._GetNodeIp() - result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename) + result = self._GetRpc(addrs).call_jobqueue_rename(names, rename) self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename) @staticmethod @@ -1200,7 +1854,8 @@ class JobQueue(object): @return: a string representing the job identifier. """ - assert count > 0 + assert ht.TPositiveInt(count) + # New number serial = self._last_serial + count @@ -1209,10 +1864,13 @@ class JobQueue(object): "%s\n" % serial, True) result = [self._FormatJobID(v) - for v in range(self._last_serial, serial + 1)] + for v in range(self._last_serial + 1, serial + 1)] + # Keep it only if we were able to write the file self._last_serial = serial + assert len(result) == count + return result @staticmethod @@ -1240,7 +1898,8 @@ class JobQueue(object): return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR, cls._GetArchiveDirectory(job_id), "job-%s" % job_id) - def _GetJobIDsUnlocked(self, sort=True): + @staticmethod + def _GetJobIDsUnlocked(sort=True): """Return all known job IDs. 
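The change to _NewSerialsUnlocked above fixes the returned range so that exactly count new serials are handed out. A hedged sketch of the arithmetic, using plain integers instead of formatted job IDs:

def new_serials(last_serial, count):
  """Returns (new_last_serial, allocated_serials)."""
  serial = last_serial + count
  # Previously the range started at last_serial, yielding count + 1 entries
  allocated = list(range(last_serial + 1, serial + 1))
  assert len(allocated) == count
  return (serial, allocated)

assert new_serials(10, 3) == (13, [11, 12, 13])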
The method only looks at disk because it's a requirement that all @@ -1255,7 +1914,7 @@ class JobQueue(object): """ jlist = [] for filename in utils.ListVisibleFiles(constants.QUEUE_DIR): - m = self._RE_JOB_FILE.match(filename) + m = constants.JOB_FILE_RE.match(filename) if m: jlist.append(m.group(1)) if sort: @@ -1277,10 +1936,11 @@ class JobQueue(object): job = self._memcache.get(job_id, None) if job: logging.debug("Found job %s in memcache", job_id) + assert job.writable, "Found read-only job in memcache" return job try: - job = self._LoadJobFromDisk(job_id) + job = self._LoadJobFromDisk(job_id, False) if job is None: return job except errors.JobFileCorrupted: @@ -1295,39 +1955,59 @@ class JobQueue(object): self._RenameFilesUnlocked([(old_path, new_path)]) return None + assert job.writable, "Job just loaded is not writable" + self._memcache[job_id] = job logging.debug("Added job %s to the cache", job_id) return job - def _LoadJobFromDisk(self, job_id): + def _LoadJobFromDisk(self, job_id, try_archived, writable=None): """Load the given job file from disk. Given a job file, read, load and restore it in a _QueuedJob format. @type job_id: string @param job_id: job identifier + @type try_archived: bool + @param try_archived: Whether to try loading an archived job @rtype: L{_QueuedJob} or None @return: either None or the job object """ - filepath = self._GetJobPath(job_id) - logging.debug("Loading job from %s", filepath) - try: - raw_data = utils.ReadFile(filepath) - except EnvironmentError, err: - if err.errno in (errno.ENOENT, ): - return None - raise + path_functions = [(self._GetJobPath, True)] + + if try_archived: + path_functions.append((self._GetArchivedJobPath, False)) + + raw_data = None + writable_default = None + + for (fn, writable_default) in path_functions: + filepath = fn(job_id) + logging.debug("Loading job from %s", filepath) + try: + raw_data = utils.ReadFile(filepath) + except EnvironmentError, err: + if err.errno != errno.ENOENT: + raise + else: + break + + if not raw_data: + return None + + if writable is None: + writable = writable_default try: data = serializer.LoadJson(raw_data) - job = _QueuedJob.Restore(self, data) - except Exception, err: # pylint: disable-msg=W0703 + job = _QueuedJob.Restore(self, data, writable) + except Exception, err: # pylint: disable=W0703 raise errors.JobFileCorrupted(err) return job - def SafeLoadJobFromDisk(self, job_id): + def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None): """Load the given job file from disk. Given a job file, read, load and restore it in a _QueuedJob format. @@ -1336,29 +2016,18 @@ class JobQueue(object): @type job_id: string @param job_id: job identifier + @type try_archived: bool + @param try_archived: Whether to try loading an archived job @rtype: L{_QueuedJob} or None @return: either None or the job object """ try: - return self._LoadJobFromDisk(job_id) + return self._LoadJobFromDisk(job_id, try_archived, writable=writable) except (errors.JobFileCorrupted, EnvironmentError): logging.exception("Can't load/parse job %s", job_id) return None - @staticmethod - def _IsQueueMarkedDrain(): - """Check if the queue is marked from drain. - - This currently uses the queue drain file, which makes it a - per-node flag. In the future this can be moved to the config file. - - @rtype: boolean - @return: True of the job queue is marked for draining - - """ - return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE) - def _UpdateQueueSizeUnlocked(self): """Update the queue size. 
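The reworked _LoadJobFromDisk above tries the live queue file first and only then the archive. The sketch below (hypothetical paths and a caller-supplied reader, not the real helpers) shows the lookup order and how the default writability follows from where the job was found: only jobs in the live queue may be modified.

import errno

def load_job(job_id, try_archived, read_fn, writable=None):
  """Returns (raw_data, writable) or None if no job file exists."""
  # (path, default writability); paths are made up for this example
  candidates = [("/var/lib/queue/job-%s" % job_id, True)]
  if try_archived:
    candidates.append(("/var/lib/queue/archive/job-%s" % job_id, False))

  for (path, writable_default) in candidates:
    try:
      raw = read_fn(path)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      # Missing file: fall through to the next candidate
    else:
      if writable is None:
        writable = writable_default
      return (raw, writable)

  return None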
@@ -1374,13 +2043,7 @@ class JobQueue(object): @param drain_flag: Whether to set or unset the drain flag """ - getents = runtime.GetEnts() - - if drain_flag: - utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True, - uid=getents.masterd_uid, gid=getents.masterd_gid) - else: - utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE) + jstore.SetDrainFlag(drain_flag) self._drained = drain_flag @@ -1399,20 +2062,14 @@ class JobQueue(object): @param ops: The list of OpCodes that will become the new job. @rtype: L{_QueuedJob} @return: the job object to be queued - @raise errors.JobQueueDrainError: if the job queue is marked for draining @raise errors.JobQueueFull: if the job queue has too many jobs in it @raise errors.GenericError: If an opcode is not valid """ - # Ok when sharing the big job queue lock, as the drain file is created when - # the lock is exclusive. - if self._drained: - raise errors.JobQueueDrainError("Job queue is drained, refusing job") - if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT: raise errors.JobQueueFull() - job = _QueuedJob(self, job_id, ops) + job = _QueuedJob(self, job_id, ops, True) # Check priority for idx, op in enumerate(job.ops): @@ -1421,6 +2078,13 @@ class JobQueue(object): raise errors.GenericError("Opcode %s has invalid priority %s, allowed" " are %s" % (idx, op.priority, allowed)) + dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None) + if not opcodes.TNoRelativeJobDependencies(dependencies): + raise errors.GenericError("Opcode %s has invalid dependencies, must" + " match %s: %s" % + (idx, opcodes.TNoRelativeJobDependencies, + dependencies)) + # Write to disk self.UpdateJobUnlocked(job) @@ -1433,41 +2097,113 @@ class JobQueue(object): @locking.ssynchronized(_LOCK) @_RequireOpenQueue + @_RequireNonDrainedQueue def SubmitJob(self, ops): """Create and store a new job. @see: L{_SubmitJobUnlocked} """ - job_id = self._NewSerialsUnlocked(1)[0] - self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)]) + (job_id, ) = self._NewSerialsUnlocked(1) + self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)]) return job_id @locking.ssynchronized(_LOCK) @_RequireOpenQueue + @_RequireNonDrainedQueue def SubmitManyJobs(self, jobs): """Create and store multiple jobs. @see: L{_SubmitJobUnlocked} """ - results = [] - added_jobs = [] all_job_ids = self._NewSerialsUnlocked(len(jobs)) - for job_id, ops in zip(all_job_ids, jobs): - try: - added_jobs.append(self._SubmitJobUnlocked(job_id, ops)) - status = True - data = job_id - except errors.GenericError, err: - data = str(err) - status = False - results.append((status, data)) - self._EnqueueJobs(added_jobs) + (results, added_jobs) = \ + self._SubmitManyJobsUnlocked(jobs, all_job_ids, []) + + self._EnqueueJobsUnlocked(added_jobs) return results + @staticmethod + def _FormatSubmitError(msg, ops): + """Formats errors which occurred while submitting a job. + + """ + return ("%s; opcodes %s" % + (msg, utils.CommaJoin(op.Summary() for op in ops))) + + @staticmethod + def _ResolveJobDependencies(resolve_fn, deps): + """Resolves relative job IDs in dependencies. 
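For reference, the shape of the new opcode "depends" attribute validated above (an editor's illustration; status names are simplified strings): each entry pairs a job ID with the list of finalized statuses that satisfy the dependency, an empty list meaning any finalized status. Negative, relative IDs are only accepted through SubmitManyJobs, where they refer to earlier jobs of the same submission and are resolved before _SubmitJobUnlocked ever sees them.

# Accepted by a single SubmitJob call: absolute job IDs only
depends_absolute = [
  ("1234", ["success"]),            # job 1234 must have succeeded
  ("1240", []),                     # job 1240 merely has to be finalized
]

# Only meaningful within one SubmitManyJobs call, before resolution
depends_relative = [
  (-1, ["success"]),                # the job submitted just before this one
  (-2, ["success", "error"]),       # two jobs back, either final outcome
]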
+ + @type resolve_fn: callable + @param resolve_fn: Function to resolve a relative job ID + @type deps: list + @param deps: Dependencies + @rtype: list + @return: Resolved dependencies + + """ + result = [] + + for (dep_job_id, dep_status) in deps: + if ht.TRelativeJobId(dep_job_id): + assert ht.TInt(dep_job_id) and dep_job_id < 0 + try: + job_id = resolve_fn(dep_job_id) + except IndexError: + # Abort + return (False, "Unable to resolve relative job ID %s" % dep_job_id) + else: + job_id = dep_job_id + + result.append((job_id, dep_status)) + + return (True, result) + + def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids): + """Create and store multiple jobs. + + @see: L{_SubmitJobUnlocked} + + """ + results = [] + added_jobs = [] + + def resolve_fn(job_idx, reljobid): + assert reljobid < 0 + return (previous_job_ids + job_ids[:job_idx])[reljobid] + + for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)): + for op in ops: + if getattr(op, opcodes.DEPEND_ATTR, None): + (status, data) = \ + self._ResolveJobDependencies(compat.partial(resolve_fn, idx), + op.depends) + if not status: + # Abort resolving dependencies + assert ht.TNonEmptyString(data), "No error message" + break + # Use resolved dependencies + op.depends = data + else: + try: + job = self._SubmitJobUnlocked(job_id, ops) + except errors.GenericError, err: + status = False + data = self._FormatSubmitError(str(err), ops) + else: + status = True + data = job_id + added_jobs.append(job) + + results.append((status, data)) + + return (results, added_jobs) + + @locking.ssynchronized(_LOCK) def _EnqueueJobs(self, jobs): """Helper function to add jobs to worker pool's queue. @@ -1475,9 +2211,42 @@ class JobQueue(object): @param jobs: List of all jobs """ + return self._EnqueueJobsUnlocked(jobs) + + def _EnqueueJobsUnlocked(self, jobs): + """Helper function to add jobs to worker pool's queue. + + @type jobs: list + @param jobs: List of all jobs + + """ + assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode" self._wpool.AddManyTasks([(job, ) for job in jobs], priority=[job.CalcPriority() for job in jobs]) + def _GetJobStatusForDependencies(self, job_id): + """Gets the status of a job for dependencies. + + @type job_id: string + @param job_id: Job ID + @raise errors.JobLost: If job can't be found + + """ + if not isinstance(job_id, basestring): + job_id = self._FormatJobID(job_id) + + # Not using in-memory cache as doing so would require an exclusive lock + + # Try to load from disk + job = self.SafeLoadJobFromDisk(job_id, True, writable=False) + + assert not job.writable, "Got writable job" # pylint: disable=E1101 + + if job: + return job.CalcStatus() + + raise errors.JobLost("Job %s not found" % job_id) + @_RequireOpenQueue def UpdateJobUnlocked(self, job, replicate=True): """Update a job's on disk storage. 
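The indexing trick used by resolve_fn in _SubmitManyJobsUnlocked above is compact enough to deserve a worked example (editor's sketch with made-up job IDs): a relative ID of -1 picks the job submitted immediately before the current one, counting backwards first through the earlier jobs of this submission and then through previous_job_ids.

def resolve(previous_job_ids, job_ids, job_idx, reljobid):
  assert reljobid < 0
  return (previous_job_ids + job_ids[:job_idx])[reljobid]

previous = ["95", "96"]            # jobs from an earlier, related submission
current = ["100", "101", "102"]    # serials for the jobs being submitted now

# While preparing current[2] ("102"):
assert resolve(previous, current, 2, -1) == "101"   # its direct predecessor
assert resolve(previous, current, 2, -3) == "96"    # reaches into "previous"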
@@ -1492,8 +2261,13 @@ class JobQueue(object): @param replicate: whether to replicate the change to remote nodes """ + if __debug__: + finalized = job.CalcStatus() in constants.JOBS_FINALIZED + assert (finalized ^ (job.end_timestamp is None)) + assert job.writable, "Can't update read-only job" + filename = self._GetJobPath(job.id) - data = serializer.DumpJson(job.Serialize(), indent=False) + data = serializer.DumpJson(job.Serialize()) logging.debug("Writing job %s to %s", job.id, filename) self._UpdateJobQueueFile(filename, data, replicate) @@ -1521,7 +2295,8 @@ class JobQueue(object): as such by the clients """ - load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id) + load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False, + writable=False) helper = _WaitForJobChangesHelper() @@ -1546,9 +2321,13 @@ class JobQueue(object): logging.debug("Job %s not found", job_id) return (False, "Job %s not found" % job_id) + assert job.writable, "Can't cancel read-only job" + (success, msg) = job.Cancel() if success: + # If the job was finalized (e.g. cancelled), this is the final write + # allowed. The job can be archived anytime. self.UpdateJobUnlocked(job) return (success, msg) @@ -1566,6 +2345,8 @@ class JobQueue(object): archive_jobs = [] rename_files = [] for job in jobs: + assert job.writable, "Can't archive read-only job" + if job.CalcStatus() not in constants.JOBS_FINALIZED: logging.debug("Job %s is not yet done", job.id) continue @@ -1688,7 +2469,7 @@ class JobQueue(object): list_all = True for job_id in job_ids: - job = self.SafeLoadJobFromDisk(job_id) + job = self.SafeLoadJobFromDisk(job_id, True) if job is not None: jobs.append(job.GetInfo(fields)) elif not list_all: @@ -1697,6 +2478,31 @@ class JobQueue(object): return jobs @locking.ssynchronized(_LOCK) + def PrepareShutdown(self): + """Prepare to stop the job queue. + + Disables execution of jobs in the workerpool and returns whether there are + any jobs currently running. If the latter is the case, the job queue is not + yet ready for shutdown. Once this function returns C{True} L{Shutdown} can + be called without interfering with any job. Queued and unfinished jobs will + be resumed next time. + + Once this function has been called no new job submissions will be accepted + (see L{_RequireNonDrainedQueue}). + + @rtype: bool + @return: Whether there are any running jobs + + """ + if self._accepting_jobs: + self._accepting_jobs = False + + # Tell worker pool to stop processing pending tasks + self._wpool.SetActive(False) + + return self._wpool.HasRunningTasks() + + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def Shutdown(self): """Stops the job queue.
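Finally, the new PrepareShutdown/Shutdown pair is meant to be used as a two-step sequence. Below is a hedged sketch of what a caller such as the master daemon might do (hypothetical polling loop and parameters, not code from this patch): keep calling PrepareShutdown, which stops accepting jobs and reports whether any job is still running, and only call Shutdown once the pool is idle or a deadline is hit; queued jobs stay on disk and are resumed on the next start.

import time

def stop_job_queue(queue, poll_interval=0.1, timeout=60.0):
  """Drains the queue, waiting for running jobs before shutting down."""
  deadline = time.time() + timeout
  # PrepareShutdown is safe to call repeatedly; it returns True while any
  # job is still being processed by the worker pool.
  while queue.PrepareShutdown():
    if time.time() > deadline:
      break
    time.sleep(poll_interval)
  queue.Shutdown()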