Revision 37d76f1e

b/lib/jqueue.py
1439 1439
              " not one of '%s' as required" %
1440 1440
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1441 1441

  
1442
  @locking.ssynchronized(_LOCK)
1442
  def _RemoveEmptyWaitersUnlocked(self):
1443
    """Remove all jobs without actual waiters.
1444

  
1445
    """
1446
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1447
                   if not waiters]:
1448
      del self._waiters[job_id]
1449

  
1443 1450
  def NotifyWaiters(self, job_id):
1444 1451
    """Notifies all jobs waiting for a certain job ID.
1445 1452

  
1453
    @important: Do not call until L{CheckAndRegister} returned a status other
1454
      than L{self.WAIT} for C{job_id}, or behaviour is undefined
1446 1455
    @type job_id: string
1447 1456
    @param job_id: Job ID
1448 1457

  
1449 1458
    """
1450 1459
    assert ht.TString(job_id)
1451 1460

  
1452
    jobs = self._waiters.pop(job_id, None)
1461
    self._lock.acquire()
1462
    try:
1463
      self._RemoveEmptyWaitersUnlocked()
1464

  
1465
      jobs = self._waiters.pop(job_id, None)
1466
    finally:
1467
      self._lock.release()
1468

  
1453 1469
    if jobs:
1454 1470
      # Re-add jobs to workerpool
1455 1471
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1456 1472
                    len(jobs), job_id)
1457 1473
      self._enqueue_fn(jobs)
1458 1474

  
1459
    # Remove all jobs without actual waiters
1460
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1461
                   if not waiters]:
1462
      del self._waiters[job_id]
1463

  
1464 1475

  
1465 1476
def _RequireOpenQueue(fn):
1466 1477
  """Decorator for "public" functions.
b/test/ganeti.jqueue_unittest.py
1874 1874
    return result
1875 1875

  
1876 1876
  def _Enqueue(self, jobs):
1877
    self.assertFalse(self.jdm._lock.is_owned(),
1878
                     msg=("Must not own manager lock while re-adding jobs"
1879
                          " (potential deadlock)"))
1877 1880
    self._queue.append(jobs)
1878 1881

  
1879 1882
  def testNotFinalizedThenCancel(self):

Also available in: Unified diff