X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/75d81fc8f0141a1d76520d9e107642247ac35732..65c9591cb40abb922116ed475fd2693df689db38:/lib/jqueue.py

diff --git a/lib/jqueue.py b/lib/jqueue.py
index ecbd82a..d5ea3cb 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -31,13 +31,13 @@ used by all other classes in this module.
 
 import logging
 import errno
-import re
 import time
 import weakref
 import threading
+import itertools
 
 try:
-  # pylint: disable-msg=E0611
+  # pylint: disable=E0611
   from pyinotify import pyinotify
 except ImportError:
   import pyinotify
@@ -177,7 +177,7 @@ class _QueuedJob(object):
   @ivar writable: Whether the job is allowed to be modified
 
   """
-  # pylint: disable-msg=W0212
+  # pylint: disable=W0212
   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
                "received_timestamp", "start_timestamp", "end_timestamp",
                "__weakref__", "processor_lock", "writable"]
@@ -312,8 +312,8 @@ class _QueuedJob(object):
 
       if op.status == constants.OP_STATUS_QUEUED:
         pass
-      elif op.status == constants.OP_STATUS_WAITLOCK:
-        status = constants.JOB_STATUS_WAITLOCK
+      elif op.status == constants.OP_STATUS_WAITING:
+        status = constants.JOB_STATUS_WAITING
       elif op.status == constants.OP_STATUS_RUNNING:
         status = constants.JOB_STATUS_RUNNING
       elif op.status == constants.OP_STATUS_CANCELING:
@@ -461,7 +461,7 @@ class _QueuedJob(object):
       self.Finalize()
       return (True, "Job %s canceled" % self.id)
 
-    elif status == constants.JOB_STATUS_WAITLOCK:
+    elif status == constants.JOB_STATUS_WAITING:
       # The worker will notice the new status and cancel the job
       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
       return (True, "Job %s will be canceled" % self.id)
@@ -507,11 +507,11 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     This is called from the mcpu code as a notifier function, when the LU is
     finally about to start the Exec() method. Of course, to have end-user
     visible results, the opcode must be initially (before calling into
-    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
+    Processor.ExecOpCode) set to OP_STATUS_WAITING.
 
     """
     assert self._op in self._job.ops
-    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
+    assert self._op.status in (constants.OP_STATUS_WAITING,
                                constants.OP_STATUS_CANCELING)
 
     # Cancel here if we were asked to
@@ -555,7 +555,7 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     """Check whether job has been cancelled.
 
     """
-    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
+    assert self._op.status in (constants.OP_STATUS_WAITING,
                                constants.OP_STATUS_CANCELING)
 
     # Cancel here if we were asked to
@@ -616,7 +616,7 @@ class _JobChangesChecker(object):
     # no changes.
     if (status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
-                       constants.JOB_STATUS_WAITLOCK) or
+                       constants.JOB_STATUS_WAITING) or
         job_info != self._prev_job_info or
         (log_entries and self._prev_log_serial != log_entries[0][0])):
       logging.debug("Job %s changed", job.id)
@@ -721,7 +721,13 @@ class _WaitForJobChangesHelper(object):
 
   """
   @staticmethod
-  def _CheckForChanges(job_load_fn, check_fn):
+  def _CheckForChanges(counter, job_load_fn, check_fn):
+    if counter.next() > 0:
+      # If this isn't the first check the job is given some more time to change
+      # again. This gives better performance for jobs generating many
+      # changes/messages.
+      time.sleep(0.1)
+
     job = job_load_fn()
     if not job:
       raise errors.JobLost()
@@ -750,12 +756,13 @@
     @param timeout: maximum time to wait in seconds
 
     """
+    counter = itertools.count()
     try:
       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
       waiter = _JobChangesWaiter(filename)
       try:
         return utils.Retry(compat.partial(self._CheckForChanges,
-                                          job_load_fn, check_fn),
+                                          counter, job_load_fn, check_fn),
                            utils.RETRY_REMAINING_TIME, timeout,
                            wait_fn=waiter.Wait)
       finally:
