Revision dfe57c22 lib/jqueue.py

b/lib/jqueue.py
121 121
  This is what we use to track the user-submitted jobs.
122 122

  
123 123
  """
124
  def __new__(cls, *args, **kwargs):
125
    obj = object.__new__(cls, *args, **kwargs)
126
    # Condition to wait for changes
127
    obj.change = threading.Condition()
128
    return obj
129

  
124 130
  def __init__(self, queue, job_id, ops):
125 131
    if not ops:
126 132
      # TODO
......
204 210
            finally:
205 211
              queue.release()
206 212

  
207
            result = proc.ExecOpCode(input_opcode, op.Log)
213
            def _Log(*args):
214
              op.Log(*args)
215

  
216
              job.change.acquire()
217
              try:
218
                job.change.notifyAll()
219
              finally:
220
                job.change.release()
221

  
222
            result = proc.ExecOpCode(input_opcode, _Log)
208 223

  
209 224
            queue.acquire()
210 225
            try:
......
516 531
    self._WriteAndReplicateFileUnlocked(filename, data)
517 532
    self._CleanCacheUnlocked([job.id])
518 533

  
534
    # Notify waiters about potential changes
535
    job.change.acquire()
536
    try:
537
      job.change.notifyAll()
538
    finally:
539
      job.change.release()
540

  
519 541
  def _CleanCacheUnlocked(self, exclude):
520 542
    """Clean the memory cache.
521 543

  
......
536 558
        except KeyError:
537 559
          pass
538 560

  
561
  @_RequireOpenQueue
562
  def WaitForJobChanges(self, job_id, fields, previous):
563
    logging.debug("Waiting for changes in job %s", job_id)
564

  
565
    while True:
566
      self.acquire()
567
      try:
568
        job = self._LoadJobUnlocked(job_id)
569
        if not job:
570
          logging.debug("Job %s not found", job_id)
571
          new_state = None
572
          break
573

  
574
        new_state = self._GetJobInfoUnlocked(job, fields)
575
      finally:
576
        self.release()
577

  
578
      # Serializing and deserializing data can cause type changes (e.g. from
579
      # tuple to list) or precision loss. We're doing it here so that we get
580
      # the same modifications as the data received from the client. Without
581
      # this, the comparison afterwards might fail without the data being
582
      # significantly different.
583
      new_state = serializer.LoadJson(serializer.DumpJson(new_state))
584

  
585
      if previous != new_state:
586
        break
587

  
588
      job.change.acquire()
589
      try:
590
        job.change.wait()
591
      finally:
592
        job.change.release()
593

  
594
    logging.debug("Job %s changed", job_id)
595

  
596
    return new_state
597

  
539 598
  @utils.LockedMethod
540 599
  @_RequireOpenQueue
541 600
  def CancelJob(self, job_id):

Also available in: Unified diff