X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/47099cd115bd2a084a1aacceea6acc8087f0ee4c..c113a9ab55b9b4a825f3b8f2ea1644d2ec3e2f24:/lib/jqueue.py

diff --git a/lib/jqueue.py b/lib/jqueue.py
index d3bdc47..34ac6a7 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -31,13 +31,13 @@ used by all other classes in this module.
 
 import logging
 import errno
-import re
 import time
 import weakref
 import threading
+import itertools
 
 try:
-  # pylint: disable-msg=E0611
+  # pylint: disable=E0611
   from pyinotify import pyinotify
 except ImportError:
   import pyinotify
@@ -177,7 +177,7 @@ class _QueuedJob(object):
   @ivar writable: Whether the job is allowed to be modified
 
   """
-  # pylint: disable-msg=W0212
+  # pylint: disable=W0212
   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
                "received_timestamp", "start_timestamp",
                "end_timestamp", "__weakref__", "processor_lock", "writable"]
@@ -721,7 +721,13 @@ class _WaitForJobChangesHelper(object):
 
   """
   @staticmethod
-  def _CheckForChanges(job_load_fn, check_fn):
+  def _CheckForChanges(counter, job_load_fn, check_fn):
+    if counter.next() > 0:
+      # If this isn't the first check the job is given some more time to change
+      # again. This gives better performance for jobs generating many
+      # changes/messages.
+      time.sleep(0.1)
+
     job = job_load_fn()
     if not job:
       raise errors.JobLost()
@@ -750,12 +756,13 @@ class _WaitForJobChangesHelper(object):
     @param timeout: maximum time to wait in seconds
 
     """
+    counter = itertools.count()
     try:
       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
       waiter = _JobChangesWaiter(filename)
       try:
         return utils.Retry(compat.partial(self._CheckForChanges,
-                                          job_load_fn, check_fn),
+                                          counter, job_load_fn, check_fn),
                            utils.RETRY_REMAINING_TIME, timeout,
                            wait_fn=waiter.Wait)
       finally:
@@ -1041,7 +1048,7 @@ class _JobProcessor(object):
       logging.exception("%s: Canceling job", opctx.log_prefix)
       assert op.status == constants.OP_STATUS_CANCELING
       return (constants.OP_STATUS_CANCELING, None)
-    except Exception, err: # pylint: disable-msg=W0703
+    except Exception, err: # pylint: disable=W0703
       logging.exception("%s: Caught exception in %s",
                         opctx.log_prefix, opctx.summary)
       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
@@ -1230,11 +1237,34 @@ class _JobProcessor(object):
         queue.release()
 
 
+def _EvaluateJobProcessorResult(depmgr, job, result):
+  """Looks at a result from L{_JobProcessor} for a job.
+
+  To be used in a L{_JobQueueWorker}.
+
+  """
+  if result == _JobProcessor.FINISHED:
+    # Notify waiting jobs
+    depmgr.NotifyWaiters(job.id)
+
+  elif result == _JobProcessor.DEFER:
+    # Schedule again
+    raise workerpool.DeferTask(priority=job.CalcPriority())
+
+  elif result == _JobProcessor.WAITDEP:
+    # No-op, dependency manager will re-schedule
+    pass
+
+  else:
+    raise errors.ProgrammerError("Job processor returned unknown status %s" %
+                                 (result, ))
+
+
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
 
   """
-  def RunTask(self, job): # pylint: disable-msg=W0221
+  def RunTask(self, job): # pylint: disable=W0221
     """Job executor.
 
     @type job: L{_QueuedJob}
@@ -1270,23 +1300,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
     wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                     proc.ExecOpCode)
 
-    result = _JobProcessor(queue, wrap_execop_fn, job)()
-
-    if result == _JobProcessor.FINISHED:
-      # Notify waiting jobs
-      queue.depmgr.NotifyWaiters(job.id)
-
-    elif result == _JobProcessor.DEFER:
-      # Schedule again
-      raise workerpool.DeferTask(priority=job.CalcPriority())
-
-    elif result == _JobProcessor.WAITDEP:
-      # No-op, dependency manager will re-schedule
-      pass
-
-    else:
-      raise errors.ProgrammerError("Job processor returned unknown status %s" %
-                                   (result, ))
+    _EvaluateJobProcessorResult(queue.depmgr, job,
+                                _JobProcessor(queue, wrap_execop_fn, job)())
 
   @staticmethod
   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
@@ -1340,8 +1355,6 @@ class _JobDependencyManager:
    CONTINUE,
    WRONGSTATUS) = range(1, 6)
 
-  # TODO: Export waiter information to lock monitor
-
   def __init__(self, getstatus_fn, enqueue_fn):
     """Initializes this class.
 
