Revision dfe57c22

b/daemons/ganeti-masterd
217 217
      job_id = args
218 218
      return queue.ArchiveJob(job_id)
219 219

  
220
    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
221
      (job_id, fields, previous) = args
222
      return queue.WaitForJobChanges(job_id, fields, previous)
223

  
220 224
    elif method == luxi.REQ_QUERY_JOBS:
221 225
      (job_ids, fields) = args
222 226
      return queue.QueryJobs(job_ids, fields)
b/lib/jqueue.py
121 121
  This is what we use to track the user-submitted jobs.
122 122

  
123 123
  """
124
  def __new__(cls, *args, **kwargs):
125
    obj = object.__new__(cls, *args, **kwargs)
126
    # Condition to wait for changes
127
    obj.change = threading.Condition()
128
    return obj
129

  
124 130
  def __init__(self, queue, job_id, ops):
125 131
    if not ops:
126 132
      # TODO
......
204 210
            finally:
205 211
              queue.release()
206 212

  
207
            result = proc.ExecOpCode(input_opcode, op.Log)
213
            def _Log(*args):
214
              op.Log(*args)
215

  
216
              job.change.acquire()
217
              try:
218
                job.change.notifyAll()
219
              finally:
220
                job.change.release()
221

  
222
            result = proc.ExecOpCode(input_opcode, _Log)
208 223

  
209 224
            queue.acquire()
210 225
            try:
......
516 531
    self._WriteAndReplicateFileUnlocked(filename, data)
517 532
    self._CleanCacheUnlocked([job.id])
518 533

  
534
    # Notify waiters about potential changes
535
    job.change.acquire()
536
    try:
537
      job.change.notifyAll()
538
    finally:
539
      job.change.release()
540

  
519 541
  def _CleanCacheUnlocked(self, exclude):
520 542
    """Clean the memory cache.
521 543

  
......
536 558
        except KeyError:
537 559
          pass
538 560

  
561
  @_RequireOpenQueue
562
  def WaitForJobChanges(self, job_id, fields, previous):
563
    logging.debug("Waiting for changes in job %s", job_id)
564

  
565
    while True:
566
      self.acquire()
567
      try:
568
        job = self._LoadJobUnlocked(job_id)
569
        if not job:
570
          logging.debug("Job %s not found", job_id)
571
          new_state = None
572
          break
573

  
574
        new_state = self._GetJobInfoUnlocked(job, fields)
575
      finally:
576
        self.release()
577

  
578
      # Serializing and deserializing data can cause type changes (e.g. from
579
      # tuple to list) or precision loss. We're doing it here so that we get
580
      # the same modifications as the data received from the client. Without
581
      # this, the comparison afterwards might fail without the data being
582
      # significantly different.
583
      new_state = serializer.LoadJson(serializer.DumpJson(new_state))
584

  
585
      if previous != new_state:
586
        break
587

  
588
      job.change.acquire()
589
      try:
590
        job.change.wait()
591
      finally:
592
        job.change.release()
593

  
594
    logging.debug("Job %s changed", job_id)
595

  
596
    return new_state
597

  
539 598
  @utils.LockedMethod
540 599
  @_RequireOpenQueue
541 600
  def CancelJob(self, job_id):
b/lib/luxi.py
44 44
KEY_RESULT = "result"
45 45

  
46 46
REQ_SUBMIT_JOB = "SubmitJob"
47
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
47 48
REQ_CANCEL_JOB = "CancelJob"
48 49
REQ_ARCHIVE_JOB = "ArchiveJob"
49 50
REQ_QUERY_JOBS = "QueryJobs"
......
288 289
  def ArchiveJob(self, job_id):
289 290
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
290 291

  
292
  def WaitForJobChange(self, job_id, fields, previous):
293
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
294
                           (job_id, fields, previous))
295

  
291 296
  def QueryJobs(self, job_ids, fields):
292 297
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
293 298

  

Also available in: Unified diff