From: Iustin Pop Date: Fri, 29 Aug 2008 13:42:23 +0000 (+0000) Subject: Make WaitForJobChanges deal with long jobs X-Git-Tag: v2.0.0alpha0~85 X-Git-Url: https://code.grnet.gr/git/ganeti-local/commitdiff_plain/5c73520908a113583d7ff61f751a9cc4737a68b7 Make WaitForJobChanges deal with long jobs This patch alters the WaitForJobChanges luxi-RPC call to have a configurable timeout, so that the call behaves nicely with long jobs that have no update. We do this by adding a timeout parameter in the RPC call, and returning a special constant when the timeout is reached without an update. The luxi client will repeatedly call the WaitForJobChanges until it gets a real change. The timeout is hardcoded as half the RWTO value. The patch also removes an unused variable (new_state) from the WaitForJobChanges method. Reviewed-by: imsnah,ultrotter --- diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd index d6ef907..6df6aec 100755 --- a/daemons/ganeti-masterd +++ b/daemons/ganeti-masterd @@ -217,9 +217,9 @@ class ClientOps: return queue.ArchiveJob(job_id) elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE: - (job_id, fields, prev_job_info, prev_log_serial) = args + (job_id, fields, prev_job_info, prev_log_serial, timeout) = args return queue.WaitForJobChanges(job_id, fields, prev_job_info, - prev_log_serial) + prev_log_serial, timeout) elif method == luxi.REQ_QUERY_JOBS: (job_ids, fields) = args diff --git a/lib/constants.py b/lib/constants.py index 2d1ea8f..08e5ba0 100644 --- a/lib/constants.py +++ b/lib/constants.py @@ -279,6 +279,9 @@ JOB_QUEUE_ARCHIVE_DIR = QUEUE_DIR + "/archive" JOB_ID_TEMPLATE = r"\d+" +# unchanged job return +JOB_NOTCHANGED = "nochange" + # Job status JOB_STATUS_QUEUED = "queued" JOB_STATUS_RUNNING = "running" diff --git a/lib/jqueue.py b/lib/jqueue.py index 517e4b5..edc69b6 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -558,7 +558,8 @@ class JobQueue(object): @utils.LockedMethod @_RequireOpenQueue - def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial): + def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial, + timeout): """Waits for changes in a job. @type job_id: string @@ -569,15 +570,20 @@ class JobQueue(object): @param prev_job_info: Last job information returned @type prev_log_serial: int @param prev_log_serial: Last job message serial number + @type timeout: float + @param timeout: maximum time to wait """ logging.debug("Waiting for changes in job %s", job_id) - + end_time = time.time() + timeout while True: + delta_time = end_time - time.time() + if delta_time < 0: + return constants.JOB_NOTCHANGED + job = self._LoadJobUnlocked(job_id) if not job: logging.debug("Job %s not found", job_id) - new_state = None break status = job.CalcStatus() @@ -605,7 +611,7 @@ class JobQueue(object): logging.debug("Waiting again") # Release the queue lock while waiting - job.change.wait() + job.change.wait(delta_time) logging.debug("Job %s changed", job_id) diff --git a/lib/luxi.py b/lib/luxi.py index 5c48dd5..f04aee7 100644 --- a/lib/luxi.py +++ b/lib/luxi.py @@ -290,8 +290,14 @@ class Client(object): return self.CallMethod(REQ_ARCHIVE_JOB, job_id) def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial): - return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE, - (job_id, fields, prev_job_info, prev_log_serial)) + timeout = (DEF_RWTO - 1) / 2 + while True: + result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE, + (job_id, fields, prev_job_info, + prev_log_serial, timeout)) + if result != constants.JOB_NOTCHANGED: + break + return result def QueryJobs(self, job_ids, fields): return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))