X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/1316ebc2429ef50d01ddead084aa444eb60ad97d..33c730a22222f266a726a47ddc0e553ec012060d:/lib/jqueue.py diff --git a/lib/jqueue.py b/lib/jqueue.py index 1391f2a..9752f93 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc. +# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -35,6 +35,7 @@ import time import weakref import threading import itertools +import operator try: # pylint: disable=E0611 @@ -57,15 +58,21 @@ from ganeti import runtime from ganeti import netutils from ganeti import compat from ganeti import ht +from ganeti import query +from ganeti import qlang +from ganeti import pathutils +from ganeti import vcluster JOBQUEUE_THREADS = 25 -JOBS_PER_ARCHIVE_DIRECTORY = 10000 # member lock names to be passed to @ssynchronized decorator _LOCK = "_lock" _QUEUE = "_queue" +#: Retrieves "id" attribute +_GetIdAttr = operator.attrgetter("id") + class CancelJob(Exception): """Special exception to cancel a job. @@ -73,6 +80,12 @@ class CancelJob(Exception): """ +class QueueShutdown(Exception): + """Special exception to abort a job when the job queue is shutting down. + + """ + + def TimeStampNow(): """Returns the current timestamp. @@ -83,6 +96,33 @@ def TimeStampNow(): return utils.SplitTime(time.time()) +def _CallJqUpdate(runner, names, file_name, content): + """Updates job queue file after virtualizing filename. + + """ + virt_file_name = vcluster.MakeVirtualPath(file_name) + return runner.call_jobqueue_update(names, virt_file_name, content) + + +class _SimpleJobQuery: + """Wrapper for job queries. + + Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}. + + """ + def __init__(self, fields): + """Initializes this class. + + """ + self._query = query.Query(query.JOB_FIELDS, fields) + + def __call__(self, job): + """Executes a job query using cached field list. + + """ + return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0] + + class _QueuedOpCode(object): """Encapsulates an opcode object. @@ -101,7 +141,7 @@ class _QueuedOpCode(object): "__weakref__"] def __init__(self, op): - """Constructor for the _QuededOpCode. + """Initializes instances of this class. @type op: L{opcodes.OpCode} @param op: the opcode we encapsulate @@ -180,7 +220,7 @@ class _QueuedJob(object): # pylint: disable=W0212 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx", "received_timestamp", "start_timestamp", "end_timestamp", - "__weakref__", "processor_lock", "writable"] + "__weakref__", "processor_lock", "writable", "archived"] def __init__(self, queue, job_id, ops, writable): """Constructor for the _QueuedJob. @@ -200,15 +240,18 @@ class _QueuedJob(object): raise errors.GenericError("A job needs at least one opcode") self.queue = queue - self.id = job_id + self.id = int(job_id) self.ops = [_QueuedOpCode(op) for op in ops] self.log_serial = 0 self.received_timestamp = TimeStampNow() self.start_timestamp = None self.end_timestamp = None + self.archived = False self._InitInMemory(self, writable) + assert not self.archived, "New jobs can not be marked as archived" + @staticmethod def _InitInMemory(obj, writable): """Initializes in-memory variables. 
@@ -232,7 +275,7 @@ class _QueuedJob(object): return "<%s at %#x>" % (" ".join(status), id(self)) @classmethod - def Restore(cls, queue, state, writable): + def Restore(cls, queue, state, writable, archived): """Restore a _QueuedJob from serialized state: @type queue: L{JobQueue} @@ -241,16 +284,19 @@ class _QueuedJob(object): @param state: the serialized state @type writable: bool @param writable: Whether job can be modified + @type archived: bool + @param archived: Whether job was already archived @rtype: _JobQueue @return: the restored _JobQueue instance """ obj = _QueuedJob.__new__(cls) obj.queue = queue - obj.id = state["id"] + obj.id = int(state["id"]) obj.received_timestamp = state.get("received_timestamp", None) obj.start_timestamp = state.get("start_timestamp", None) obj.end_timestamp = state.get("end_timestamp", None) + obj.archived = archived obj.ops = [] obj.log_serial = 0 @@ -383,41 +429,7 @@ class _QueuedJob(object): has been passed """ - row = [] - for fname in fields: - if fname == "id": - row.append(self.id) - elif fname == "status": - row.append(self.CalcStatus()) - elif fname == "priority": - row.append(self.CalcPriority()) - elif fname == "ops": - row.append([op.input.__getstate__() for op in self.ops]) - elif fname == "opresult": - row.append([op.result for op in self.ops]) - elif fname == "opstatus": - row.append([op.status for op in self.ops]) - elif fname == "oplog": - row.append([op.log for op in self.ops]) - elif fname == "opstart": - row.append([op.start_timestamp for op in self.ops]) - elif fname == "opexec": - row.append([op.exec_timestamp for op in self.ops]) - elif fname == "opend": - row.append([op.end_timestamp for op in self.ops]) - elif fname == "oppriority": - row.append([op.priority for op in self.ops]) - elif fname == "received_ts": - row.append(self.received_timestamp) - elif fname == "start_ts": - row.append(self.start_timestamp) - elif fname == "end_ts": - row.append(self.end_timestamp) - elif fname == "summary": - row.append([op.input.Summary() for op in self.ops]) - else: - raise errors.OpExecError("Invalid self query field '%s'" % fname) - return row + return _SimpleJobQuery(fields)(self) def MarkUnfinishedOps(self, status, result): """Mark unfinished opcodes with a given status and result. @@ -470,6 +482,50 @@ class _QueuedJob(object): logging.debug("Job %s is no longer waiting in the queue", self.id) return (False, "Job %s is no longer waiting in the queue" % self.id) + def ChangePriority(self, priority): + """Changes the job priority. 
+ + @type priority: int + @param priority: New priority + @rtype: tuple; (bool, string) + @return: Boolean describing whether job's priority was successfully changed + and a text message + + """ + status = self.CalcStatus() + + if status in constants.JOBS_FINALIZED: + return (False, "Job %s is finished" % self.id) + elif status == constants.JOB_STATUS_CANCELING: + return (False, "Job %s is cancelling" % self.id) + else: + assert status in (constants.JOB_STATUS_QUEUED, + constants.JOB_STATUS_WAITING, + constants.JOB_STATUS_RUNNING) + + changed = False + for op in self.ops: + if (op.status == constants.OP_STATUS_RUNNING or + op.status in constants.OPS_FINALIZED): + assert not changed, \ + ("Found opcode for which priority should not be changed after" + " priority has been changed for previous opcodes") + continue + + assert op.status in (constants.OP_STATUS_QUEUED, + constants.OP_STATUS_WAITING) + + changed = True + + # Set new priority (doesn't modify opcode input) + op.priority = priority + + if changed: + return (True, ("Priorities of pending opcodes for job %s have been" + " changed to %s" % (self.id, priority))) + else: + return (False, "Job %s had no pending opcodes" % self.id) + class _OpExecCallbacks(mcpu.OpExecCbBase): def __init__(self, queue, job, op): @@ -500,6 +556,11 @@ class _OpExecCallbacks(mcpu.OpExecCbBase): logging.debug("Canceling opcode") raise CancelJob() + # See if queue is shutting down + if not self._queue.AcceptingJobsUnlocked(): + logging.debug("Queue is shutting down") + raise QueueShutdown() + @locking.ssynchronized(_QUEUE, shared=1) def NotifyStart(self): """Mark the opcode as running, not lock-waiting. @@ -551,8 +612,8 @@ class _OpExecCallbacks(mcpu.OpExecCbBase): timestamp = utils.SplitTime(time.time()) self._AppendFeedback(timestamp, log_type, log_msg) - def CheckCancel(self): - """Check whether job has been cancelled. + def CurrentPriority(self): + """Returns current priority for opcode. """ assert self._op.status in (constants.OP_STATUS_WAITING, @@ -561,6 +622,8 @@ class _OpExecCallbacks(mcpu.OpExecCbBase): # Cancel here if we were asked to self._CheckCancel() + return self._op.priority + def SubmitManyJobs(self, jobs): """Submits jobs for processing. @@ -583,7 +646,7 @@ class _JobChangesChecker(object): @param prev_log_serial: previous job serial, as passed by the LUXI client """ - self._fields = fields + self._squery = _SimpleJobQuery(fields) self._prev_job_info = prev_job_info self._prev_log_serial = prev_log_serial @@ -597,7 +660,7 @@ class _JobChangesChecker(object): assert not job.writable, "Expected read-only job" status = job.CalcStatus() - job_info = job.GetInfo(self._fields) + job_info = self._squery(job) log_entries = job.GetLogEntries(self._prev_log_serial) # Serializing and deserializing data can cause type changes (e.g. from @@ -626,7 +689,7 @@ class _JobChangesChecker(object): class _JobFileChangesWaiter(object): - def __init__(self, filename): + def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager): """Initializes this class. 
@type filename: string @@ -634,7 +697,7 @@ class _JobFileChangesWaiter(object): @raises errors.InotifyError: if the notifier cannot be setup """ - self._wm = pyinotify.WatchManager() + self._wm = _inotify_wm_cls() self._inotify_handler = \ asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename) self._notifier = \ @@ -676,7 +739,7 @@ class _JobFileChangesWaiter(object): class _JobChangesWaiter(object): - def __init__(self, filename): + def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter): """Initializes this class. @type filename: string @@ -685,6 +748,7 @@ class _JobChangesWaiter(object): """ self._filewaiter = None self._filename = filename + self._waiter_cls = _waiter_cls def Wait(self, timeout): """Waits for a job to change. @@ -701,7 +765,7 @@ class _JobChangesWaiter(object): # If this point is reached, return immediately and let caller check the job # file again in case there were changes since the last check. This avoids a # race condition. - self._filewaiter = _JobFileChangesWaiter(self._filename) + self._filewaiter = self._waiter_cls(self._filename) return True @@ -739,7 +803,8 @@ class _WaitForJobChangesHelper(object): return result def __call__(self, filename, job_load_fn, - fields, prev_job_info, prev_log_serial, timeout): + fields, prev_job_info, prev_log_serial, timeout, + _waiter_cls=_JobChangesWaiter): """Waits for changes on a job. @type filename: string @@ -759,7 +824,7 @@ class _WaitForJobChangesHelper(object): counter = itertools.count() try: check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial) - waiter = _JobChangesWaiter(filename) + waiter = _waiter_cls(filename) try: return utils.Retry(compat.partial(self._CheckForChanges, counter, job_load_fn, check_fn), @@ -767,7 +832,7 @@ class _WaitForJobChangesHelper(object): wait_fn=waiter.Wait) finally: waiter.Close() - except (errors.InotifyError, errors.JobLost): + except errors.JobLost: return None except utils.RetryTimeout: return constants.JOB_NOTCHANGED @@ -1030,7 +1095,7 @@ class _JobProcessor(object): # Make sure not to hold queue lock while calling ExecOpCode result = self.opexec_fn(op.input, _OpExecCallbacks(self.queue, self.job, op), - timeout=timeout, priority=op.priority) + timeout=timeout) except mcpu.LockAcquireTimeout: assert timeout is not None, "Received timeout for blocking acquire" logging.debug("Couldn't acquire locks in %0.6fs", timeout) @@ -1042,12 +1107,25 @@ class _JobProcessor(object): if op.status == constants.OP_STATUS_CANCELING: return (constants.OP_STATUS_CANCELING, None) + # Queue is shutting down, return to queued + if not self.queue.AcceptingJobsUnlocked(): + return (constants.OP_STATUS_QUEUED, None) + # Stay in waitlock while trying to re-acquire lock return (constants.OP_STATUS_WAITING, None) except CancelJob: logging.exception("%s: Canceling job", opctx.log_prefix) assert op.status == constants.OP_STATUS_CANCELING return (constants.OP_STATUS_CANCELING, None) + + except QueueShutdown: + logging.exception("%s: Queue is shutting down", opctx.log_prefix) + + assert op.status == constants.OP_STATUS_WAITING + + # Job hadn't been started yet, so it should return to the queue + return (constants.OP_STATUS_QUEUED, None) + except Exception, err: # pylint: disable=W0703 logging.exception("%s: Caught exception in %s", opctx.log_prefix, opctx.summary) @@ -1145,8 +1223,10 @@ class _JobProcessor(object): assert not waitjob - if op.status == constants.OP_STATUS_WAITING: - # Couldn't get locks in time + if op.status in (constants.OP_STATUS_WAITING, + 
constants.OP_STATUS_QUEUED): + # waiting: Couldn't get locks in time + # queued: Queue is shutting down assert not op.end_timestamp else: # Finalize opcode @@ -1158,7 +1238,19 @@ class _JobProcessor(object): else: assert op.status in constants.OPS_FINALIZED - if op.status == constants.OP_STATUS_WAITING or waitjob: + if op.status == constants.OP_STATUS_QUEUED: + # Queue is shutting down + assert not waitjob + + finalize = False + + # Reset context + job.cur_opctx = None + + # In no case must the status be finalized here + assert job.CalcStatus() == constants.JOB_STATUS_QUEUED + + elif op.status == constants.OP_STATUS_WAITING or waitjob: finalize = False if not waitjob and opctx.CheckPriorityIncrease(): @@ -1237,6 +1329,29 @@ class _JobProcessor(object): queue.release() +def _EvaluateJobProcessorResult(depmgr, job, result): + """Looks at a result from L{_JobProcessor} for a job. + + To be used in a L{_JobQueueWorker}. + + """ + if result == _JobProcessor.FINISHED: + # Notify waiting jobs + depmgr.NotifyWaiters(job.id) + + elif result == _JobProcessor.DEFER: + # Schedule again + raise workerpool.DeferTask(priority=job.CalcPriority()) + + elif result == _JobProcessor.WAITDEP: + # No-op, dependency manager will re-schedule + pass + + else: + raise errors.ProgrammerError("Job processor returned unknown status %s" % + (result, )) + + class _JobQueueWorker(workerpool.BaseWorker): """The actual job workers. @@ -1277,23 +1392,8 @@ class _JobQueueWorker(workerpool.BaseWorker): wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn, proc.ExecOpCode) - result = _JobProcessor(queue, wrap_execop_fn, job)() - - if result == _JobProcessor.FINISHED: - # Notify waiting jobs - queue.depmgr.NotifyWaiters(job.id) - - elif result == _JobProcessor.DEFER: - # Schedule again - raise workerpool.DeferTask(priority=job.CalcPriority()) - - elif result == _JobProcessor.WAITDEP: - # No-op, dependency manager will re-schedule - pass - - else: - raise errors.ProgrammerError("Job processor returned unknown status %s" % - (result, )) + _EvaluateJobProcessorResult(queue.depmgr, job, + _JobProcessor(queue, wrap_execop_fn, job)()) @staticmethod def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs): @@ -1390,14 +1490,14 @@ class _JobDependencyManager: @type job: L{_QueuedJob} @param job: Job object - @type dep_job_id: string + @type dep_job_id: int @param dep_job_id: ID of dependency job @type dep_status: list @param dep_status: Required status """ - assert ht.TString(job.id) - assert ht.TString(dep_job_id) + assert ht.TJobId(job.id) + assert ht.TJobId(dep_job_id) assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status) if job.id == dep_job_id: @@ -1452,11 +1552,11 @@ class _JobDependencyManager: @attention: Do not call until L{CheckAndRegister} returned a status other than C{WAITDEP} for C{job_id}, or behaviour is undefined - @type job_id: string + @type job_id: int @param job_id: Job ID """ - assert ht.TString(job_id) + assert ht.TJobId(job_id) self._lock.acquire() try: @@ -1498,6 +1598,31 @@ def _RequireOpenQueue(fn): return wrapper +def _RequireNonDrainedQueue(fn): + """Decorator checking for a non-drained queue. + + To be used with functions submitting new jobs. + + """ + def wrapper(self, *args, **kwargs): + """Wrapper function. + + @raise errors.JobQueueDrainError: if the job queue is marked for draining + + """ + # Ok when sharing the big job queue lock, as the drain file is created when + # the lock is exclusive. 
+ # Needs access to protected member, pylint: disable=W0212 + if self._drained: + raise errors.JobQueueDrainError("Job queue is drained, refusing job") + + if not self._accepting_jobs: + raise errors.JobQueueError("Job queue is shutting down, refusing job") + + return fn(self, *args, **kwargs) + return wrapper + + class JobQueue(object): """Queue used to manage the jobs. @@ -1529,6 +1654,9 @@ class JobQueue(object): self.acquire = self._lock.acquire self.release = self._lock.release + # Accept jobs by default + self._accepting_jobs = True + # Initialize the queue, and acquire the filelock. # This ensures no other process is working on the job queue. self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True) @@ -1548,8 +1676,9 @@ class JobQueue(object): # TODO: Check consistency across nodes - self._queue_size = 0 + self._queue_size = None self._UpdateQueueSizeUnlocked() + assert ht.TInt(self._queue_size) self._drained = jstore.CheckDrainFlag() # Job dependencies @@ -1622,6 +1751,12 @@ class JobQueue(object): logging.info("Job queue inspection finished") + def _GetRpc(self, address_list): + """Gets RPC runner with context. + + """ + return rpc.JobQueueRunner(self.context, address_list) + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def AddNode(self, node): @@ -1635,7 +1770,7 @@ class JobQueue(object): assert node_name != self._my_hostname # Clean queue directory on added node - result = rpc.RpcRunner.call_jobqueue_purge(node_name) + result = self._GetRpc(None).call_jobqueue_purge(node_name) msg = result.fail_msg if msg: logging.warning("Cannot cleanup queue directory on node %s: %s", @@ -1651,20 +1786,31 @@ class JobQueue(object): files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()] # Upload current serial file - files.append(constants.JOB_QUEUE_SERIAL_FILE) + files.append(pathutils.JOB_QUEUE_SERIAL_FILE) + + # Static address list + addrs = [node.primary_ip] for file_name in files: # Read file content content = utils.ReadFile(file_name) - result = rpc.RpcRunner.call_jobqueue_update([node_name], - [node.primary_ip], - file_name, content) + result = _CallJqUpdate(self._GetRpc(addrs), [node_name], + file_name, content) msg = result[node_name].fail_msg if msg: logging.error("Failed to upload file %s to node %s: %s", file_name, node_name, msg) + # Set queue drained flag + result = \ + self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name], + self._drained) + msg = result[node_name].fail_msg + if msg: + logging.error("Failed to set queue drained flag on node %s: %s", + node_name, msg) + self._nodes[node_name] = node.primary_ip @locking.ssynchronized(_LOCK) @@ -1743,7 +1889,7 @@ class JobQueue(object): if replicate: names, addrs = self._GetNodeIp() - result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data) + result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data) self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name) def _RenameFilesUnlocked(self, rename): @@ -1762,42 +1908,9 @@ class JobQueue(object): # ... and on all nodes names, addrs = self._GetNodeIp() - result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename) + result = self._GetRpc(addrs).call_jobqueue_rename(names, rename) self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename) - @staticmethod - def _FormatJobID(job_id): - """Convert a job ID to string format. - - Currently this just does C{str(job_id)} after performing some - checks, but if we want to change the job id format this will - abstract this change. 
- - @type job_id: int or long - @param job_id: the numeric job id - @rtype: str - @return: the formatted job id - - """ - if not isinstance(job_id, (int, long)): - raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id) - if job_id < 0: - raise errors.ProgrammerError("Job ID %s is negative" % job_id) - - return str(job_id) - - @classmethod - def _GetArchiveDirectory(cls, job_id): - """Returns the archive directory for a job. - - @type job_id: str - @param job_id: Job identifier - @rtype: str - @return: Directory name - - """ - return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY) - def _NewSerialsUnlocked(self, count): """Generates a new job identifier. @@ -1805,20 +1918,20 @@ class JobQueue(object): @type count: integer @param count: how many serials to return - @rtype: str - @return: a string representing the job identifier. + @rtype: list of int + @return: a list of job identifiers. """ - assert ht.TPositiveInt(count) + assert ht.TNonNegativeInt(count) # New number serial = self._last_serial + count # Write to file - self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE, + self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE, "%s\n" % serial, True) - result = [self._FormatJobID(v) + result = [jstore.FormatJobID(v) for v in range(self._last_serial + 1, serial + 1)] # Keep it only if we were able to write the file @@ -1838,10 +1951,10 @@ class JobQueue(object): @return: the path to the job file """ - return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id) + return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id) - @classmethod - def _GetArchivedJobPath(cls, job_id): + @staticmethod + def _GetArchivedJobPath(job_id): """Returns the archived job file for a give job id. @type job_id: str @@ -1850,11 +1963,30 @@ class JobQueue(object): @return: the path to the archived job file """ - return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR, - cls._GetArchiveDirectory(job_id), "job-%s" % job_id) + return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR, + jstore.GetArchiveDirectory(job_id), + "job-%s" % job_id) @staticmethod - def _GetJobIDsUnlocked(sort=True): + def _DetermineJobDirectories(archived): + """Build list of directories containing job files. + + @type archived: bool + @param archived: Whether to include directories for archived jobs + @rtype: list + + """ + result = [pathutils.QUEUE_DIR] + + if archived: + archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR + result.extend(map(compat.partial(utils.PathJoin, archive_path), + utils.ListVisibleFiles(archive_path))) + + return result + + @classmethod + def _GetJobIDsUnlocked(cls, sort=True, archived=False): """Return all known job IDs. The method only looks at disk because it's a requirement that all @@ -1868,12 +2000,15 @@ class JobQueue(object): """ jlist = [] - for filename in utils.ListVisibleFiles(constants.QUEUE_DIR): - m = constants.JOB_FILE_RE.match(filename) - if m: - jlist.append(m.group(1)) + + for path in cls._DetermineJobDirectories(archived): + for filename in utils.ListVisibleFiles(path): + m = constants.JOB_FILE_RE.match(filename) + if m: + jlist.append(int(m.group(1))) + if sort: - jlist = utils.NiceSort(jlist) + jlist.sort() return jlist def _LoadJobUnlocked(self, job_id): @@ -1883,6 +2018,7 @@ class JobQueue(object): existing, or try to load the job from the disk. If loading from disk, it will also add the job to the cache. 
+ @type job_id: int @param job_id: the job id @rtype: L{_QueuedJob} or None @return: either None or the job object @@ -1921,7 +2057,7 @@ class JobQueue(object): Given a job file, read, load and restore it in a _QueuedJob format. - @type job_id: string + @type job_id: int @param job_id: job identifier @type try_archived: bool @param try_archived: Whether to try loading an archived job @@ -1929,15 +2065,15 @@ class JobQueue(object): @return: either None or the job object """ - path_functions = [(self._GetJobPath, True)] + path_functions = [(self._GetJobPath, False)] if try_archived: - path_functions.append((self._GetArchivedJobPath, False)) + path_functions.append((self._GetArchivedJobPath, True)) raw_data = None - writable_default = None + archived = None - for (fn, writable_default) in path_functions: + for (fn, archived) in path_functions: filepath = fn(job_id) logging.debug("Loading job from %s", filepath) try: @@ -1952,11 +2088,11 @@ class JobQueue(object): return None if writable is None: - writable = writable_default + writable = not archived try: data = serializer.LoadJson(raw_data) - job = _QueuedJob.Restore(self, data, writable) + job = _QueuedJob.Restore(self, data, writable, archived) except Exception, err: # pylint: disable=W0703 raise errors.JobFileCorrupted(err) @@ -1969,7 +2105,7 @@ class JobQueue(object): In case of error reading the job, it gets returned as None, and the exception is logged. - @type job_id: string + @type job_id: int @param job_id: job identifier @type try_archived: bool @param try_archived: Whether to try loading an archived job @@ -1998,10 +2134,18 @@ class JobQueue(object): @param drain_flag: Whether to set or unset the drain flag """ + # Change flag locally jstore.SetDrainFlag(drain_flag) self._drained = drain_flag + # ... and on all nodes + (names, addrs) = self._GetNodeIp() + result = \ + self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag) + self._CheckRpcResult(result, self._nodes, + "Setting queue drain flag to %s" % drain_flag) + return True @_RequireOpenQueue @@ -2017,28 +2161,23 @@ class JobQueue(object): @param ops: The list of OpCodes that will become the new job. @rtype: L{_QueuedJob} @return: the job object to be queued - @raise errors.JobQueueDrainError: if the job queue is marked for draining @raise errors.JobQueueFull: if the job queue has too many jobs in it @raise errors.GenericError: If an opcode is not valid """ - # Ok when sharing the big job queue lock, as the drain file is created when - # the lock is exclusive. - if self._drained: - raise errors.JobQueueDrainError("Job queue is drained, refusing job") - if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT: raise errors.JobQueueFull() job = _QueuedJob(self, job_id, ops, True) - # Check priority for idx, op in enumerate(job.ops): + # Check priority if op.priority not in constants.OP_PRIO_SUBMIT_VALID: allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID) raise errors.GenericError("Opcode %s has invalid priority %s, allowed" " are %s" % (idx, op.priority, allowed)) + # Check job dependencies dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None) if not opcodes.TNoRelativeJobDependencies(dependencies): raise errors.GenericError("Opcode %s has invalid dependencies, must" @@ -2058,6 +2197,7 @@ class JobQueue(object): @locking.ssynchronized(_LOCK) @_RequireOpenQueue + @_RequireNonDrainedQueue def SubmitJob(self, ops): """Create and store a new job. 
@@ -2070,6 +2210,7 @@ class JobQueue(object): @locking.ssynchronized(_LOCK) @_RequireOpenQueue + @_RequireNonDrainedQueue def SubmitManyJobs(self, jobs): """Create and store multiple jobs. @@ -2101,8 +2242,10 @@ class JobQueue(object): @param resolve_fn: Function to resolve a relative job ID @type deps: list @param deps: Dependencies - @rtype: list - @return: Resolved dependencies + @rtype: tuple; (boolean, string or list) + @return: If successful (first tuple item), the returned list contains + resolved job IDs along with the requested status; if not successful, + the second element is an error message """ result = [] @@ -2181,19 +2324,17 @@ class JobQueue(object): """ assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode" self._wpool.AddManyTasks([(job, ) for job in jobs], - priority=[job.CalcPriority() for job in jobs]) + priority=[job.CalcPriority() for job in jobs], + task_id=map(_GetIdAttr, jobs)) def _GetJobStatusForDependencies(self, job_id): """Gets the status of a job for dependencies. - @type job_id: string + @type job_id: int @param job_id: Job ID @raise errors.JobLost: If job can't be found """ - if not isinstance(job_id, basestring): - job_id = self._FormatJobID(job_id) - # Not using in-memory cache as doing so would require an exclusive lock # Try to load from disk @@ -2224,9 +2365,10 @@ class JobQueue(object): finalized = job.CalcStatus() in constants.JOBS_FINALIZED assert (finalized ^ (job.end_timestamp is None)) assert job.writable, "Can't update read-only job" + assert not job.archived, "Can't update archived job" filename = self._GetJobPath(job.id) - data = serializer.DumpJson(job.Serialize(), indent=False) + data = serializer.DumpJson(job.Serialize()) logging.debug("Writing job %s to %s", job.id, filename) self._UpdateJobQueueFile(filename, data, replicate) @@ -2234,7 +2376,7 @@ class JobQueue(object): timeout): """Waits for changes in a job. - @type job_id: string + @type job_id: int @param job_id: Job identifier @type fields: list of strings @param fields: Which fields to check for changes @@ -2254,7 +2396,7 @@ class JobQueue(object): as such by the clients """ - load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False, + load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True, writable=False) helper = _WaitForJobChangesHelper() @@ -2269,20 +2411,64 @@ class JobQueue(object): This will only succeed if the job has not started yet. - @type job_id: string + @type job_id: int @param job_id: job ID of job to be cancelled. """ logging.info("Cancelling job %s", job_id) + return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel()) + + @locking.ssynchronized(_LOCK) + @_RequireOpenQueue + def ChangeJobPriority(self, job_id, priority): + """Changes a job's priority. 
+ + @type job_id: int + @param job_id: ID of the job whose priority should be changed + @type priority: int + @param priority: New priority + + """ + logging.info("Changing priority of job %s to %s", job_id, priority) + + if priority not in constants.OP_PRIO_SUBMIT_VALID: + allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID) + raise errors.GenericError("Invalid priority %s, allowed are %s" % + (priority, allowed)) + + def fn(job): + (success, msg) = job.ChangePriority(priority) + + if success: + try: + self._wpool.ChangeTaskPriority(job.id, job.CalcPriority()) + except workerpool.NoSuchTask: + logging.debug("Job %s is not in workerpool at this time", job.id) + + return (success, msg) + + return self._ModifyJobUnlocked(job_id, fn) + + def _ModifyJobUnlocked(self, job_id, mod_fn): + """Modifies a job. + + @type job_id: int + @param job_id: Job ID + @type mod_fn: callable + @param mod_fn: Modifying function, receiving job object as parameter, + returning tuple of (status boolean, message string) + + """ job = self._LoadJobUnlocked(job_id) if not job: logging.debug("Job %s not found", job_id) return (False, "Job %s not found" % job_id) - assert job.writable, "Can't cancel read-only job" + assert job.writable, "Can't modify read-only job" + assert not job.archived, "Can't modify archived job" - (success, msg) = job.Cancel() + (success, msg) = mod_fn(job) if success: # If the job was finalized (e.g. cancelled), this is the final write @@ -2305,6 +2491,7 @@ class JobQueue(object): rename_files = [] for job in jobs: assert job.writable, "Can't archive read-only job" + assert not job.archived, "Can't cancel archived job" if job.CalcStatus() not in constants.JOBS_FINALIZED: logging.debug("Job %s is not yet done", job.id) @@ -2336,7 +2523,7 @@ class JobQueue(object): This is just a wrapper over L{_ArchiveJobsUnlocked}. - @type job_id: string + @type job_id: int @param job_id: Job ID of job to be archived. @rtype: bool @return: Whether job was archived @@ -2407,7 +2594,47 @@ class JobQueue(object): return (archived_count, len(all_job_ids) - last_touched) - def QueryJobs(self, job_ids, fields): + def _Query(self, fields, qfilter): + qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter, + namefield="id") + + # Archived jobs are only looked at if the "archived" field is referenced + # either as a requested field or in the filter. By default archived jobs + # are ignored. + include_archived = (query.JQ_ARCHIVED in qobj.RequestedData()) + + job_ids = qobj.RequestedNames() + + list_all = (job_ids is None) + + if list_all: + # Since files are added to/removed from the queue atomically, there's no + # risk of getting the job ids in an inconsistent state. + job_ids = self._GetJobIDsUnlocked(archived=include_archived) + + jobs = [] + + for job_id in job_ids: + job = self.SafeLoadJobFromDisk(job_id, True, writable=False) + if job is not None or not list_all: + jobs.append((job_id, job)) + + return (qobj, jobs, list_all) + + def QueryJobs(self, fields, qfilter): + """Returns a list of jobs in queue. + + @type fields: sequence + @param fields: List of wanted fields + @type qfilter: None or query2 filter (list) + @param qfilter: Query filter + + """ + (qobj, ctx, _) = self._Query(fields, qfilter) + + return query.GetQueryResponse(qobj, ctx, sort_by_name=False) + + def OldStyleQueryJobs(self, job_ids, fields): """Returns a list of jobs in queue. 
@type job_ids: list @@ -2419,22 +2646,49 @@ class JobQueue(object): the requested fields """ - jobs = [] - list_all = False - if not job_ids: - # Since files are added to/removed from the queue atomically, there's no - # risk of getting the job ids in an inconsistent state. - job_ids = self._GetJobIDsUnlocked() - list_all = True + # backwards compat: + job_ids = [int(jid) for jid in job_ids] + qfilter = qlang.MakeSimpleFilter("id", job_ids) - for job_id in job_ids: - job = self.SafeLoadJobFromDisk(job_id, True) - if job is not None: - jobs.append(job.GetInfo(fields)) - elif not list_all: - jobs.append(None) + (qobj, ctx, _) = self._Query(fields, qfilter) + + return qobj.OldStyleQuery(ctx, sort_by_name=False) + + @locking.ssynchronized(_LOCK) + def PrepareShutdown(self): + """Prepare to stop the job queue. + + Disables execution of jobs in the workerpool and returns whether there are + any jobs currently running. If the latter is the case, the job queue is not + yet ready for shutdown. Once this function returns C{True} L{Shutdown} can + be called without interfering with any job. Queued and unfinished jobs will + be resumed next time. + + Once this function has been called no new job submissions will be accepted + (see L{_RequireNonDrainedQueue}). - return jobs + @rtype: bool + @return: Whether there are any running jobs + + """ + if self._accepting_jobs: + self._accepting_jobs = False + + # Tell worker pool to stop processing pending tasks + self._wpool.SetActive(False) + + return self._wpool.HasRunningTasks() + + def AcceptingJobsUnlocked(self): + """Returns whether jobs are accepted. + + Once L{PrepareShutdown} has been called, no new jobs are accepted and the + queue is shutting down. + + @rtype: bool + + """ + return self._accepting_jobs @locking.ssynchronized(_LOCK) @_RequireOpenQueue
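
The field-by-field loop in GetInfo is replaced above by _SimpleJobQuery, which builds a query.Query over query.JOB_FIELDS once and reuses it for every job; _JobChangesChecker benefits because it asks for the same fields on each check. The snippet below is only a rough analogy of that caching pattern, with a toy field table standing in for query.JOB_FIELDS and Ganeti's query machinery:

# Analogy for the _SimpleJobQuery pattern: resolve the requested field
# list once, then apply it to many jobs. The field table is a toy, not
# Ganeti's query.JOB_FIELDS.

class SimpleQuery(object):
    # Map of field name -> extractor function
    _FIELDS = {
        "id": lambda job: job["id"],
        "status": lambda job: job["status"],
        "summary": lambda job: [op["summary"] for op in job["ops"]],
    }

    def __init__(self, fields):
        # Resolve the requested fields once, at construction time
        self._getters = [self._FIELDS[name] for name in fields]

    def __call__(self, job):
        return [fn(job) for fn in self._getters]


if __name__ == "__main__":
    q = SimpleQuery(["id", "status"])
    jobs = [{"id": 1, "status": "queued", "ops": []},
            {"id": 2, "status": "running", "ops": []}]
    print([q(job) for job in jobs])   # [[1, 'queued'], [2, 'running']]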
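
_QueuedJob.ChangePriority applies a new priority only to opcodes that have not started: running and finalized opcodes keep theirs, and finalized or cancelling jobs are rejected with a message. A simplified standalone model of that per-opcode rule follows; the status constants and priority values are stand-ins (Ganeti's accepted submit priorities live in constants.OP_PRIO_SUBMIT_VALID):

# Simplified model of the ChangePriority rule: only opcodes that have not
# started yet pick up the new priority. Status names and priority values
# are illustrative stand-ins.

OP_STATUS_QUEUED = "queued"
OP_STATUS_WAITING = "waiting"
OP_STATUS_RUNNING = "running"
OPS_FINALIZED = frozenset(["success", "error", "canceled"])


class MiniOp(object):
    def __init__(self, status, priority):
        self.status = status
        self.priority = priority


def change_priority(ops, priority):
    """Returns True if at least one pending opcode was changed."""
    changed = False
    for op in ops:
        if op.status == OP_STATUS_RUNNING or op.status in OPS_FINALIZED:
            # Already started or finished; its priority stays as it was
            continue
        assert op.status in (OP_STATUS_QUEUED, OP_STATUS_WAITING)
        op.priority = priority
        changed = True
    return changed


if __name__ == "__main__":
    ops = [MiniOp("success", 0), MiniOp(OP_STATUS_RUNNING, 0),
           MiniOp(OP_STATUS_QUEUED, 0)]
    print(change_priority(ops, -10))      # True: the queued opcode changed
    print([op.priority for op in ops])    # [0, 0, -10]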
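
The QueueShutdown exception, AcceptingJobsUnlocked and PrepareShutdown together form a cooperative shutdown path: once the queue stops accepting jobs, opcodes still waiting for locks raise QueueShutdown and _JobProcessor returns them to OP_STATUS_QUEUED so they resume after a restart. A minimal sketch of that handshake, using simplified stand-in classes rather than the real job queue and worker pool:

# Minimal sketch of the queue-shutdown handshake; MiniQueue stands in for
# the real JobQueue (which additionally deactivates its worker pool here).

class QueueShutdown(Exception):
    """Raised by a still-waiting opcode when the queue stops accepting jobs."""


class MiniQueue(object):
    def __init__(self):
        self._accepting_jobs = True
        self._running_jobs = 0

    def AcceptingJobsUnlocked(self):
        return self._accepting_jobs

    def PrepareShutdown(self):
        # Stop accepting jobs and report whether work is still in flight
        self._accepting_jobs = False
        return self._running_jobs > 0


def check_before_running(queue):
    # An opcode waiting for locks re-checks the queue before proceeding
    if not queue.AcceptingJobsUnlocked():
        raise QueueShutdown()
    return "proceed"


if __name__ == "__main__":
    q = MiniQueue()
    print(check_before_running(q))    # "proceed"
    q.PrepareShutdown()
    try:
        check_before_running(q)
    except QueueShutdown:
        # _JobProcessor would put the opcode back into OP_STATUS_QUEUED here
        print("opcode returned to the queue")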
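
Job IDs are handled as integers throughout the patch (self.id = int(job_id), ht.TJobId assertions, jstore.FormatJobID), so _GetJobIDsUnlocked can sort numerically instead of calling utils.NiceSort on strings. A tiny illustration of that listing step, with a stand-in pattern in place of constants.JOB_FILE_RE:

# Illustration only: numeric job IDs allow a plain sort when listing the
# queue directory. JOB_FILE_RE here is a stand-in for constants.JOB_FILE_RE.
import re

JOB_FILE_RE = re.compile(r"^job-(\d+)$")


def list_job_ids(filenames):
    ids = []
    for name in filenames:
        m = JOB_FILE_RE.match(name)
        if m:
            ids.append(int(m.group(1)))
    ids.sort()
    return ids


if __name__ == "__main__":
    print(list_job_ids(["job-10", "job-2", "lock", "job-1"]))  # [1, 2, 10]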