X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/e4e59de86329c99ad201ac87266c24b6ee5af20f..def6577f00a482c310f4e20bb315fa90290ad5b7:/lib/jqueue.py

diff --git a/lib/jqueue.py b/lib/jqueue.py
index a562e6e..110d386 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -35,6 +35,7 @@ import time
 import weakref
 import threading
 import itertools
+import operator
 
 try:
   # pylint: disable=E0611
@@ -69,6 +70,9 @@ JOBQUEUE_THREADS = 25
 _LOCK = "_lock"
 _QUEUE = "_queue"
 
+#: Retrieves "id" attribute
+_GetIdAttr = operator.attrgetter("id")
+
 
 class CancelJob(Exception):
   """Special exception to cancel a job.
@@ -218,6 +222,22 @@ class _QueuedJob(object):
                "received_timestamp", "start_timestamp", "end_timestamp",
                "__weakref__", "processor_lock", "writable", "archived"]
 
+  def _AddReasons(self):
+    """Extend the reason trail
+
+    Add the reason for all the opcodes of this job to be executed.
+
+    """
+    count = 0
+    for queued_op in self.ops:
+      op = queued_op.input
+      reason_src = opcodes.NameToReasonSrc(op.__class__.__name__)
+      reason_text = "job=%d;index=%d" % (self.id, count)
+      reason = getattr(op, "reason", [])
+      reason.append((reason_src, reason_text, utils.EpochNano()))
+      op.reason = reason
+      count = count + 1
+
   def __init__(self, queue, job_id, ops, writable):
     """Constructor for the _QueuedJob.
 
@@ -238,6 +258,7 @@ class _QueuedJob(object):
     self.queue = queue
     self.id = int(job_id)
     self.ops = [_QueuedOpCode(op) for op in ops]
+    self._AddReasons()
     self.log_serial = 0
     self.received_timestamp = TimeStampNow()
     self.start_timestamp = None
@@ -478,6 +499,50 @@ class _QueuedJob(object):
       logging.debug("Job %s is no longer waiting in the queue", self.id)
       return (False, "Job %s is no longer waiting in the queue" % self.id)
 
+  def ChangePriority(self, priority):
+    """Changes the job priority.
+
+    @type priority: int
+    @param priority: New priority
+    @rtype: tuple; (bool, string)
+    @return: Boolean describing whether job's priority was successfully changed
+      and a text message
+
+    """
+    status = self.CalcStatus()
+
+    if status in constants.JOBS_FINALIZED:
+      return (False, "Job %s is finished" % self.id)
+    elif status == constants.JOB_STATUS_CANCELING:
+      return (False, "Job %s is cancelling" % self.id)
+    else:
+      assert status in (constants.JOB_STATUS_QUEUED,
+                        constants.JOB_STATUS_WAITING,
+                        constants.JOB_STATUS_RUNNING)
+
+      changed = False
+      for op in self.ops:
+        if (op.status == constants.OP_STATUS_RUNNING or
+            op.status in constants.OPS_FINALIZED):
+          assert not changed, \
+            ("Found opcode for which priority should not be changed after"
+             " priority has been changed for previous opcodes")
+          continue
+
+        assert op.status in (constants.OP_STATUS_QUEUED,
+                             constants.OP_STATUS_WAITING)
+
+        changed = True
+
+        # Set new priority (doesn't modify opcode input)
+        op.priority = priority
+
+      if changed:
+        return (True, ("Priorities of pending opcodes for job %s have been"
+                       " changed to %s" % (self.id, priority)))
+      else:
+        return (False, "Job %s had no pending opcodes" % self.id)
+
 
 class _OpExecCallbacks(mcpu.OpExecCbBase):
   def __init__(self, queue, job, op):
@@ -641,7 +706,7 @@ class _JobChangesChecker(object):
 
 
 class _JobFileChangesWaiter(object):
-  def __init__(self, filename):
+  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
     """Initializes this class.
 
     @type filename: string
@@ -649,7 +714,7 @@ class _JobFileChangesWaiter(object):
     @raises errors.InotifyError: if the notifier cannot be setup
 
     """
-    self._wm = pyinotify.WatchManager()
+    self._wm = _inotify_wm_cls()
     self._inotify_handler = \
       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
     self._notifier = \
@@ -691,7 +756,7 @@ class _JobFileChangesWaiter(object):
 
 
 class _JobChangesWaiter(object):
-  def __init__(self, filename):
+  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
     """Initializes this class.
 
     @type filename: string
@@ -700,6 +765,7 @@ class _JobChangesWaiter(object):
     """
     self._filewaiter = None
     self._filename = filename
+    self._waiter_cls = _waiter_cls
 
   def Wait(self, timeout):
     """Waits for a job to change.
@@ -716,7 +782,7 @@ class _JobChangesWaiter(object):
     # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
     # race condition.
-    self._filewaiter = _JobFileChangesWaiter(self._filename)
+    self._filewaiter = self._waiter_cls(self._filename)
 
     return True
 
@@ -754,7 +820,8 @@ class _WaitForJobChangesHelper(object):
     return result
 
   def __call__(self, filename, job_load_fn,
-               fields, prev_job_info, prev_log_serial, timeout):
+               fields, prev_job_info, prev_log_serial, timeout,
+               _waiter_cls=_JobChangesWaiter):
     """Waits for changes on a job.
 
     @type filename: string
@@ -774,7 +841,7 @@ class _WaitForJobChangesHelper(object):
     counter = itertools.count()
     try:
       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
-      waiter = _JobChangesWaiter(filename)
+      waiter = _waiter_cls(filename)
       try:
         return utils.Retry(compat.partial(self._CheckForChanges,
                                           counter, job_load_fn, check_fn),
@@ -782,7 +849,7 @@ class _WaitForJobChangesHelper(object):
                            wait_fn=waiter.Wait)
       finally:
         waiter.Close()
-    except (errors.InotifyError, errors.JobLost):
+    except errors.JobLost:
       return None
     except utils.RetryTimeout:
       return constants.JOB_NOTCHANGED
@@ -1752,6 +1819,15 @@ class JobQueue(object):
         logging.error("Failed to upload file %s to node %s: %s",
                       file_name, node_name, msg)
 
+    # Set queue drained flag
+    result = \
+      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
+                                                       self._drained)
+    msg = result[node_name].fail_msg
+    if msg:
+      logging.error("Failed to set queue drained flag on node %s: %s",
+                    node_name, msg)
+
     self._nodes[node_name] = node.primary_ip
 
   @locking.ssynchronized(_LOCK)
@@ -1826,7 +1902,8 @@ class JobQueue(object):
     """
     getents = runtime.GetEnts()
     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
-                    gid=getents.masterd_gid)
+                    gid=getents.daemons_gid,
+                    mode=constants.JOB_QUEUE_FILES_PERMS)
 
     if replicate:
       names, addrs = self._GetNodeIp()
@@ -1863,7 +1940,7 @@ class JobQueue(object):
    @return: a list of job identifiers.
 
     """
-    assert ht.TPositiveInt(count)
+    assert ht.TNonNegativeInt(count)
 
     # New number
     serial = self._last_serial + count
@@ -2075,10 +2152,18 @@ class JobQueue(object):
     @param drain_flag: Whether to set or unset the drain flag
 
     """
+    # Change flag locally
     jstore.SetDrainFlag(drain_flag)
 
     self._drained = drain_flag
 
+    # ... and on all nodes
+    (names, addrs) = self._GetNodeIp()
+    result = \
+      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
+    self._CheckRpcResult(result, self._nodes,
+                         "Setting queue drain flag to %s" % drain_flag)
+
     return True
 
   @_RequireOpenQueue
@@ -2257,7 +2342,8 @@ class JobQueue(object):
     """
     assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
     self._wpool.AddManyTasks([(job, ) for job in jobs],
-                             priority=[job.CalcPriority() for job in jobs])
+                             priority=[job.CalcPriority() for job in jobs],
+                             task_id=map(_GetIdAttr, jobs))
 
   def _GetJobStatusForDependencies(self, job_id):
     """Gets the status of a job for dependencies.
@@ -2351,6 +2437,37 @@ class JobQueue(object):
 
     return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
 
+  @locking.ssynchronized(_LOCK)
+  @_RequireOpenQueue
+  def ChangeJobPriority(self, job_id, priority):
+    """Changes a job's priority.
+
+    @type job_id: int
+    @param job_id: ID of the job whose priority should be changed
+    @type priority: int
+    @param priority: New priority
+
+    """
+    logging.info("Changing priority of job %s to %s", job_id, priority)
+
+    if priority not in constants.OP_PRIO_SUBMIT_VALID:
+      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
+      raise errors.GenericError("Invalid priority %s, allowed are %s" %
+                                (priority, allowed))
+
+    def fn(job):
+      (success, msg) = job.ChangePriority(priority)
+
+      if success:
+        try:
+          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
+        except workerpool.NoSuchTask:
+          logging.debug("Job %s is not in workerpool at this time", job.id)
+
+      return (success, msg)
+
+    return self._ModifyJobUnlocked(job_id, fn)
+
   def _ModifyJobUnlocked(self, job_id, mod_fn):
     """Modifies a job.
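
Illustration (not part of the patch): a minimal, standalone sketch of the reason-trail entries that _AddReasons appends, one (source, text, timestamp) tuple per opcode. The stand-ins below (the "opcode:" source prefix, time.time) are assumptions made only to keep the snippet self-contained; the real code uses opcodes.NameToReasonSrc and utils.EpochNano().

import time

def build_reason_trail(job_id, opcode_names):
  """Return one (source, text, timestamp_ns) tuple per opcode of a job."""
  trail = []
  for index, name in enumerate(opcode_names):
    source = "opcode:%s" % name                       # stand-in for NameToReasonSrc
    text = "job=%d;index=%d" % (job_id, index)        # same format as the patch
    trail.append((source, text, int(time.time() * 1e9)))  # ns epoch, like EpochNano()
  return trail

# A job 123 with two opcodes would carry entries such as
#   ("opcode:OpInstanceCreate", "job=123;index=0", 1500000000000000000)
print(build_reason_trail(123, ["OpInstanceCreate", "OpInstanceShutdown"]))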
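
Illustration (not part of the patch): a simplified sketch of the rule implemented by _QueuedJob.ChangePriority above. Only opcodes that have not started yet pick up the new priority; running or finalized opcodes keep theirs. The status strings and the Op tuple are hypothetical; the real code compares against constants.OP_STATUS_*.

import collections

Op = collections.namedtuple("Op", ["status", "priority"])

def change_priority(ops, new_priority):
  """Re-prioritize only opcodes that are still pending."""
  changed = False
  result = []
  for op in ops:
    if op.status in ("queued", "waiting"):
      result.append(op._replace(priority=new_priority))
      changed = True
    else:
      # Running or finalized opcodes are left untouched.
      result.append(op)
  return (changed, result)

ops = [Op("success", 0), Op("running", 0), Op("queued", 0), Op("queued", 0)]
print(change_priority(ops, -10))
# -> only the last two opcodes end up with priority -10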
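
Illustration (not part of the patch): the new _waiter_cls and _inotify_wm_cls keyword arguments exist so tests can inject replacements instead of touching inotify. A hypothetical fake waiter, matching the Wait/Close interface used above, might look like this.

class FakeJobChangesWaiter(object):
  """Stand-in for _JobChangesWaiter: reports a change on every poll."""
  def __init__(self, filename):
    self.filename = filename

  def Wait(self, timeout):
    return True   # pretend the job file changed within the timeout

  def Close(self):
    pass

# A test could then invoke the helper roughly as
#   _WaitForJobChangesHelper()(filename, load_fn, fields, prev_job_info,
#                              prev_log_serial, timeout,
#                              _waiter_cls=FakeJobChangesWaiter)
# without ever creating an inotify watch.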