X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/4c27b2319efd0301ba23e54b308f55c1293daeba..4f2f98f170abda39b7075eaf61755bcec663f47a:/lib/jqueue.py diff --git a/lib/jqueue.py b/lib/jqueue.py index d16b8f7..9e9d988 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -35,6 +35,7 @@ import time import weakref import threading import itertools +import operator try: # pylint: disable=E0611 @@ -48,6 +49,7 @@ from ganeti import serializer from ganeti import workerpool from ganeti import locking from ganeti import opcodes +from ganeti import opcodes_base from ganeti import errors from ganeti import mcpu from ganeti import utils @@ -69,6 +71,9 @@ JOBQUEUE_THREADS = 25 _LOCK = "_lock" _QUEUE = "_queue" +#: Retrieves "id" attribute +_GetIdAttr = operator.attrgetter("id") + class CancelJob(Exception): """Special exception to cancel a job. @@ -76,6 +81,12 @@ class CancelJob(Exception): """ +class QueueShutdown(Exception): + """Special exception to abort a job when the job queue is shutting down. + + """ + + def TimeStampNow(): """Returns the current timestamp. @@ -210,7 +221,23 @@ class _QueuedJob(object): # pylint: disable=W0212 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx", "received_timestamp", "start_timestamp", "end_timestamp", - "__weakref__", "processor_lock", "writable"] + "__weakref__", "processor_lock", "writable", "archived"] + + def _AddReasons(self): + """Extend the reason trail + + Add the reason for all the opcodes of this job to be executed. + + """ + count = 0 + for queued_op in self.ops: + op = queued_op.input + reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__) + reason_text = "job=%d;index=%d" % (self.id, count) + reason = getattr(op, "reason", []) + reason.append((reason_src, reason_text, utils.EpochNano())) + op.reason = reason + count = count + 1 def __init__(self, queue, job_id, ops, writable): """Constructor for the _QueuedJob. @@ -232,13 +259,17 @@ class _QueuedJob(object): self.queue = queue self.id = int(job_id) self.ops = [_QueuedOpCode(op) for op in ops] + self._AddReasons() self.log_serial = 0 self.received_timestamp = TimeStampNow() self.start_timestamp = None self.end_timestamp = None + self.archived = False self._InitInMemory(self, writable) + assert not self.archived, "New jobs can not be marked as archived" + @staticmethod def _InitInMemory(obj, writable): """Initializes in-memory variables. @@ -262,7 +293,7 @@ class _QueuedJob(object): return "<%s at %#x>" % (" ".join(status), id(self)) @classmethod - def Restore(cls, queue, state, writable): + def Restore(cls, queue, state, writable, archived): """Restore a _QueuedJob from serialized state: @type queue: L{JobQueue} @@ -271,6 +302,8 @@ class _QueuedJob(object): @param state: the serialized state @type writable: bool @param writable: Whether job can be modified + @type archived: bool + @param archived: Whether job was already archived @rtype: _JobQueue @return: the restored _JobQueue instance @@ -281,6 +314,7 @@ class _QueuedJob(object): obj.received_timestamp = state.get("received_timestamp", None) obj.start_timestamp = state.get("start_timestamp", None) obj.end_timestamp = state.get("end_timestamp", None) + obj.archived = archived obj.ops = [] obj.log_serial = 0 @@ -466,6 +500,50 @@ class _QueuedJob(object): logging.debug("Job %s is no longer waiting in the queue", self.id) return (False, "Job %s is no longer waiting in the queue" % self.id) + def ChangePriority(self, priority): + """Changes the job priority. + + @type priority: int + @param priority: New priority + @rtype: tuple; (bool, string) + @return: Boolean describing whether job's priority was successfully changed + and a text message + + """ + status = self.CalcStatus() + + if status in constants.JOBS_FINALIZED: + return (False, "Job %s is finished" % self.id) + elif status == constants.JOB_STATUS_CANCELING: + return (False, "Job %s is cancelling" % self.id) + else: + assert status in (constants.JOB_STATUS_QUEUED, + constants.JOB_STATUS_WAITING, + constants.JOB_STATUS_RUNNING) + + changed = False + for op in self.ops: + if (op.status == constants.OP_STATUS_RUNNING or + op.status in constants.OPS_FINALIZED): + assert not changed, \ + ("Found opcode for which priority should not be changed after" + " priority has been changed for previous opcodes") + continue + + assert op.status in (constants.OP_STATUS_QUEUED, + constants.OP_STATUS_WAITING) + + changed = True + + # Set new priority (doesn't modify opcode input) + op.priority = priority + + if changed: + return (True, ("Priorities of pending opcodes for job %s have been" + " changed to %s" % (self.id, priority))) + else: + return (False, "Job %s had no pending opcodes" % self.id) + class _OpExecCallbacks(mcpu.OpExecCbBase): def __init__(self, queue, job, op): @@ -496,6 +574,11 @@ class _OpExecCallbacks(mcpu.OpExecCbBase): logging.debug("Canceling opcode") raise CancelJob() + # See if queue is shutting down + if not self._queue.AcceptingJobsUnlocked(): + logging.debug("Queue is shutting down") + raise QueueShutdown() + @locking.ssynchronized(_QUEUE, shared=1) def NotifyStart(self): """Mark the opcode as running, not lock-waiting. @@ -547,8 +630,8 @@ class _OpExecCallbacks(mcpu.OpExecCbBase): timestamp = utils.SplitTime(time.time()) self._AppendFeedback(timestamp, log_type, log_msg) - def CheckCancel(self): - """Check whether job has been cancelled. + def CurrentPriority(self): + """Returns current priority for opcode. """ assert self._op.status in (constants.OP_STATUS_WAITING, @@ -557,6 +640,8 @@ class _OpExecCallbacks(mcpu.OpExecCbBase): # Cancel here if we were asked to self._CheckCancel() + return self._op.priority + def SubmitManyJobs(self, jobs): """Submits jobs for processing. @@ -622,7 +707,7 @@ class _JobChangesChecker(object): class _JobFileChangesWaiter(object): - def __init__(self, filename): + def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager): """Initializes this class. @type filename: string @@ -630,7 +715,7 @@ class _JobFileChangesWaiter(object): @raises errors.InotifyError: if the notifier cannot be setup """ - self._wm = pyinotify.WatchManager() + self._wm = _inotify_wm_cls() self._inotify_handler = \ asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename) self._notifier = \ @@ -672,7 +757,7 @@ class _JobFileChangesWaiter(object): class _JobChangesWaiter(object): - def __init__(self, filename): + def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter): """Initializes this class. @type filename: string @@ -681,6 +766,7 @@ class _JobChangesWaiter(object): """ self._filewaiter = None self._filename = filename + self._waiter_cls = _waiter_cls def Wait(self, timeout): """Waits for a job to change. @@ -697,7 +783,7 @@ class _JobChangesWaiter(object): # If this point is reached, return immediately and let caller check the job # file again in case there were changes since the last check. This avoids a # race condition. - self._filewaiter = _JobFileChangesWaiter(self._filename) + self._filewaiter = self._waiter_cls(self._filename) return True @@ -735,7 +821,8 @@ class _WaitForJobChangesHelper(object): return result def __call__(self, filename, job_load_fn, - fields, prev_job_info, prev_log_serial, timeout): + fields, prev_job_info, prev_log_serial, timeout, + _waiter_cls=_JobChangesWaiter): """Waits for changes on a job. @type filename: string @@ -755,7 +842,7 @@ class _WaitForJobChangesHelper(object): counter = itertools.count() try: check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial) - waiter = _JobChangesWaiter(filename) + waiter = _waiter_cls(filename) try: return utils.Retry(compat.partial(self._CheckForChanges, counter, job_load_fn, check_fn), @@ -763,7 +850,7 @@ class _WaitForJobChangesHelper(object): wait_fn=waiter.Wait) finally: waiter.Close() - except (errors.InotifyError, errors.JobLost): + except errors.JobLost: return None except utils.RetryTimeout: return constants.JOB_NOTCHANGED @@ -824,7 +911,7 @@ class _OpExecContext: self.summary = op.input.Summary() # Create local copy to modify - if getattr(op.input, opcodes.DEPEND_ATTR, None): + if getattr(op.input, opcodes_base.DEPEND_ATTR, None): self.jobdeps = op.input.depends[:] else: self.jobdeps = None @@ -1026,7 +1113,7 @@ class _JobProcessor(object): # Make sure not to hold queue lock while calling ExecOpCode result = self.opexec_fn(op.input, _OpExecCallbacks(self.queue, self.job, op), - timeout=timeout, priority=op.priority) + timeout=timeout) except mcpu.LockAcquireTimeout: assert timeout is not None, "Received timeout for blocking acquire" logging.debug("Couldn't acquire locks in %0.6fs", timeout) @@ -1038,12 +1125,25 @@ class _JobProcessor(object): if op.status == constants.OP_STATUS_CANCELING: return (constants.OP_STATUS_CANCELING, None) + # Queue is shutting down, return to queued + if not self.queue.AcceptingJobsUnlocked(): + return (constants.OP_STATUS_QUEUED, None) + # Stay in waitlock while trying to re-acquire lock return (constants.OP_STATUS_WAITING, None) except CancelJob: logging.exception("%s: Canceling job", opctx.log_prefix) assert op.status == constants.OP_STATUS_CANCELING return (constants.OP_STATUS_CANCELING, None) + + except QueueShutdown: + logging.exception("%s: Queue is shutting down", opctx.log_prefix) + + assert op.status == constants.OP_STATUS_WAITING + + # Job hadn't been started yet, so it should return to the queue + return (constants.OP_STATUS_QUEUED, None) + except Exception, err: # pylint: disable=W0703 logging.exception("%s: Caught exception in %s", opctx.log_prefix, opctx.summary) @@ -1141,8 +1241,10 @@ class _JobProcessor(object): assert not waitjob - if op.status == constants.OP_STATUS_WAITING: - # Couldn't get locks in time + if op.status in (constants.OP_STATUS_WAITING, + constants.OP_STATUS_QUEUED): + # waiting: Couldn't get locks in time + # queued: Queue is shutting down assert not op.end_timestamp else: # Finalize opcode @@ -1154,7 +1256,19 @@ class _JobProcessor(object): else: assert op.status in constants.OPS_FINALIZED - if op.status == constants.OP_STATUS_WAITING or waitjob: + if op.status == constants.OP_STATUS_QUEUED: + # Queue is shutting down + assert not waitjob + + finalize = False + + # Reset context + job.cur_opctx = None + + # In no case must the status be finalized here + assert job.CalcStatus() == constants.JOB_STATUS_QUEUED + + elif op.status == constants.OP_STATUS_WAITING or waitjob: finalize = False if not waitjob and opctx.CheckPriorityIncrease(): @@ -1706,6 +1820,15 @@ class JobQueue(object): logging.error("Failed to upload file %s to node %s: %s", file_name, node_name, msg) + # Set queue drained flag + result = \ + self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name], + self._drained) + msg = result[node_name].fail_msg + if msg: + logging.error("Failed to set queue drained flag on node %s: %s", + node_name, msg) + self._nodes[node_name] = node.primary_ip @locking.ssynchronized(_LOCK) @@ -1780,7 +1903,8 @@ class JobQueue(object): """ getents = runtime.GetEnts() utils.WriteFile(file_name, data=data, uid=getents.masterd_uid, - gid=getents.masterd_gid) + gid=getents.daemons_gid, + mode=constants.JOB_QUEUE_FILES_PERMS) if replicate: names, addrs = self._GetNodeIp() @@ -1817,7 +1941,7 @@ class JobQueue(object): @return: a list of job identifiers. """ - assert ht.TPositiveInt(count) + assert ht.TNonNegativeInt(count) # New number serial = self._last_serial + count @@ -1863,7 +1987,25 @@ class JobQueue(object): "job-%s" % job_id) @staticmethod - def _GetJobIDsUnlocked(sort=True): + def _DetermineJobDirectories(archived): + """Build list of directories containing job files. + + @type archived: bool + @param archived: Whether to include directories for archived jobs + @rtype: list + + """ + result = [pathutils.QUEUE_DIR] + + if archived: + archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR + result.extend(map(compat.partial(utils.PathJoin, archive_path), + utils.ListVisibleFiles(archive_path))) + + return result + + @classmethod + def _GetJobIDsUnlocked(cls, sort=True, archived=False): """Return all known job IDs. The method only looks at disk because it's a requirement that all @@ -1877,10 +2019,13 @@ class JobQueue(object): """ jlist = [] - for filename in utils.ListVisibleFiles(pathutils.QUEUE_DIR): - m = constants.JOB_FILE_RE.match(filename) - if m: - jlist.append(int(m.group(1))) + + for path in cls._DetermineJobDirectories(archived): + for filename in utils.ListVisibleFiles(path): + m = constants.JOB_FILE_RE.match(filename) + if m: + jlist.append(int(m.group(1))) + if sort: jlist.sort() return jlist @@ -1939,15 +2084,15 @@ class JobQueue(object): @return: either None or the job object """ - path_functions = [(self._GetJobPath, True)] + path_functions = [(self._GetJobPath, False)] if try_archived: - path_functions.append((self._GetArchivedJobPath, False)) + path_functions.append((self._GetArchivedJobPath, True)) raw_data = None - writable_default = None + archived = None - for (fn, writable_default) in path_functions: + for (fn, archived) in path_functions: filepath = fn(job_id) logging.debug("Loading job from %s", filepath) try: @@ -1962,11 +2107,11 @@ class JobQueue(object): return None if writable is None: - writable = writable_default + writable = not archived try: data = serializer.LoadJson(raw_data) - job = _QueuedJob.Restore(self, data, writable) + job = _QueuedJob.Restore(self, data, writable, archived) except Exception, err: # pylint: disable=W0703 raise errors.JobFileCorrupted(err) @@ -2008,10 +2153,18 @@ class JobQueue(object): @param drain_flag: Whether to set or unset the drain flag """ + # Change flag locally jstore.SetDrainFlag(drain_flag) self._drained = drain_flag + # ... and on all nodes + (names, addrs) = self._GetNodeIp() + result = \ + self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag) + self._CheckRpcResult(result, self._nodes, + "Setting queue drain flag to %s" % drain_flag) + return True @_RequireOpenQueue @@ -2036,18 +2189,19 @@ class JobQueue(object): job = _QueuedJob(self, job_id, ops, True) - # Check priority for idx, op in enumerate(job.ops): + # Check priority if op.priority not in constants.OP_PRIO_SUBMIT_VALID: allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID) raise errors.GenericError("Opcode %s has invalid priority %s, allowed" " are %s" % (idx, op.priority, allowed)) - dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None) - if not opcodes.TNoRelativeJobDependencies(dependencies): + # Check job dependencies + dependencies = getattr(op.input, opcodes_base.DEPEND_ATTR, None) + if not opcodes_base.TNoRelativeJobDependencies(dependencies): raise errors.GenericError("Opcode %s has invalid dependencies, must" " match %s: %s" % - (idx, opcodes.TNoRelativeJobDependencies, + (idx, opcodes_base.TNoRelativeJobDependencies, dependencies)) # Write to disk @@ -2075,6 +2229,19 @@ class JobQueue(object): @locking.ssynchronized(_LOCK) @_RequireOpenQueue + def SubmitJobToDrainedQueue(self, ops): + """Forcefully create and store a new job. + + Do so, even if the job queue is drained. + @see: L{_SubmitJobUnlocked} + + """ + (job_id, ) = self._NewSerialsUnlocked(1) + self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)]) + return job_id + + @locking.ssynchronized(_LOCK) + @_RequireOpenQueue @_RequireNonDrainedQueue def SubmitManyJobs(self, jobs): """Create and store multiple jobs. @@ -2145,7 +2312,7 @@ class JobQueue(object): for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)): for op in ops: - if getattr(op, opcodes.DEPEND_ATTR, None): + if getattr(op, opcodes_base.DEPEND_ATTR, None): (status, data) = \ self._ResolveJobDependencies(compat.partial(resolve_fn, idx), op.depends) @@ -2189,7 +2356,8 @@ class JobQueue(object): """ assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode" self._wpool.AddManyTasks([(job, ) for job in jobs], - priority=[job.CalcPriority() for job in jobs]) + priority=[job.CalcPriority() for job in jobs], + task_id=map(_GetIdAttr, jobs)) def _GetJobStatusForDependencies(self, job_id): """Gets the status of a job for dependencies. @@ -2229,6 +2397,7 @@ class JobQueue(object): finalized = job.CalcStatus() in constants.JOBS_FINALIZED assert (finalized ^ (job.end_timestamp is None)) assert job.writable, "Can't update read-only job" + assert not job.archived, "Can't update archived job" filename = self._GetJobPath(job.id) data = serializer.DumpJson(job.Serialize()) @@ -2280,14 +2449,58 @@ class JobQueue(object): """ logging.info("Cancelling job %s", job_id) + return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel()) + + @locking.ssynchronized(_LOCK) + @_RequireOpenQueue + def ChangeJobPriority(self, job_id, priority): + """Changes a job's priority. + + @type job_id: int + @param job_id: ID of the job whose priority should be changed + @type priority: int + @param priority: New priority + + """ + logging.info("Changing priority of job %s to %s", job_id, priority) + + if priority not in constants.OP_PRIO_SUBMIT_VALID: + allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID) + raise errors.GenericError("Invalid priority %s, allowed are %s" % + (priority, allowed)) + + def fn(job): + (success, msg) = job.ChangePriority(priority) + + if success: + try: + self._wpool.ChangeTaskPriority(job.id, job.CalcPriority()) + except workerpool.NoSuchTask: + logging.debug("Job %s is not in workerpool at this time", job.id) + + return (success, msg) + + return self._ModifyJobUnlocked(job_id, fn) + + def _ModifyJobUnlocked(self, job_id, mod_fn): + """Modifies a job. + + @type job_id: int + @param job_id: Job ID + @type mod_fn: callable + @param mod_fn: Modifying function, receiving job object as parameter, + returning tuple of (status boolean, message string) + + """ job = self._LoadJobUnlocked(job_id) if not job: logging.debug("Job %s not found", job_id) return (False, "Job %s not found" % job_id) - assert job.writable, "Can't cancel read-only job" + assert job.writable, "Can't modify read-only job" + assert not job.archived, "Can't modify archived job" - (success, msg) = job.Cancel() + (success, msg) = mod_fn(job) if success: # If the job was finalized (e.g. cancelled), this is the final write @@ -2310,6 +2523,7 @@ class JobQueue(object): rename_files = [] for job in jobs: assert job.writable, "Can't archive read-only job" + assert not job.archived, "Can't cancel archived job" if job.CalcStatus() not in constants.JOBS_FINALIZED: logging.debug("Job %s is not yet done", job.id) @@ -2416,6 +2630,11 @@ class JobQueue(object): qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter, namefield="id") + # Archived jobs are only looked at if the "archived" field is referenced + # either as a requested field or in the filter. By default archived jobs + # are ignored. + include_archived = (query.JQ_ARCHIVED in qobj.RequestedData()) + job_ids = qobj.RequestedNames() list_all = (job_ids is None) @@ -2423,7 +2642,7 @@ class JobQueue(object): if list_all: # Since files are added to/removed from the queue atomically, there's no # risk of getting the job ids in an inconsistent state. - job_ids = self._GetJobIDsUnlocked() + job_ids = self._GetJobIDsUnlocked(archived=include_archived) jobs = [] @@ -2492,6 +2711,17 @@ class JobQueue(object): return self._wpool.HasRunningTasks() + def AcceptingJobsUnlocked(self): + """Returns whether jobs are accepted. + + Once L{PrepareShutdown} has been called, no new jobs are accepted and the + queue is shutting down. + + @rtype: bool + + """ + return self._accepting_jobs + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def Shutdown(self):