Revision 16714921

b/lib/jqueue.py
506 506

  
507 507
    # Setup worker pool
508 508
    self._wpool = _JobQueueWorkerPool(self)
509

  
510
    # We need to lock here because WorkerPool.AddTask() may start a job while
511
    # we're still doing our work.
512
    self.acquire()
513 509
    try:
514
      for job in self._GetJobsUnlocked(None):
515
        # a failure in loading the job can cause 'None' to be returned
516
        if job is None:
517
          continue
510
      # We need to lock here because WorkerPool.AddTask() may start a job while
511
      # we're still doing our work.
512
      self.acquire()
513
      try:
514
        for job in self._GetJobsUnlocked(None):
515
          # a failure in loading the job can cause 'None' to be returned
516
          if job is None:
517
            continue
518 518

  
519
        status = job.CalcStatus()
519
          status = job.CalcStatus()
520 520

  
521
        if status in (constants.JOB_STATUS_QUEUED, ):
522
          self._wpool.AddTask(job)
521
          if status in (constants.JOB_STATUS_QUEUED, ):
522
            self._wpool.AddTask(job)
523 523

  
524
        elif status in (constants.JOB_STATUS_RUNNING,
525
                        constants.JOB_STATUS_WAITLOCK):
526
          logging.warning("Unfinished job %s found: %s", job.id, job)
527
          try:
528
            for op in job.ops:
529
              op.status = constants.OP_STATUS_ERROR
530
              op.result = "Unclean master daemon shutdown"
531
          finally:
532
            self.UpdateJobUnlocked(job)
533
    finally:
534
      self.release()
524
          elif status in (constants.JOB_STATUS_RUNNING,
525
                          constants.JOB_STATUS_WAITLOCK):
526
            logging.warning("Unfinished job %s found: %s", job.id, job)
527
            try:
528
              for op in job.ops:
529
                op.status = constants.OP_STATUS_ERROR
530
                op.result = "Unclean master daemon shutdown"
531
            finally:
532
              self.UpdateJobUnlocked(job)
533
      finally:
534
        self.release()
535
    except:
536
      self._wpool.TerminateWorkers()
537
      raise
535 538

  
536 539
  @utils.LockedMethod
537 540
  @_RequireOpenQueue

Also available in: Unified diff