From: Michael Hanselmann
Date: Mon, 11 Aug 2008 16:27:45 +0000 (+0000)
Subject: Add RPC call to wait for job changes
X-Git-Tag: v2.0.0alpha0~133
X-Git-Url: https://code.grnet.gr/git/ganeti-local/commitdiff_plain/dfe57c22c61c52f1c172af652392762b754f87ba

Add RPC call to wait for job changes

This way clients can react faster to status or message changes and
don't have to poll anymore.

Reviewed-by: ultrotter
---

diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd
index f240c62..6400e2f 100755
--- a/daemons/ganeti-masterd
+++ b/daemons/ganeti-masterd
@@ -217,6 +217,10 @@ class ClientOps:
       job_id = args
       return queue.ArchiveJob(job_id)
 
+    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
+      (job_id, fields, previous) = args
+      return queue.WaitForJobChanges(job_id, fields, previous)
+
     elif method == luxi.REQ_QUERY_JOBS:
       (job_ids, fields) = args
       return queue.QueryJobs(job_ids, fields)
diff --git a/lib/jqueue.py b/lib/jqueue.py
index 9f9709a..d97f10e 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -121,6 +121,12 @@ class _QueuedJob(object):
   This is what we use to track the user-submitted jobs.
 
   """
+  def __new__(cls, *args, **kwargs):
+    obj = object.__new__(cls, *args, **kwargs)
+    # Condition to wait for changes
+    obj.change = threading.Condition()
+    return obj
+
   def __init__(self, queue, job_id, ops):
     if not ops:
       # TODO
@@ -204,7 +210,16 @@ class _JobQueueWorker(workerpool.BaseWorker):
           finally:
             queue.release()
 
-          result = proc.ExecOpCode(input_opcode, op.Log)
+          def _Log(*args):
+            op.Log(*args)
+
+            job.change.acquire()
+            try:
+              job.change.notifyAll()
+            finally:
+              job.change.release()
+
+          result = proc.ExecOpCode(input_opcode, _Log)
 
           queue.acquire()
           try:
@@ -516,6 +531,13 @@ class JobQueue(object):
     self._WriteAndReplicateFileUnlocked(filename, data)
     self._CleanCacheUnlocked([job.id])
 
+    # Notify waiters about potential changes
+    job.change.acquire()
+    try:
+      job.change.notifyAll()
+    finally:
+      job.change.release()
+
   def _CleanCacheUnlocked(self, exclude):
     """Clean the memory cache.
 
@@ -536,6 +558,43 @@ class JobQueue(object):
         except KeyError:
           pass
 
+  @_RequireOpenQueue
+  def WaitForJobChanges(self, job_id, fields, previous):
+    logging.debug("Waiting for changes in job %s", job_id)
+
+    while True:
+      self.acquire()
+      try:
+        job = self._LoadJobUnlocked(job_id)
+        if not job:
+          logging.debug("Job %s not found", job_id)
+          new_state = None
+          break
+
+        new_state = self._GetJobInfoUnlocked(job, fields)
+      finally:
+        self.release()
+
+      # Serializing and deserializing data can cause type changes (e.g. from
+      # tuple to list) or precision loss. We're doing it here so that we get
+      # the same modifications as the data received from the client. Without
+      # this, the comparison afterwards might fail without the data being
+      # significantly different.
+      new_state = serializer.LoadJson(serializer.DumpJson(new_state))
+
+      if previous != new_state:
+        break
+
+      job.change.acquire()
+      try:
+        job.change.wait()
+      finally:
+        job.change.release()
+
+    logging.debug("Job %s changed", job_id)
+
+    return new_state
+
   @utils.LockedMethod
   @_RequireOpenQueue
   def CancelJob(self, job_id):
diff --git a/lib/luxi.py b/lib/luxi.py
index a35adad..6aa4ee9 100644
--- a/lib/luxi.py
+++ b/lib/luxi.py
@@ -44,6 +44,7 @@ KEY_SUCCESS = "success"
 KEY_RESULT = "result"
 
 REQ_SUBMIT_JOB = "SubmitJob"
+REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
 REQ_CANCEL_JOB = "CancelJob"
 REQ_ARCHIVE_JOB = "ArchiveJob"
 REQ_QUERY_JOBS = "QueryJobs"
@@ -288,6 +289,10 @@ class Client(object):
   def ArchiveJob(self, job_id):
     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
 
+  def WaitForJobChange(self, job_id, fields, previous):
+    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
+                           (job_id, fields, previous))
+
   def QueryJobs(self, job_ids, fields):
     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
 