@@ -1353,6 +1366,22 @@ class _JobDependencyManager:
     self._lock = locking.SharedLock("JobDepMgr")
 
   @locking.ssynchronized(_LOCK, shared=1)
+  def GetLockInfo(self, requested): # pylint: disable=W0613
+    """Retrieves information about waiting jobs.
+
+    @type requested: set
+    @param requested: Requested information, see C{query.LQ_*}
+
+    """
+    # No need to sort here, that's being done by the lock manager and query
+    # library. There are no priorities for notifying jobs, hence all show up as
+    # one item under "pending".
+    return [("job/%s" % job_id, None, None,
+             [("job", [job.id for job in waiters])])
+            for job_id, waiters in self._waiters.items()
+            if waiters]
+
+  @locking.ssynchronized(_LOCK, shared=1)
   def JobWaiting(self, job):
     """Checks if a job is waiting.
 
@@ -1418,28 +1447,39 @@
               " not one of '%s' as required" % (dep_job_id, status,
                                                 utils.CommaJoin(dep_status)))
 
-  @locking.ssynchronized(_LOCK)
+  def _RemoveEmptyWaitersUnlocked(self):
+    """Remove all jobs without actual waiters.
+
+    """
+    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
+                   if not waiters]:
+      del self._waiters[job_id]
+
   def NotifyWaiters(self, job_id):
     """Notifies all jobs waiting for a certain job ID.
 
+    @attention: Do not call until L{CheckAndRegister} returned a status other
+      than C{WAITDEP} for C{job_id}, or behaviour is undefined
     @type job_id: string
     @param job_id: Job ID
 
     """
     assert ht.TString(job_id)
 
-    jobs = self._waiters.pop(job_id, None)
+    self._lock.acquire()
+    try:
+      self._RemoveEmptyWaitersUnlocked()
+
+      jobs = self._waiters.pop(job_id, None)
+    finally:
+      self._lock.release()
+
     if jobs:
       # Re-add jobs to workerpool
       logging.debug("Re-adding %s jobs which were waiting for job %s",
                     len(jobs), job_id)
       self._enqueue_fn(jobs)
 
-    # Remove all jobs without actual waiters
-    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
-                   if not waiters]:
-      del self._waiters[job_id]
-
 
 def _RequireOpenQueue(fn):
   """Decorator for "public" functions.
@@ -1460,20 +1500,41 @@ def _RequireOpenQueue(fn):
 
   """
   def wrapper(self, *args, **kwargs):
-    # pylint: disable-msg=W0212
+    # pylint: disable=W0212
    assert self._queue_filelock is not None, "Queue should be open"
     return fn(self, *args, **kwargs)
   return wrapper
 
 
-class JobQueue(object):
-  """Queue used to manage the jobs.
+
+def _RequireNonDrainedQueue(fn):
+  """Decorator checking for a non-drained queue.
 
-  @cvar _RE_JOB_FILE: regex matching the valid job file names
+  To be used with functions submitting new jobs.
 
   """
-  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
-
+  def wrapper(self, *args, **kwargs):
+    """Wrapper function.
+    @raise errors.JobQueueDrainError: if the job queue is marked for draining
+
+    """
+    # Ok when sharing the big job queue lock, as the drain file is created when
+    # the lock is exclusive.
+    # Needs access to protected member, pylint: disable=W0212
+    if self._drained:
+      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
+
+    if not self._accepting_jobs:
+      raise errors.JobQueueError("Job queue is shutting down, refusing job")
+
+    return fn(self, *args, **kwargs)
+  return wrapper
+
+
+class JobQueue(object):
+  """Queue used to manage the jobs.
+
+  """
   def __init__(self, context):
     """Constructor for JobQueue.
 
@@ -1501,6 +1562,9 @@ class JobQueue(object):
     self.acquire = self._lock.acquire
     self.release = self._lock.release
 
+    # Accept jobs by default
+    self._accepting_jobs = True
+
     # Initialize the queue, and acquire the filelock.
     # This ensures no other process is working on the job queue.
     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
@@ -1520,13 +1584,15 @@
 
     # TODO: Check consistency across nodes
 
-    self._queue_size = 0
+    self._queue_size = None
     self._UpdateQueueSizeUnlocked()
+    assert ht.TInt(self._queue_size)
     self._drained = jstore.CheckDrainFlag()
 
     # Job dependencies
     self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                         self._EnqueueJobs)
+    self.context.glm.AddToLockMonitor(self.depmgr)
 
     # Setup worker pool
     self._wpool = _JobQueueWorkerPool(self)
@@ -1593,6 +1659,12 @@
 
     logging.info("Job queue inspection finished")
 
+  def _GetRpc(self, address_list):
+    """Gets RPC runner with context.
+
+    """
+    return rpc.JobQueueRunner(self.context, address_list)
+
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def AddNode(self, node):
@@ -1606,7 +1678,7 @@
     assert node_name != self._my_hostname
 
     # Clean queue directory on added node
-    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
+    result = self._GetRpc(None).call_jobqueue_purge(node_name)
     msg = result.fail_msg
     if msg:
       logging.warning("Cannot cleanup queue directory on node %s: %s",
@@ -1624,13 +1696,15 @@
     # Upload current serial file
     files.append(constants.JOB_QUEUE_SERIAL_FILE)
 
+    # Static address list
+    addrs = [node.primary_ip]
+
     for file_name in files:
       # Read file content
       content = utils.ReadFile(file_name)
 
-      result = rpc.RpcRunner.call_jobqueue_update([node_name],
-                                                  [node.primary_ip],
-                                                  file_name, content)
+      result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name,
+                                                        content)
       msg = result[node_name].fail_msg
       if msg:
         logging.error("Failed to upload file %s to node %s: %s",
@@ -1714,7 +1788,7 @@
 
     if replicate:
       names, addrs = self._GetNodeIp()
-      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
+      result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
 
   def _RenameFilesUnlocked(self, rename):
@@ -1733,7 +1807,7 @@
 
     # ... and on all nodes
     names, addrs = self._GetNodeIp()
-    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
+    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
 
   @staticmethod
@@ -1780,7 +1854,8 @@
     @return: a string representing the job identifier.
 
     """