@@ -931,14 +938,14 @@ class _JobProcessor(object):
     """
     assert op in job.ops
     assert op.status in (constants.OP_STATUS_QUEUED,
-                         constants.OP_STATUS_WAITLOCK)
+                         constants.OP_STATUS_WAITING)
 
     update = False
 
     op.result = None
 
     if op.status == constants.OP_STATUS_QUEUED:
-      op.status = constants.OP_STATUS_WAITLOCK
+      op.status = constants.OP_STATUS_WAITING
       update = True
 
     if op.start_timestamp is None:
@@ -949,7 +956,7 @@ class _JobProcessor(object):
       job.start_timestamp = op.start_timestamp
       update = True
 
-    assert op.status == constants.OP_STATUS_WAITLOCK
+    assert op.status == constants.OP_STATUS_WAITING
 
     return update
 
@@ -1015,7 +1022,7 @@ class _JobProcessor(object):
     """
    op = opctx.op
 
-    assert op.status == constants.OP_STATUS_WAITLOCK
+    assert op.status == constants.OP_STATUS_WAITING
 
     timeout = opctx.GetNextLockTimeout()
 
@@ -1028,7 +1035,7 @@ class _JobProcessor(object):
       assert timeout is not None, "Received timeout for blocking acquire"
       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
 
-      assert op.status in (constants.OP_STATUS_WAITLOCK,
+      assert op.status in (constants.OP_STATUS_WAITING,
                            constants.OP_STATUS_CANCELING)
 
       # Was job cancelled while we were waiting for the lock?
@@ -1036,12 +1043,12 @@ class _JobProcessor(object):
        return (constants.OP_STATUS_CANCELING, None)
 
       # Stay in waitlock while trying to re-acquire lock
-      return (constants.OP_STATUS_WAITLOCK, None)
+      return (constants.OP_STATUS_WAITING, None)
     except CancelJob:
       logging.exception("%s: Canceling job", opctx.log_prefix)
       assert op.status == constants.OP_STATUS_CANCELING
       return (constants.OP_STATUS_CANCELING, None)
-    except Exception, err: # pylint: disable-msg=W0703
+    except Exception, err: # pylint: disable=W0703
       logging.exception("%s: Caught exception in %s",
                         opctx.log_prefix, opctx.summary)
       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
@@ -1091,7 +1098,7 @@ class _JobProcessor(object):
                         for i in job.ops[opctx.index + 1:])
 
       assert op.status in (constants.OP_STATUS_QUEUED,
-                           constants.OP_STATUS_WAITLOCK,
+                           constants.OP_STATUS_WAITING,
                            constants.OP_STATUS_CANCELING)
 
       assert (op.priority <= constants.OP_PRIO_LOWEST and
@@ -1101,22 +1108,22 @@ class _JobProcessor(object):
 
       if op.status != constants.OP_STATUS_CANCELING:
         assert op.status in (constants.OP_STATUS_QUEUED,
-                             constants.OP_STATUS_WAITLOCK)
+                             constants.OP_STATUS_WAITING)
 
         # Prepare to start opcode
         if self._MarkWaitlock(job, op):
           # Write to disk
           queue.UpdateJobUnlocked(job)
 
-        assert op.status == constants.OP_STATUS_WAITLOCK
-        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
+        assert op.status == constants.OP_STATUS_WAITING
+        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
         assert job.start_timestamp and op.start_timestamp
         assert waitjob is None
 
         # Check if waiting for a job is necessary
         waitjob = self._CheckDependencies(queue, job, opctx)
 
-        assert op.status in (constants.OP_STATUS_WAITLOCK,
+        assert op.status in (constants.OP_STATUS_WAITING,
                              constants.OP_STATUS_CANCELING,
                              constants.OP_STATUS_ERROR)
 
@@ -1138,7 +1145,7 @@ class _JobProcessor(object):
 
           assert not waitjob
 
-        if op.status == constants.OP_STATUS_WAITLOCK:
+        if op.status == constants.OP_STATUS_WAITING:
           # Couldn't get locks in time
           assert not op.end_timestamp
         else:
@@ -1151,7 +1158,7 @@ class _JobProcessor(object):
           else:
             assert op.status in constants.OPS_FINALIZED
 
-      if op.status == constants.OP_STATUS_WAITLOCK or waitjob:
+      if op.status == constants.OP_STATUS_WAITING or waitjob:
         finalize = False
 
         if not waitjob and opctx.CheckPriorityIncrease():
@@ -1165,7 +1172,7 @@ class _JobProcessor(object):
                 op.priority >= constants.OP_PRIO_HIGHEST)
 
         # In no case must the status be finalized here
-        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
+        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
 
       else:
         # Ensure all opcodes so far have been successful
