Revision 5c735209

b/daemons/ganeti-masterd
217 217
      return queue.ArchiveJob(job_id)
218 218

  
219 219
    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
220
      (job_id, fields, prev_job_info, prev_log_serial) = args
220
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
221 221
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
222
                                     prev_log_serial)
222
                                     prev_log_serial, timeout)
223 223

  
224 224
    elif method == luxi.REQ_QUERY_JOBS:
225 225
      (job_ids, fields) = args
b/lib/constants.py
279 279

  
280 280
JOB_ID_TEMPLATE = r"\d+"
281 281

  
282
# unchanged job return
283
JOB_NOTCHANGED = "nochange"
284

  
282 285
# Job status
283 286
JOB_STATUS_QUEUED = "queued"
284 287
JOB_STATUS_RUNNING = "running"
b/lib/jqueue.py
558 558

  
559 559
  @utils.LockedMethod
560 560
  @_RequireOpenQueue
561
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial):
561
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
562
                        timeout):
562 563
    """Waits for changes in a job.
563 564

  
564 565
    @type job_id: string
......
569 570
    @param prev_job_info: Last job information returned
570 571
    @type prev_log_serial: int
571 572
    @param prev_log_serial: Last job message serial number
573
    @type timeout: float
574
    @param timeout: maximum time to wait
572 575

  
573 576
    """
574 577
    logging.debug("Waiting for changes in job %s", job_id)
575

  
578
    end_time = time.time() + timeout
576 579
    while True:
580
      delta_time = end_time - time.time()
581
      if delta_time < 0:
582
        return constants.JOB_NOTCHANGED
583

  
577 584
      job = self._LoadJobUnlocked(job_id)
578 585
      if not job:
579 586
        logging.debug("Job %s not found", job_id)
580
        new_state = None
581 587
        break
582 588

  
583 589
      status = job.CalcStatus()
......
605 611
      logging.debug("Waiting again")
606 612

  
607 613
      # Release the queue lock while waiting
608
      job.change.wait()
614
      job.change.wait(delta_time)
609 615

  
610 616
    logging.debug("Job %s changed", job_id)
611 617

  
b/lib/luxi.py
290 290
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
291 291

  
292 292
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
293
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
294
                           (job_id, fields, prev_job_info, prev_log_serial))
293
    timeout = (DEF_RWTO - 1) / 2
294
    while True:
295
      result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
296
                               (job_id, fields, prev_job_info,
297
                                prev_log_serial, timeout))
298
      if result != constants.JOB_NOTCHANGED:
299
        break
300
    return result
295 301

  
296 302
  def QueryJobs(self, job_ids, fields):
297 303
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

Also available in: Unified diff