-    assert count > 0
+    assert ht.TPositiveInt(count)
+
     # New number
     serial = self._last_serial + count
 
@@ -1823,7 +1898,8 @@
     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                           cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
 
-  def _GetJobIDsUnlocked(self, sort=True):
+  @staticmethod
+  def _GetJobIDsUnlocked(sort=True):
     """Return all known job IDs.
 
     The method only looks at disk because it's a requirement that all
@@ -1838,7 +1914,7 @@
     """
     jlist = []
     for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
-      m = self._RE_JOB_FILE.match(filename)
+      m = constants.JOB_FILE_RE.match(filename)
       if m:
         jlist.append(m.group(1))
     if sort:
@@ -1926,7 +2002,7 @@
     try:
       data = serializer.LoadJson(raw_data)
       job = _QueuedJob.Restore(self, data, writable)
-    except Exception, err: # pylint: disable-msg=W0703
+    except Exception, err: # pylint: disable=W0703
       raise errors.JobFileCorrupted(err)
 
     return job
@@ -1986,16 +2062,10 @@
     @param ops: The list of OpCodes that will become the new job.
     @rtype: L{_QueuedJob}
     @return: the job object to be queued
-    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
     @raise errors.GenericError: If an opcode is not valid
 
     """
-    # Ok when sharing the big job queue lock, as the drain file is created when
-    # the lock is exclusive.
-    if self._drained:
-      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
-
     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
       raise errors.JobQueueFull()
 
@@ -2027,6 +2097,7 @@
 
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
+  @_RequireNonDrainedQueue
   def SubmitJob(self, ops):
     """Create and store a new job.
 
@@ -2039,6 +2110,7 @@
 
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
+  @_RequireNonDrainedQueue
   def SubmitManyJobs(self, jobs):
     """Create and store multiple jobs.
 
@@ -2168,7 +2240,7 @@
 
     # Try to load from disk
     job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
-    assert not job.writable, "Got writable job"
+    assert not job.writable, "Got writable job" # pylint: disable=E1101
 
     if job:
       return job.CalcStatus()
@@ -2195,7 +2267,7 @@
     assert job.writable, "Can't update read-only job"
 
     filename = self._GetJobPath(job.id)
-    data = serializer.DumpJson(job.Serialize(), indent=False)
+    data = serializer.DumpJson(job.Serialize())
     logging.debug("Writing job %s to %s", job.id, filename)
     self._UpdateJobQueueFile(filename, data, replicate)
 
@@ -2406,6 +2478,31 @@
     return jobs
 
   @locking.ssynchronized(_LOCK)
+  def PrepareShutdown(self):
+    """Prepare to stop the job queue.
+
+    Disables execution of jobs in the workerpool and returns whether there are
+    any jobs currently running. If the latter is the case, the job queue is not
+    yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
+    be called without interfering with any job. Queued and unfinished jobs will
+    be resumed next time.
+
+    Once this function has been called no new job submissions will be accepted
+    (see L{_RequireNonDrainedQueue}).
+
+    @rtype: bool
+    @return: Whether there are any running jobs
+
+    """
+    if self._accepting_jobs:
+      self._accepting_jobs = False
+
+      # Tell worker pool to stop processing pending tasks
+      self._wpool.SetActive(False)
+
+    return self._wpool.HasRunningTasks()
+
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def Shutdown(self):
     """Stops the job queue.