@@ -1234,7 +1241,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
 
   """
-  def RunTask(self, job): # pylint: disable-msg=W0221
+  def RunTask(self, job): # pylint: disable=W0221
     """Job executor.
 
     @type job: L{_QueuedJob}
@@ -1340,8 +1347,6 @@ class _JobDependencyManager:
    CONTINUE,
    WRONGSTATUS) = range(1, 6)
 
-  # TODO: Export waiter information to lock monitor
-
   def __init__(self, getstatus_fn, enqueue_fn):
     """Initializes this class.
 
@@ -1353,6 +1358,22 @@ class _JobDependencyManager:
     self._lock = locking.SharedLock("JobDepMgr")
 
   @locking.ssynchronized(_LOCK, shared=1)
+  def GetLockInfo(self, requested): # pylint: disable=W0613
+    """Retrieves information about waiting jobs.
+
+    @type requested: set
+    @param requested: Requested information, see C{query.LQ_*}
+
+    """
+    # No need to sort here, that's being done by the lock manager and query
+    # library. There are no priorities for notifying jobs, hence all show up as
+    # one item under "pending".
+    return [("job/%s" % job_id, None, None,
+             [("job", [job.id for job in waiters])])
+            for job_id, waiters in self._waiters.items()
+            if waiters]
+
+  @locking.ssynchronized(_LOCK, shared=1)
   def JobWaiting(self, job):
     """Checks if a job is waiting.
 
@@ -1460,7 +1481,7 @@ def _RequireOpenQueue(fn):
 
   """
   def wrapper(self, *args, **kwargs):
-    # pylint: disable-msg=W0212
+    # pylint: disable=W0212
     assert self._queue_filelock is not None, "Queue should be open"
     return fn(self, *args, **kwargs)
   return wrapper
@@ -1469,11 +1490,7 @@ def _RequireOpenQueue(fn):
 class JobQueue(object):
   """Queue used to manage the jobs.
 
-  @cvar _RE_JOB_FILE: regex matching the valid job file names
-
   """
-  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
-
   def __init__(self, context):
     """Constructor for JobQueue.
 
@@ -1527,6 +1544,7 @@ class JobQueue(object):
     # Job dependencies
     self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                         self._EnqueueJobs)
+    self.context.glm.AddToLockMonitor(self.depmgr)
 
     # Setup worker pool
     self._wpool = _JobQueueWorkerPool(self)
@@ -1572,11 +1590,11 @@ class JobQueue(object):
           restartjobs.append(job)
 
         elif status in (constants.JOB_STATUS_RUNNING,
-                        constants.JOB_STATUS_WAITLOCK,
+                        constants.JOB_STATUS_WAITING,
                         constants.JOB_STATUS_CANCELING):
           logging.warning("Unfinished job %s found: %s", job.id, job)
 
-          if status == constants.JOB_STATUS_WAITLOCK:
+          if status == constants.JOB_STATUS_WAITING:
             # Restart job
             job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
             restartjobs.append(job)
@@ -1780,7 +1798,8 @@ class JobQueue(object):
     @return: a string representing the job identifier.
""" - assert count > 0 + assert ht.TPositiveInt(count) + # New number serial = self._last_serial + count @@ -1823,7 +1842,8 @@ class JobQueue(object): return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR, cls._GetArchiveDirectory(job_id), "job-%s" % job_id) - def _GetJobIDsUnlocked(self, sort=True): + @staticmethod + def _GetJobIDsUnlocked(sort=True): """Return all known job IDs. The method only looks at disk because it's a requirement that all @@ -1838,7 +1858,7 @@ class JobQueue(object): """ jlist = [] for filename in utils.ListVisibleFiles(constants.QUEUE_DIR): - m = self._RE_JOB_FILE.match(filename) + m = constants.JOB_FILE_RE.match(filename) if m: jlist.append(m.group(1)) if sort: @@ -1926,7 +1946,7 @@ class JobQueue(object): try: data = serializer.LoadJson(raw_data) job = _QueuedJob.Restore(self, data, writable) - except Exception, err: # pylint: disable-msg=W0703 + except Exception, err: # pylint: disable=W0703 raise errors.JobFileCorrupted(err) return job @@ -2168,7 +2188,7 @@ class JobQueue(object): # Try to load from disk job = self.SafeLoadJobFromDisk(job_id, True, writable=False) - assert not job.writable, "Got writable job" + assert not job.writable, "Got writable job" # pylint: disable=E1101 if job: return job.CalcStatus()