X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/9bf5e01f1060064cd2d1e8265a99510c123f9606..6b273e78da60addcb6ed88dc4d3acb12e2ceaf0b:/lib/jqueue.py diff --git a/lib/jqueue.py b/lib/jqueue.py index 77512a9..ddf941e 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007, 2008 Google Inc. +# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -53,16 +53,17 @@ from ganeti import mcpu from ganeti import utils from ganeti import jstore from ganeti import rpc +from ganeti import runtime +from ganeti import netutils +from ganeti import compat JOBQUEUE_THREADS = 25 JOBS_PER_ARCHIVE_DIRECTORY = 10000 -# The Big JobQueue lock. As for all B*Lock conversions, it must be acquired in -# shared mode to ensure exclusion with legacy code, which acquires it -# exclusively. It can not be acquired at all only after concurrency with all -# new and legacy code is ensured. -_big_jqueue_lock = locking.SharedLock() +# member lock names to be passed to @ssynchronized decorator +_LOCK = "_lock" +_QUEUE = "_queue" class CancelJob(Exception): @@ -94,7 +95,7 @@ class _QueuedOpCode(object): @ivar stop_timestamp: timestamp for the end of the execution """ - __slots__ = ["input", "status", "result", "log", + __slots__ = ["input", "status", "result", "log", "priority", "start_timestamp", "exec_timestamp", "end_timestamp", "__weakref__"] @@ -113,6 +114,9 @@ class _QueuedOpCode(object): self.exec_timestamp = None self.end_timestamp = None + # Get initial priority (it might change during the lifetime of this opcode) + self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT) + @classmethod def Restore(cls, state): """Restore the _QueuedOpCode from the serialized form. @@ -131,6 +135,7 @@ class _QueuedOpCode(object): obj.start_timestamp = state.get("start_timestamp", None) obj.exec_timestamp = state.get("exec_timestamp", None) obj.end_timestamp = state.get("end_timestamp", None) + obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT) return obj def Serialize(self): @@ -148,6 +153,7 @@ class _QueuedOpCode(object): "start_timestamp": self.start_timestamp, "exec_timestamp": self.exec_timestamp, "end_timestamp": self.end_timestamp, + "priority": self.priority, } @@ -167,13 +173,11 @@ class _QueuedJob(object): @ivar received_timestamp: the timestamp for when the job was received @ivar start_timestmap: the timestamp for start of execution @ivar end_timestamp: the timestamp for end of execution - @ivar lock_status: In-memory locking information for debugging """ # pylint: disable-msg=W0212 - __slots__ = ["queue", "id", "ops", "log_serial", + __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx", "received_timestamp", "start_timestamp", "end_timestamp", - "lock_status", "change", "__weakref__"] def __init__(self, queue, job_id, ops): @@ -199,8 +203,15 @@ class _QueuedJob(object): self.start_timestamp = None self.end_timestamp = None - # In-memory attributes - self.lock_status = None + self._InitInMemory(self) + + @staticmethod + def _InitInMemory(obj): + """Initializes in-memory variables. 
+ + """ + obj.ops_iter = None + obj.cur_opctx = None def __repr__(self): status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), @@ -228,9 +239,6 @@ class _QueuedJob(object): obj.start_timestamp = state.get("start_timestamp", None) obj.end_timestamp = state.get("end_timestamp", None) - # In-memory attributes - obj.lock_status = None - obj.ops = [] obj.log_serial = 0 for op_state in state["ops"]: @@ -239,6 +247,8 @@ class _QueuedJob(object): obj.log_serial = max(obj.log_serial, log_entry[0]) obj.ops.append(op) + cls._InitInMemory(obj) + return obj def Serialize(self): @@ -309,6 +319,24 @@ class _QueuedJob(object): return status + def CalcPriority(self): + """Gets the current priority for this job. + + Only unfinished opcodes are considered. When all are done, the default + priority is used. + + @rtype: int + + """ + priorities = [op.priority for op in self.ops + if op.status not in constants.OPS_FINALIZED] + + if not priorities: + # All opcodes are done, assume default priority + return constants.OP_PRIO_DEFAULT + + return min(priorities) + def GetLogEntries(self, newer_than): """Selectively returns the log entries. @@ -348,6 +376,8 @@ class _QueuedJob(object): row.append(self.id) elif fname == "status": row.append(self.CalcStatus()) + elif fname == "priority": + row.append(self.CalcPriority()) elif fname == "ops": row.append([op.input.__getstate__() for op in self.ops]) elif fname == "opresult": @@ -362,14 +392,14 @@ class _QueuedJob(object): row.append([op.exec_timestamp for op in self.ops]) elif fname == "opend": row.append([op.end_timestamp for op in self.ops]) + elif fname == "oppriority": + row.append([op.priority for op in self.ops]) elif fname == "received_ts": row.append(self.received_timestamp) elif fname == "start_ts": row.append(self.start_timestamp) elif fname == "end_ts": row.append(self.end_timestamp) - elif fname == "lock_status": - row.append(self.lock_status) elif fname == "summary": row.append([op.input.Summary() for op in self.ops]) else: @@ -387,17 +417,38 @@ class _QueuedJob(object): @param result: the opcode result """ - try: - not_marked = True - for op in self.ops: - if op.status in constants.OPS_FINALIZED: - assert not_marked, "Finalized opcodes found after non-finalized ones" - continue - op.status = status - op.result = result - not_marked = False - finally: - self.queue.UpdateJobUnlocked(self) + not_marked = True + for op in self.ops: + if op.status in constants.OPS_FINALIZED: + assert not_marked, "Finalized opcodes found after non-finalized ones" + continue + op.status = status + op.result = result + not_marked = False + + def Cancel(self): + """Marks job as canceled/-ing if possible. 
+ + @rtype: tuple; (bool, string) + @return: Boolean describing whether job was successfully canceled or marked + as canceling and a text message + + """ + status = self.CalcStatus() + + if status == constants.JOB_STATUS_QUEUED: + self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, + "Job canceled by request") + return (True, "Job %s canceled" % self.id) + + elif status == constants.JOB_STATUS_WAITLOCK: + # The worker will notice the new status and cancel the job + self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) + return (True, "Job %s will be canceled" % self.id) + + else: + logging.debug("Job %s is no longer waiting in the queue", self.id) + return (False, "Job %s is no longer waiting in the queue" % self.id) class _OpExecCallbacks(mcpu.OpExecCbBase): @@ -420,6 +471,16 @@ class _OpExecCallbacks(mcpu.OpExecCbBase): self._job = job self._op = op + def _CheckCancel(self): + """Raises an exception to cancel the job if asked to. + + """ + # Cancel here if we were asked to + if self._op.status == constants.OP_STATUS_CANCELING: + logging.debug("Canceling opcode") + raise CancelJob() + + @locking.ssynchronized(_QUEUE, shared=1) def NotifyStart(self): """Mark the opcode as running, not lock-waiting. @@ -429,24 +490,22 @@ class _OpExecCallbacks(mcpu.OpExecCbBase): Processor.ExecOpCode) set to OP_STATUS_WAITLOCK. """ - self._queue.acquire() - try: - assert self._op.status in (constants.OP_STATUS_WAITLOCK, - constants.OP_STATUS_CANCELING) + assert self._op in self._job.ops + assert self._op.status in (constants.OP_STATUS_WAITLOCK, + constants.OP_STATUS_CANCELING) - # All locks are acquired by now - self._job.lock_status = None + # Cancel here if we were asked to + self._CheckCancel() - # Cancel here if we were asked to - if self._op.status == constants.OP_STATUS_CANCELING: - raise CancelJob() + logging.debug("Opcode is now running") - self._op.status = constants.OP_STATUS_RUNNING - self._op.exec_timestamp = TimeStampNow() - finally: - self._queue.release() + self._op.status = constants.OP_STATUS_RUNNING + self._op.exec_timestamp = TimeStampNow() - @locking.ssynchronized(_big_jqueue_lock) + # And finally replicate the job status + self._queue.UpdateJobUnlocked(self._job) + + @locking.ssynchronized(_QUEUE, shared=1) def _AppendFeedback(self, timestamp, log_type, log_msg): """Internal feedback append function, with locks @@ -472,75 +531,44 @@ class _OpExecCallbacks(mcpu.OpExecCbBase): timestamp = utils.SplitTime(time.time()) self._AppendFeedback(timestamp, log_type, log_msg) - def ReportLocks(self, msg): - """Write locking information to the job. - - Called whenever the LU processor is waiting for a lock or has acquired one. + def CheckCancel(self): + """Check whether job has been cancelled. """ - # Not getting the queue lock because this is a single assignment - self._job.lock_status = msg + assert self._op.status in (constants.OP_STATUS_WAITLOCK, + constants.OP_STATUS_CANCELING) + # Cancel here if we were asked to + self._CheckCancel() -class _WaitForJobChangesHelper(object): - """Helper class using initofy to wait for changes in a job file. - This class takes a previous job status and serial, and alerts the client when - the current job status has changed. +class _JobChangesChecker(object): + def __init__(self, fields, prev_job_info, prev_log_serial): + """Initializes this class. 
- @type job_id: string - @ivar job_id: id of the job we're watching - @type prev_job_info: string - @ivar prev_job_info: previous job info, as passed by the luxi client - @type prev_log_serial: string - @ivar prev_log_serial: previous job serial, as passed by the luxi client - @type queue: L{JobQueue} - @ivar queue: job queue (used for a few utility functions) - @type job_path: string - @ivar job_path: absolute path of the job file - @type wm: pyinotify.WatchManager (or None) - @ivar wm: inotify watch manager to watch for changes - @type inotify_handler: L{asyncnotifier.SingleFileEventHandler} - @ivar inotify_handler: single file event handler, used for watching - @type notifier: pyinotify.Notifier - @ivar notifier: inotify single-threaded notifier, used for watching + @type fields: list of strings + @param fields: Fields requested by LUXI client + @type prev_job_info: string + @param prev_job_info: previous job info, as passed by the LUXI client + @type prev_log_serial: string + @param prev_log_serial: previous job serial, as passed by the LUXI client - """ - def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue): - self.job_id = job_id - self.fields = fields - self.prev_job_info = prev_job_info - self.prev_log_serial = prev_log_serial - self.queue = queue - # pylint: disable-msg=W0212 - self.job_path = self.queue._GetJobPath(self.job_id) - self.wm = None - self.inotify_handler = None - self.notifier = None + """ + self._fields = fields + self._prev_job_info = prev_job_info + self._prev_log_serial = prev_log_serial - def _SetupInotify(self): - """Create the inotify + def __call__(self, job): + """Checks whether job has changed. - @raises errors.InotifyError: if the notifier cannot be setup + @type job: L{_QueuedJob} + @param job: Job object """ - if self.wm: - return - self.wm = pyinotify.WatchManager() - self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm, - self.OnInotify, - self.job_path) - self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler) - self.inotify_handler.enable() - - def _LoadDiskStatus(self): - job = self.queue.SafeLoadJobFromDisk(self.job_id) - if not job: - raise errors.JobLost() - self.job_status = job.CalcStatus() + status = job.CalcStatus() + job_info = job.GetInfo(self._fields) + log_entries = job.GetLogEntries(self._prev_log_serial) - job_info = job.GetInfo(self.fields) - log_entries = job.GetLogEntries(self.prev_log_serial) # Serializing and deserializing data can cause type changes (e.g. from # tuple to list) or precision loss. We're doing it here so that we get # the same modifications as the data received from the client. Without @@ -548,47 +576,540 @@ class _WaitForJobChangesHelper(object): # significantly different. # TODO: we just deserialized from disk, investigate how to make sure that # the job info and log entries are compatible to avoid this further step. - self.job_info = serializer.LoadJson(serializer.DumpJson(job_info)) - self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) + # TODO: Doing something like in testutils.py:UnifyValueType might be more + # efficient, though floats will be tricky + job_info = serializer.LoadJson(serializer.DumpJson(job_info)) + log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) - def _CheckForChanges(self): - self._LoadDiskStatus() # Don't even try to wait if the job is no longer running, there will be # no changes. 
- if (self.job_status not in (constants.JOB_STATUS_QUEUED, - constants.JOB_STATUS_RUNNING, - constants.JOB_STATUS_WAITLOCK) or - self.prev_job_info != self.job_info or - (self.log_entries and self.prev_log_serial != self.log_entries[0][0])): - logging.debug("Job %s changed", self.job_id) - return (self.job_info, self.log_entries) + if (status not in (constants.JOB_STATUS_QUEUED, + constants.JOB_STATUS_RUNNING, + constants.JOB_STATUS_WAITLOCK) or + job_info != self._prev_job_info or + (log_entries and self._prev_log_serial != log_entries[0][0])): + logging.debug("Job %s changed", job.id) + return (job_info, log_entries) + + return None - raise utils.RetryAgain() - def OnInotify(self, notifier_enabled): +class _JobFileChangesWaiter(object): + def __init__(self, filename): + """Initializes this class. + + @type filename: string + @param filename: Path to job file + @raises errors.InotifyError: if the notifier cannot be setup + + """ + self._wm = pyinotify.WatchManager() + self._inotify_handler = \ + asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename) + self._notifier = \ + pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler) + try: + self._inotify_handler.enable() + except Exception: + # pyinotify doesn't close file descriptors automatically + self._notifier.stop() + raise + + def _OnInotify(self, notifier_enabled): + """Callback for inotify. + + """ if not notifier_enabled: - self.inotify_handler.enable() + self._inotify_handler.enable() + + def Wait(self, timeout): + """Waits for the job file to change. + + @type timeout: float + @param timeout: Timeout in seconds + @return: Whether there have been events + + """ + assert timeout >= 0 + have_events = self._notifier.check_events(timeout * 1000) + if have_events: + self._notifier.read_events() + self._notifier.process_events() + return have_events + + def Close(self): + """Closes underlying notifier and its file descriptor. + + """ + self._notifier.stop() + + +class _JobChangesWaiter(object): + def __init__(self, filename): + """Initializes this class. - def WaitFn(self, timeout): - self._SetupInotify() - if self.notifier.check_events(timeout*1000): - self.notifier.read_events() - self.notifier.process_events() + @type filename: string + @param filename: Path to job file - def WaitForChanges(self, timeout): + """ + self._filewaiter = None + self._filename = filename + + def Wait(self, timeout): + """Waits for a job to change. + + @type timeout: float + @param timeout: Timeout in seconds + @return: Whether there have been events + + """ + if self._filewaiter: + return self._filewaiter.Wait(timeout) + + # Lazy setup: Avoid inotify setup cost when job file has already changed. + # If this point is reached, return immediately and let caller check the job + # file again in case there were changes since the last check. This avoids a + # race condition. + self._filewaiter = _JobFileChangesWaiter(self._filename) + + return True + + def Close(self): + """Closes underlying waiter. + + """ + if self._filewaiter: + self._filewaiter.Close() + + +class _WaitForJobChangesHelper(object): + """Helper class using inotify to wait for changes in a job file. + + This class takes a previous job status and serial, and alerts the client when + the current job status has changed. 
+ + """ + @staticmethod + def _CheckForChanges(job_load_fn, check_fn): + job = job_load_fn() + if not job: + raise errors.JobLost() + + result = check_fn(job) + if result is None: + raise utils.RetryAgain() + + return result + + def __call__(self, filename, job_load_fn, + fields, prev_job_info, prev_log_serial, timeout): + """Waits for changes on a job. + + @type filename: string + @param filename: File on which to wait for changes + @type job_load_fn: callable + @param job_load_fn: Function to load job + @type fields: list of strings + @param fields: Which fields to check for changes + @type prev_job_info: list or None + @param prev_job_info: Last job information returned + @type prev_log_serial: int + @param prev_log_serial: Last job message serial number + @type timeout: float + @param timeout: maximum time to wait in seconds + + """ try: - return utils.Retry(self._CheckForChanges, - utils.RETRY_REMAINING_TIME, - timeout, - wait_fn=self.WaitFn) + check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial) + waiter = _JobChangesWaiter(filename) + try: + return utils.Retry(compat.partial(self._CheckForChanges, + job_load_fn, check_fn), + utils.RETRY_REMAINING_TIME, timeout, + wait_fn=waiter.Wait) + finally: + waiter.Close() except (errors.InotifyError, errors.JobLost): return None except utils.RetryTimeout: return constants.JOB_NOTCHANGED - def Close(self): - if self.wm: - self.notifier.stop() + +def _EncodeOpError(err): + """Encodes an error which occurred while processing an opcode. + + """ + if isinstance(err, errors.GenericError): + to_encode = err + else: + to_encode = errors.OpExecError(str(err)) + + return errors.EncodeException(to_encode) + + +class _TimeoutStrategyWrapper: + def __init__(self, fn): + """Initializes this class. + + """ + self._fn = fn + self._next = None + + def _Advance(self): + """Gets the next timeout if necessary. + + """ + if self._next is None: + self._next = self._fn() + + def Peek(self): + """Returns the next timeout. + + """ + self._Advance() + return self._next + + def Next(self): + """Returns the current timeout and advances the internal state. + + """ + self._Advance() + result = self._next + self._next = None + return result + + +class _OpExecContext: + def __init__(self, op, index, log_prefix, timeout_strategy_factory): + """Initializes this class. + + """ + self.op = op + self.index = index + self.log_prefix = log_prefix + self.summary = op.input.Summary() + + self._timeout_strategy_factory = timeout_strategy_factory + self._ResetTimeoutStrategy() + + def _ResetTimeoutStrategy(self): + """Creates a new timeout strategy. + + """ + self._timeout_strategy = \ + _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt) + + def CheckPriorityIncrease(self): + """Checks whether priority can and should be increased. + + Called when locks couldn't be acquired. + + """ + op = self.op + + # Exhausted all retries and next round should not use blocking acquire + # for locks? + if (self._timeout_strategy.Peek() is None and + op.priority > constants.OP_PRIO_HIGHEST): + logging.debug("Increasing priority") + op.priority -= 1 + self._ResetTimeoutStrategy() + return True + + return False + + def GetNextLockTimeout(self): + """Returns the next lock acquire timeout. + + """ + return self._timeout_strategy.Next() + + +class _JobProcessor(object): + def __init__(self, queue, opexec_fn, job, + _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy): + """Initializes this class. 
+ + """ + self.queue = queue + self.opexec_fn = opexec_fn + self.job = job + self._timeout_strategy_factory = _timeout_strategy_factory + + @staticmethod + def _FindNextOpcode(job, timeout_strategy_factory): + """Locates the next opcode to run. + + @type job: L{_QueuedJob} + @param job: Job object + @param timeout_strategy_factory: Callable to create new timeout strategy + + """ + # Create some sort of a cache to speed up locating next opcode for future + # lookups + # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for + # pending and one for processed ops. + if job.ops_iter is None: + job.ops_iter = enumerate(job.ops) + + # Find next opcode to run + while True: + try: + (idx, op) = job.ops_iter.next() + except StopIteration: + raise errors.ProgrammerError("Called for a finished job") + + if op.status == constants.OP_STATUS_RUNNING: + # Found an opcode already marked as running + raise errors.ProgrammerError("Called for job marked as running") + + opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)), + timeout_strategy_factory) + + if op.status == constants.OP_STATUS_CANCELED: + # Cancelled jobs are handled by the caller + assert not compat.any(i.status != constants.OP_STATUS_CANCELED + for i in job.ops[idx:]) + + elif op.status in constants.OPS_FINALIZED: + # This is a job that was partially completed before master daemon + # shutdown, so it can be expected that some opcodes are already + # completed successfully (if any did error out, then the whole job + # should have been aborted and not resubmitted for processing). + logging.info("%s: opcode %s already processed, skipping", + opctx.log_prefix, opctx.summary) + continue + + return opctx + + @staticmethod + def _MarkWaitlock(job, op): + """Marks an opcode as waiting for locks. + + The job's start timestamp is also set if necessary. + + @type job: L{_QueuedJob} + @param job: Job object + @type op: L{_QueuedOpCode} + @param op: Opcode object + + """ + assert op in job.ops + assert op.status in (constants.OP_STATUS_QUEUED, + constants.OP_STATUS_WAITLOCK) + + update = False + + op.result = None + + if op.status == constants.OP_STATUS_QUEUED: + op.status = constants.OP_STATUS_WAITLOCK + update = True + + if op.start_timestamp is None: + op.start_timestamp = TimeStampNow() + update = True + + if job.start_timestamp is None: + job.start_timestamp = op.start_timestamp + update = True + + assert op.status == constants.OP_STATUS_WAITLOCK + + return update + + def _ExecOpCodeUnlocked(self, opctx): + """Processes one opcode and returns the result. + + """ + op = opctx.op + + assert op.status == constants.OP_STATUS_WAITLOCK + + timeout = opctx.GetNextLockTimeout() + + try: + # Make sure not to hold queue lock while calling ExecOpCode + result = self.opexec_fn(op.input, + _OpExecCallbacks(self.queue, self.job, op), + timeout=timeout, priority=op.priority) + except mcpu.LockAcquireTimeout: + assert timeout is not None, "Received timeout for blocking acquire" + logging.debug("Couldn't acquire locks in %0.6fs", timeout) + + assert op.status in (constants.OP_STATUS_WAITLOCK, + constants.OP_STATUS_CANCELING) + + # Was job cancelled while we were waiting for the lock? 
+ if op.status == constants.OP_STATUS_CANCELING: + return (constants.OP_STATUS_CANCELING, None) + + # Stay in waitlock while trying to re-acquire lock + return (constants.OP_STATUS_WAITLOCK, None) + except CancelJob: + logging.exception("%s: Canceling job", opctx.log_prefix) + assert op.status == constants.OP_STATUS_CANCELING + return (constants.OP_STATUS_CANCELING, None) + except Exception, err: # pylint: disable-msg=W0703 + logging.exception("%s: Caught exception in %s", + opctx.log_prefix, opctx.summary) + return (constants.OP_STATUS_ERROR, _EncodeOpError(err)) + else: + logging.debug("%s: %s successful", + opctx.log_prefix, opctx.summary) + return (constants.OP_STATUS_SUCCESS, result) + + def __call__(self, _nextop_fn=None): + """Continues execution of a job. + + @param _nextop_fn: Callback function for tests + @rtype: bool + @return: True if job is finished, False if processor needs to be called + again + + """ + queue = self.queue + job = self.job + + logging.debug("Processing job %s", job.id) + + queue.acquire(shared=1) + try: + opcount = len(job.ops) + + # Is a previous opcode still pending? + if job.cur_opctx: + opctx = job.cur_opctx + job.cur_opctx = None + else: + if __debug__ and _nextop_fn: + _nextop_fn() + opctx = self._FindNextOpcode(job, self._timeout_strategy_factory) + + op = opctx.op + + # Consistency check + assert compat.all(i.status in (constants.OP_STATUS_QUEUED, + constants.OP_STATUS_CANCELING, + constants.OP_STATUS_CANCELED) + for i in job.ops[opctx.index + 1:]) + + assert op.status in (constants.OP_STATUS_QUEUED, + constants.OP_STATUS_WAITLOCK, + constants.OP_STATUS_CANCELING, + constants.OP_STATUS_CANCELED) + + assert (op.priority <= constants.OP_PRIO_LOWEST and + op.priority >= constants.OP_PRIO_HIGHEST) + + if op.status not in (constants.OP_STATUS_CANCELING, + constants.OP_STATUS_CANCELED): + assert op.status in (constants.OP_STATUS_QUEUED, + constants.OP_STATUS_WAITLOCK) + + # Prepare to start opcode + if self._MarkWaitlock(job, op): + # Write to disk + queue.UpdateJobUnlocked(job) + + assert op.status == constants.OP_STATUS_WAITLOCK + assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK + assert job.start_timestamp and op.start_timestamp + + logging.info("%s: opcode %s waiting for locks", + opctx.log_prefix, opctx.summary) + + queue.release() + try: + (op_status, op_result) = self._ExecOpCodeUnlocked(opctx) + finally: + queue.acquire(shared=1) + + op.status = op_status + op.result = op_result + + if op.status == constants.OP_STATUS_WAITLOCK: + # Couldn't get locks in time + assert not op.end_timestamp + else: + # Finalize opcode + op.end_timestamp = TimeStampNow() + + if op.status == constants.OP_STATUS_CANCELING: + assert not compat.any(i.status != constants.OP_STATUS_CANCELING + for i in job.ops[opctx.index:]) + else: + assert op.status in constants.OPS_FINALIZED + + if op.status == constants.OP_STATUS_WAITLOCK: + finalize = False + + if opctx.CheckPriorityIncrease(): + # Priority was changed, need to update on-disk file + queue.UpdateJobUnlocked(job) + + # Keep around for another round + job.cur_opctx = opctx + + assert (op.priority <= constants.OP_PRIO_LOWEST and + op.priority >= constants.OP_PRIO_HIGHEST) + + # In no case must the status be finalized here + assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK + + else: + # Ensure all opcodes so far have been successful + assert (opctx.index == 0 or + compat.all(i.status == constants.OP_STATUS_SUCCESS + for i in job.ops[:opctx.index])) + + # Reset context + job.cur_opctx = None + + if op.status == 
constants.OP_STATUS_SUCCESS: + finalize = False + + elif op.status == constants.OP_STATUS_ERROR: + # Ensure failed opcode has an exception as its result + assert errors.GetEncodedError(job.ops[opctx.index].result) + + to_encode = errors.OpExecError("Preceding opcode failed") + job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, + _EncodeOpError(to_encode)) + finalize = True + + # Consistency check + assert compat.all(i.status == constants.OP_STATUS_ERROR and + errors.GetEncodedError(i.result) + for i in job.ops[opctx.index:]) + + elif op.status == constants.OP_STATUS_CANCELING: + job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, + "Job canceled by request") + finalize = True + + elif op.status == constants.OP_STATUS_CANCELED: + finalize = True + + else: + raise errors.ProgrammerError("Unknown status '%s'" % op.status) + + # Finalizing or last opcode? + if finalize or opctx.index == (opcount - 1): + # All opcodes have been run, finalize job + job.end_timestamp = TimeStampNow() + + # Write to disk. If the job status is final, this is the final write + # allowed. Once the file has been written, it can be archived anytime. + queue.UpdateJobUnlocked(job) + + if finalize or opctx.index == (opcount - 1): + logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) + return True + + return False + finally: + queue.release() class _JobQueueWorker(workerpool.BaseWorker): @@ -598,111 +1119,23 @@ class _JobQueueWorker(workerpool.BaseWorker): def RunTask(self, job): # pylint: disable-msg=W0221 """Job executor. - This functions processes a job. It is closely tied to the _QueuedJob and - _QueuedOpCode classes. + This functions processes a job. It is closely tied to the L{_QueuedJob} and + L{_QueuedOpCode} classes. @type job: L{_QueuedJob} @param job: the job to be processed """ - logging.info("Processing job %s", job.id) - proc = mcpu.Processor(self.pool.queue.context, job.id) queue = job.queue - try: - try: - count = len(job.ops) - for idx, op in enumerate(job.ops): - op_summary = op.input.Summary() - if op.status == constants.OP_STATUS_SUCCESS: - # this is a job that was partially completed before master - # daemon shutdown, so it can be expected that some opcodes - # are already completed successfully (if any did error - # out, then the whole job should have been aborted and not - # resubmitted for processing) - logging.info("Op %s/%s: opcode %s already processed, skipping", - idx + 1, count, op_summary) - continue - try: - logging.info("Op %s/%s: Starting opcode %s", idx + 1, count, - op_summary) - - queue.acquire() - try: - if op.status == constants.OP_STATUS_CANCELED: - raise CancelJob() - assert op.status == constants.OP_STATUS_QUEUED - op.status = constants.OP_STATUS_WAITLOCK - op.result = None - op.start_timestamp = TimeStampNow() - if idx == 0: # first opcode - job.start_timestamp = op.start_timestamp - queue.UpdateJobUnlocked(job) - - input_opcode = op.input - finally: - queue.release() - - # Make sure not to hold queue lock while calling ExecOpCode - result = proc.ExecOpCode(input_opcode, - _OpExecCallbacks(queue, job, op)) - - queue.acquire() - try: - op.status = constants.OP_STATUS_SUCCESS - op.result = result - op.end_timestamp = TimeStampNow() - queue.UpdateJobUnlocked(job) - finally: - queue.release() - - logging.info("Op %s/%s: Successfully finished opcode %s", - idx + 1, count, op_summary) - except CancelJob: - # Will be handled further up - raise - except Exception, err: - queue.acquire() - try: - try: - op.status = constants.OP_STATUS_ERROR - if isinstance(err, 
errors.GenericError): - op.result = errors.EncodeException(err) - else: - op.result = str(err) - op.end_timestamp = TimeStampNow() - logging.info("Op %s/%s: Error in opcode %s: %s", - idx + 1, count, op_summary, err) - finally: - queue.UpdateJobUnlocked(job) - finally: - queue.release() - raise - - except CancelJob: - queue.acquire() - try: - job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, - "Job canceled by request") - finally: - queue.release() - except errors.GenericError, err: - logging.exception("Ganeti exception") - except: - logging.exception("Unhandled exception") - finally: - queue.acquire() - try: - try: - job.lock_status = None - job.end_timestamp = TimeStampNow() - queue.UpdateJobUnlocked(job) - finally: - job_id = job.id - status = job.CalcStatus() - finally: - queue.release() + assert queue == self.pool.queue + + self.SetTaskName("Job%s" % job.id) + + proc = mcpu.Processor(queue.context, job.id) - logging.info("Finished job %s, status = %s", job_id, status) + if not _JobProcessor(queue, proc.ExecOpCode, job)(): + # Schedule again + raise workerpool.DeferTask(priority=job.CalcPriority()) class _JobQueueWorkerPool(workerpool.WorkerPool): @@ -728,7 +1161,7 @@ def _RequireOpenQueue(fn): @warning: Use this decorator only after locking.ssynchronized Example:: - @locking.ssynchronized(_big_jqueue_lock) + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def Example(self): pass @@ -764,10 +1197,17 @@ class JobQueue(object): """ self.context = context self._memcache = weakref.WeakValueDictionary() - self._my_hostname = utils.HostInfo().name + self._my_hostname = netutils.Hostname.GetSysName() - self.acquire = _big_jqueue_lock.acquire - self.release = _big_jqueue_lock.release + # The Big JobQueue lock. If a code block or method acquires it in shared + # mode safe it must guarantee concurrency with all the code acquiring it in + # shared mode, including itself. In order not to acquire it at all + # concurrency must be guaranteed with all code acquiring it in shared mode + # and all code acquiring it exclusively. + self._lock = locking.SharedLock("JobQueue") + + self.acquire = self._lock.acquire + self.release = self._lock.release # Initialize the queue, and acquire the filelock. # This ensures no other process is working on the job queue. @@ -795,49 +1235,68 @@ class JobQueue(object): # Setup worker pool self._wpool = _JobQueueWorkerPool(self) try: - # We need to lock here because WorkerPool.AddTask() may start a job while - # we're still doing our work. - self.acquire() - try: - logging.info("Inspecting job queue") + self._InspectQueue() + except: + self._wpool.TerminateWorkers() + raise + + @locking.ssynchronized(_LOCK) + @_RequireOpenQueue + def _InspectQueue(self): + """Loads the whole job queue and resumes unfinished jobs. + + This function needs the lock here because WorkerPool.AddTask() may start a + job while we're still doing our work. 
+ + """ + logging.info("Inspecting job queue") + + restartjobs = [] - all_job_ids = self._GetJobIDsUnlocked() - jobs_count = len(all_job_ids) + all_job_ids = self._GetJobIDsUnlocked() + jobs_count = len(all_job_ids) + lastinfo = time.time() + for idx, job_id in enumerate(all_job_ids): + # Give an update every 1000 jobs or 10 seconds + if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or + idx == (jobs_count - 1)): + logging.info("Job queue inspection: %d/%d (%0.1f %%)", + idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count) lastinfo = time.time() - for idx, job_id in enumerate(all_job_ids): - # Give an update every 1000 jobs or 10 seconds - if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or - idx == (jobs_count - 1)): - logging.info("Job queue inspection: %d/%d (%0.1f %%)", - idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count) - lastinfo = time.time() - job = self._LoadJobUnlocked(job_id) + job = self._LoadJobUnlocked(job_id) - # a failure in loading the job can cause 'None' to be returned - if job is None: - continue + # a failure in loading the job can cause 'None' to be returned + if job is None: + continue - status = job.CalcStatus() + status = job.CalcStatus() - if status in (constants.JOB_STATUS_QUEUED, ): - self._wpool.AddTask(job) + if status == constants.JOB_STATUS_QUEUED: + restartjobs.append(job) - elif status in (constants.JOB_STATUS_RUNNING, - constants.JOB_STATUS_WAITLOCK, - constants.JOB_STATUS_CANCELING): - logging.warning("Unfinished job %s found: %s", job.id, job) - job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, - "Unclean master daemon shutdown") + elif status in (constants.JOB_STATUS_RUNNING, + constants.JOB_STATUS_WAITLOCK, + constants.JOB_STATUS_CANCELING): + logging.warning("Unfinished job %s found: %s", job.id, job) - logging.info("Job queue inspection finished") - finally: - self.release() - except: - self._wpool.TerminateWorkers() - raise + if status == constants.JOB_STATUS_WAITLOCK: + # Restart job + job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None) + restartjobs.append(job) + else: + job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, + "Unclean master daemon shutdown") + + self.UpdateJobUnlocked(job) + + if restartjobs: + logging.info("Restarting %s jobs", len(restartjobs)) + self._EnqueueJobs(restartjobs) + + logging.info("Job queue inspection finished") - @locking.ssynchronized(_big_jqueue_lock) + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def AddNode(self, node): """Register a new node with the queue. @@ -882,7 +1341,7 @@ class JobQueue(object): self._nodes[node_name] = node.primary_ip - @locking.ssynchronized(_big_jqueue_lock) + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def RemoveNode(self, node_name): """Callback called when removing nodes from the cluster. @@ -933,6 +1392,7 @@ class JobQueue(object): names and the second one with the node addresses """ + # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"? 
name_list = self._nodes.keys() addr_list = [self._nodes[name] for name in name_list] return name_list, addr_list @@ -951,7 +1411,9 @@ class JobQueue(object): @param replicate: whether to spread the changes to the remote nodes """ - utils.WriteFile(file_name, data=data) + getents = runtime.GetEnts() + utils.WriteFile(file_name, data=data, uid=getents.masterd_uid, + gid=getents.masterd_gid) if replicate: names, addrs = self._GetNodeIp() @@ -1102,6 +1564,8 @@ class JobQueue(object): try: job = self._LoadJobFromDisk(job_id) + if job is None: + return job except errors.JobFileCorrupted: old_path = self._GetJobPath(job_id) new_path = self._GetArchivedJobPath(job_id) @@ -1184,7 +1648,7 @@ class JobQueue(object): """ self._queue_size = len(self._GetJobIDsUnlocked(sort=False)) - @locking.ssynchronized(_big_jqueue_lock) + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def SetDrainFlag(self, drain_flag): """Sets the drain flag for the queue. @@ -1193,8 +1657,11 @@ class JobQueue(object): @param drain_flag: Whether to set or unset the drain flag """ + getents = runtime.GetEnts() + if drain_flag: - utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True) + utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True, + uid=getents.masterd_uid, gid=getents.masterd_gid) else: utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE) @@ -1217,6 +1684,7 @@ class JobQueue(object): @return: the job object to be queued @raise errors.JobQueueDrainError: if the job queue is marked for draining @raise errors.JobQueueFull: if the job queue has too many jobs in it + @raise errors.GenericError: If an opcode is not valid """ # Ok when sharing the big job queue lock, as the drain file is created when @@ -1229,6 +1697,13 @@ class JobQueue(object): job = _QueuedJob(self, job_id, ops) + # Check priority + for idx, op in enumerate(job.ops): + if op.priority not in constants.OP_PRIO_SUBMIT_VALID: + allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID) + raise errors.GenericError("Opcode %s has invalid priority %s, allowed" + " are %s" % (idx, op.priority, allowed)) + # Write to disk self.UpdateJobUnlocked(job) @@ -1239,7 +1714,7 @@ class JobQueue(object): return job - @locking.ssynchronized(_big_jqueue_lock) + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def SubmitJob(self, ops): """Create and store a new job. @@ -1248,10 +1723,10 @@ class JobQueue(object): """ job_id = self._NewSerialsUnlocked(1)[0] - self._wpool.AddTask(self._SubmitJobUnlocked(job_id, ops)) + self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)]) return job_id - @locking.ssynchronized(_big_jqueue_lock) + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def SubmitManyJobs(self, jobs): """Create and store multiple jobs. @@ -1260,21 +1735,32 @@ class JobQueue(object): """ results = [] - tasks = [] + added_jobs = [] all_job_ids = self._NewSerialsUnlocked(len(jobs)) for job_id, ops in zip(all_job_ids, jobs): try: - tasks.append((self._SubmitJobUnlocked(job_id, ops), )) + added_jobs.append(self._SubmitJobUnlocked(job_id, ops)) status = True data = job_id except errors.GenericError, err: data = str(err) status = False results.append((status, data)) - self._wpool.AddManyTasks(tasks) + + self._EnqueueJobs(added_jobs) return results + def _EnqueueJobs(self, jobs): + """Helper function to add jobs to worker pool's queue. 
+ + @type jobs: list + @param jobs: List of all jobs + + """ + self._wpool.AddManyTasks([(job, ) for job in jobs], + priority=[job.CalcPriority() for job in jobs]) + @_RequireOpenQueue def UpdateJobUnlocked(self, job, replicate=True): """Update a job's on disk storage. @@ -1307,7 +1793,7 @@ class JobQueue(object): @type prev_log_serial: int @param prev_log_serial: Last job message serial number @type timeout: float - @param timeout: maximum time to wait + @param timeout: maximum time to wait in seconds @rtype: tuple (job info, log entries) @return: a tuple of the job information as required via the fields parameter, and the log entries as a list @@ -1318,14 +1804,14 @@ class JobQueue(object): as such by the clients """ - helper = _WaitForJobChangesHelper(job_id, fields, prev_job_info, - prev_log_serial, self) - try: - return helper.WaitForChanges(timeout) - finally: - helper.Close() + load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id) - @locking.ssynchronized(_big_jqueue_lock) + helper = _WaitForJobChangesHelper() + + return helper(self._GetJobPath(job_id), load_fn, + fields, prev_job_info, prev_log_serial, timeout) + + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def CancelJob(self, job_id): """Cancels a job. @@ -1343,22 +1829,12 @@ class JobQueue(object): logging.debug("Job %s not found", job_id) return (False, "Job %s not found" % job_id) - job_status = job.CalcStatus() + (success, msg) = job.Cancel() - if job_status not in (constants.JOB_STATUS_QUEUED, - constants.JOB_STATUS_WAITLOCK): - logging.debug("Job %s is no longer waiting in the queue", job.id) - return (False, "Job %s is no longer waiting in the queue" % job.id) + if success: + self.UpdateJobUnlocked(job) - if job_status == constants.JOB_STATUS_QUEUED: - job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, - "Job canceled by request") - return (True, "Job %s canceled" % job.id) - - elif job_status == constants.JOB_STATUS_WAITLOCK: - # The worker will notice the new status and cancel the job - job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) - return (True, "Job %s will be canceled" % job.id) + return (success, msg) @_RequireOpenQueue def _ArchiveJobsUnlocked(self, jobs): @@ -1373,9 +1849,7 @@ class JobQueue(object): archive_jobs = [] rename_files = [] for job in jobs: - if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED, - constants.JOB_STATUS_SUCCESS, - constants.JOB_STATUS_ERROR): + if job.CalcStatus() not in constants.JOBS_FINALIZED: logging.debug("Job %s is not yet done", job.id) continue @@ -1398,7 +1872,7 @@ class JobQueue(object): self._UpdateQueueSizeUnlocked() return len(archive_jobs) - @locking.ssynchronized(_big_jqueue_lock) + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def ArchiveJob(self, job_id): """Archives a job. @@ -1420,7 +1894,7 @@ class JobQueue(object): return self._ArchiveJobsUnlocked([job]) == 1 - @locking.ssynchronized(_big_jqueue_lock) + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def AutoArchiveJobs(self, age, timeout): """Archives all jobs based on age. @@ -1505,7 +1979,7 @@ class JobQueue(object): return jobs - @locking.ssynchronized(_big_jqueue_lock) + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def Shutdown(self): """Stops the job queue.
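
The patch's core addition is per-opcode priorities: a job's effective priority is the lowest (most urgent) value among its unfinished opcodes, falling back to the default once everything is finalized, as implemented by _QueuedJob.CalcPriority above. A minimal standalone sketch of that rule follows; it is not part of the patch, and the nice-style constant values, statuses and class names are assumptions for illustration only.

OP_PRIO_HIGHEST = -20   # assumed nice-style values: lower number = more urgent
OP_PRIO_DEFAULT = 0

OPS_FINALIZED = frozenset(["success", "error", "canceled"])


class ToyOpCode(object):
  def __init__(self, priority=OP_PRIO_DEFAULT, status="queued"):
    self.priority = priority
    self.status = status


def CalcJobPriority(ops):
  """Smallest (most urgent) priority among unfinished opcodes.

  Finalized opcodes are ignored; once all opcodes are done the default
  priority is returned, mirroring the rule in the patch.

  """
  pending = [op.priority for op in ops if op.status not in OPS_FINALIZED]
  if not pending:
    return OP_PRIO_DEFAULT
  return min(pending)


# Example: the finished opcode's priority (5) no longer counts; the job is
# scheduled at the most urgent remaining priority (-3).
assert CalcJobPriority([ToyOpCode(5, "success"),
                        ToyOpCode(10),
                        ToyOpCode(-3)]) == -3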
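
When an opcode cannot acquire its locks within the current timeout, _OpExecContext.CheckPriorityIncrease raises its priority one step, but only once the timeout strategy is exhausted, i.e. Peek() returns None and the next attempt would block. A hedged sketch of the Peek/Next wrapper and that rule; the names and the OP_PRIO_HIGHEST value are stand-ins for illustration, not the Ganeti API.

OP_PRIO_HIGHEST = -20   # assumed ceiling


class TimeoutStrategyWrapper(object):
  """Caches the upcoming timeout so it can be inspected before being consumed.

  """
  def __init__(self, fn):
    self._fn = fn         # callable returning the next timeout, or None to block
    self._next = None

  def _Advance(self):
    # Fetch the next timeout lazily, only when someone asks for it
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    # Look at the upcoming timeout without consuming it
    self._Advance()
    return self._next

  def Next(self):
    # Return the upcoming timeout and advance the internal state
    self._Advance()
    result = self._next
    self._next = None
    return result


def MaybeIncreasePriority(op, strategy):
  """Bumps the opcode priority by one step (numerically lower) when only a
  blocking lock acquire remains and the ceiling has not been reached yet.

  """
  if strategy.Peek() is None and op.priority > OP_PRIO_HIGHEST:
    op.priority -= 1
    return True
  return False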
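
The new _JobChangesWaiter postpones inotify setup until the first Wait() call and reports a change immediately on that call, so the caller re-reads the job file and cannot miss an update that landed before the watch existed. A minimal sketch of that lazy-setup pattern, using a hypothetical waiter factory in place of the pyinotify-based file waiter.

class LazyChangesWaiter(object):
  def __init__(self, make_waiter):
    self._make_waiter = make_waiter   # factory building the real (inotify) waiter
    self._waiter = None

  def Wait(self, timeout):
    if self._waiter is not None:
      return self._waiter.Wait(timeout)
    # First call: create the watch, then report a change so the caller
    # re-checks the file and closes the check-then-watch race.
    self._waiter = self._make_waiter()
    return True

  def Close(self):
    if self._waiter is not None:
      self._waiter.Close()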
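
Scheduling ties these pieces together: _JobQueueWorker.RunTask now runs a single _JobProcessor step per invocation and, if the job is not yet finalized, defers itself back to the worker pool at the job's current priority. A rough sketch of that control flow; the DeferTask class below only mirrors the role of workerpool.DeferTask in the patch and is not its real definition.

class DeferTask(Exception):
  """Raised by a worker to have its task re-queued at the given priority."""

  def __init__(self, priority):
    Exception.__init__(self)
    self.priority = priority


def RunTask(process_one_opcode, job):
  # process_one_opcode plays the role of _JobProcessor.__call__ and returns
  # True once the whole job has been finalized.
  if not process_one_opcode():
    raise DeferTask(priority=job.CalcPriority())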