X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/1316ebc2429ef50d01ddead084aa444eb60ad97d..81f7ea25fa942612873eb159e24c44e24ce84e69:/lib/jqueue.py

diff --git a/lib/jqueue.py b/lib/jqueue.py
index 1391f2a..34ac6a7 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -1237,6 +1237,29 @@ class _JobProcessor(object):
       queue.release()
 
 
+def _EvaluateJobProcessorResult(depmgr, job, result):
+  """Looks at a result from L{_JobProcessor} for a job.
+
+  To be used in a L{_JobQueueWorker}.
+
+  """
+  if result == _JobProcessor.FINISHED:
+    # Notify waiting jobs
+    depmgr.NotifyWaiters(job.id)
+
+  elif result == _JobProcessor.DEFER:
+    # Schedule again
+    raise workerpool.DeferTask(priority=job.CalcPriority())
+
+  elif result == _JobProcessor.WAITDEP:
+    # No-op, dependency manager will re-schedule
+    pass
+
+  else:
+    raise errors.ProgrammerError("Job processor returned unknown status %s" %
+                                 (result, ))
+
+
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
 
@@ -1277,23 +1300,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
       wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                       proc.ExecOpCode)
 
-      result = _JobProcessor(queue, wrap_execop_fn, job)()
-
-      if result == _JobProcessor.FINISHED:
-        # Notify waiting jobs
-        queue.depmgr.NotifyWaiters(job.id)
-
-      elif result == _JobProcessor.DEFER:
-        # Schedule again
-        raise workerpool.DeferTask(priority=job.CalcPriority())
-
-      elif result == _JobProcessor.WAITDEP:
-        # No-op, dependency manager will re-schedule
-        pass
-
-      else:
-        raise errors.ProgrammerError("Job processor returned unknown status %s" %
-                                     (result, ))
+      _EvaluateJobProcessorResult(queue.depmgr, job,
+                                  _JobProcessor(queue, wrap_execop_fn, job)())
 
   @staticmethod
   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
@@ -1498,6 +1506,31 @@ def _RequireOpenQueue(fn):
   return wrapper
 
 
+def _RequireNonDrainedQueue(fn):
+  """Decorator checking for a non-drained queue.
+
+  To be used with functions submitting new jobs.
+
+  """
+  def wrapper(self, *args, **kwargs):
+    """Wrapper function.
+
+    @raise errors.JobQueueDrainError: if the job queue is marked for draining
+
+    """
+    # Ok when sharing the big job queue lock, as the drain file is created when
+    # the lock is exclusive.
+    # Needs access to protected member, pylint: disable=W0212
+    if self._drained:
+      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
+
+    if not self._accepting_jobs:
+      raise errors.JobQueueError("Job queue is shutting down, refusing job")
+
+    return fn(self, *args, **kwargs)
+  return wrapper
+
+
 class JobQueue(object):
   """Queue used to manage the jobs.
 
@@ -1529,6 +1562,9 @@ class JobQueue(object):
     self.acquire = self._lock.acquire
     self.release = self._lock.release
 
+    # Accept jobs by default
+    self._accepting_jobs = True
+
     # Initialize the queue, and acquire the filelock.
     # This ensures no other process is working on the job queue.
     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
@@ -1548,8 +1584,9 @@ class JobQueue(object):
 
     # TODO: Check consistency across nodes
 
-    self._queue_size = 0
+    self._queue_size = None
     self._UpdateQueueSizeUnlocked()
+    assert ht.TInt(self._queue_size)
     self._drained = jstore.CheckDrainFlag()
 
     # Job dependencies
@@ -1622,6 +1659,12 @@ class JobQueue(object):
 
     logging.info("Job queue inspection finished")
 
+  def _GetRpc(self, address_list):
+    """Gets RPC runner with context.
+
+    """
+    return rpc.JobQueueRunner(self.context, address_list)
+
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def AddNode(self, node):
@@ -1635,7 +1678,7 @@ class JobQueue(object):
     assert node_name != self._my_hostname
 
     # Clean queue directory on added node
-    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
+    result = self._GetRpc(None).call_jobqueue_purge(node_name)
     msg = result.fail_msg
     if msg:
       logging.warning("Cannot cleanup queue directory on node %s: %s",
@@ -1653,13 +1696,15 @@ class JobQueue(object):
     # Upload current serial file
     files.append(constants.JOB_QUEUE_SERIAL_FILE)
 
+    # Static address list
+    addrs = [node.primary_ip]
+
     for file_name in files:
       # Read file content
       content = utils.ReadFile(file_name)
 
-      result = rpc.RpcRunner.call_jobqueue_update([node_name],
-                                                  [node.primary_ip],
-                                                  file_name, content)
+      result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name,
+                                                        content)
       msg = result[node_name].fail_msg
       if msg:
         logging.error("Failed to upload file %s to node %s: %s",
@@ -1743,7 +1788,7 @@ class JobQueue(object):
 
     if replicate:
       names, addrs = self._GetNodeIp()
-      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
+      result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
 
   def _RenameFilesUnlocked(self, rename):
@@ -1762,7 +1807,7 @@ class JobQueue(object):
 
     # ... and on all nodes
     names, addrs = self._GetNodeIp()
-    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
+    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
 
   @staticmethod
@@ -2017,16 +2062,10 @@ class JobQueue(object):
     @param ops: The list of OpCodes that will become the new job.
     @rtype: L{_QueuedJob}
     @return: the job object to be queued
-    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid
 
     """
-    # Ok when sharing the big job queue lock, as the drain file is created when
-    # the lock is exclusive.
-    if self._drained:
-      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
-
     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
       raise errors.JobQueueFull()
 
@@ -2058,6 +2097,7 @@ class JobQueue(object):
 
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
+  @_RequireNonDrainedQueue
   def SubmitJob(self, ops):
     """Create and store a new job.
 
@@ -2070,6 +2110,7 @@ class JobQueue(object):
 
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
+  @_RequireNonDrainedQueue
   def SubmitManyJobs(self, jobs):
     """Create and store multiple jobs.
 
@@ -2226,7 +2267,7 @@ class JobQueue(object):
     assert job.writable, "Can't update read-only job"
 
     filename = self._GetJobPath(job.id)
-    data = serializer.DumpJson(job.Serialize(), indent=False)
+    data = serializer.DumpJson(job.Serialize())
     logging.debug("Writing job %s to %s", job.id, filename)
     self._UpdateJobQueueFile(filename, data, replicate)
 
@@ -2437,6 +2478,31 @@ class JobQueue(object):
     return jobs
 
   @locking.ssynchronized(_LOCK)
+  def PrepareShutdown(self):
+    """Prepare to stop the job queue.
+
+    Disables execution of jobs in the workerpool and returns whether there are
+    any jobs currently running. If the latter is the case, the job queue is not
+    yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
+    be called without interfering with any job. Queued and unfinished jobs will
+    be resumed next time.
+
+    Once this function has been called no new job submissions will be accepted
+    (see L{_RequireNonDrainedQueue}).
+
+    @rtype: bool
+    @return: Whether there are any running jobs
+
+    """
+    if self._accepting_jobs:
+      self._accepting_jobs = False
+
+      # Tell worker pool to stop processing pending tasks
+      self._wpool.SetActive(False)
+
+    return self._wpool.HasRunningTasks()
+
+  @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def Shutdown(self):
     """Stops the job queue.
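
The diff above only adds PrepareShutdown() and Shutdown() to the queue itself; for illustration, here is a minimal sketch of how a caller (for example the master daemon's shutdown path) might drive the pair. The helper name, polling loop, timeout and logging are assumptions for the sketch, not code from this change.

import logging
import time


def _WaitForJobQueueShutdown(jqueue, timeout=60.0, poll_interval=1.0):
  """Illustrative helper: drain the queue's worker pool, then stop it.

  """
  # PrepareShutdown() stops accepting new jobs, disables the worker pool and
  # returns True while previously started jobs are still running
  deadline = time.time() + timeout
  while jqueue.PrepareShutdown():
    if time.time() > deadline:
      logging.warning("Jobs still running after %.0f seconds, shutting down"
                      " anyway", timeout)
      break
    time.sleep(poll_interval)

  # Queued and unfinished jobs are resumed on the next start
  jqueue.Shutdown()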