Revision 7b5c4a69

b/lib/jqueue.py
975 975
    """
976 976
    logging.info("Inspecting job queue")
977 977

  
978
    restartjobs = []
979

  
978 980
    all_job_ids = self._GetJobIDsUnlocked()
979 981
    jobs_count = len(all_job_ids)
980 982
    lastinfo = time.time()
......
995 997
      status = job.CalcStatus()
996 998

  
997 999
      if status in (constants.JOB_STATUS_QUEUED, ):
998
        self._wpool.AddTask((job, ))
1000
        restartjobs.append(job)
999 1001

  
1000 1002
      elif status in (constants.JOB_STATUS_RUNNING,
1001 1003
                      constants.JOB_STATUS_WAITLOCK,
......
1005 1007
                              "Unclean master daemon shutdown")
1006 1008
        self.UpdateJobUnlocked(job)
1007 1009

  
1010
    if restartjobs:
1011
      logging.info("Restarting %s jobs", len(restartjobs))
1012
      self._EnqueueJobs(restartjobs)
1013

  
1008 1014
    logging.info("Job queue inspection finished")
1009 1015

  
1010 1016
  @locking.ssynchronized(_LOCK)
......
1434 1440

  
1435 1441
    """
1436 1442
    job_id = self._NewSerialsUnlocked(1)[0]
1437
    self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
1443
    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
1438 1444
    return job_id
1439 1445

  
1440 1446
  @locking.ssynchronized(_LOCK)
......
1446 1452

  
1447 1453
    """
1448 1454
    results = []
1449
    tasks = []
1455
    added_jobs = []
1450 1456
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1451 1457
    for job_id, ops in zip(all_job_ids, jobs):
1452 1458
      try:
1453
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1459
        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
1454 1460
        status = True
1455 1461
        data = job_id
1456 1462
      except errors.GenericError, err:
1457 1463
        data = str(err)
1458 1464
        status = False
1459 1465
      results.append((status, data))
1460
    self._wpool.AddManyTasks(tasks)
1466

  
1467
    self._EnqueueJobs(added_jobs)
1461 1468

  
1462 1469
    return results
1463 1470

  
1471
  def _EnqueueJobs(self, jobs):
1472
    """Helper function to add jobs to worker pool's queue.
1473

  
1474
    @type jobs: list
1475
    @param jobs: List of all jobs
1476

  
1477
    """
1478
    self._wpool.AddManyTasks([(job, ) for job in jobs],
1479
                             priority=[job.CalcPriority() for job in jobs])
1480

  
1464 1481
  @_RequireOpenQueue
1465 1482
  def UpdateJobUnlocked(self, job, replicate=True):
1466 1483
    """Update a job's on disk storage.

Also available in: Unified diff