Revision 37d76f1e
b/lib/jqueue.py | ||
---|---|---|
1439 | 1439 |
" not one of '%s' as required" % |
1440 | 1440 |
(dep_job_id, status, utils.CommaJoin(dep_status))) |
1441 | 1441 |
|
1442 |
@locking.ssynchronized(_LOCK) |
|
1442 |
def _RemoveEmptyWaitersUnlocked(self): |
|
1443 |
"""Remove all jobs without actual waiters. |
|
1444 |
|
|
1445 |
""" |
|
1446 |
for job_id in [job_id for (job_id, waiters) in self._waiters.items() |
|
1447 |
if not waiters]: |
|
1448 |
del self._waiters[job_id] |
|
1449 |
|
|
1443 | 1450 |
def NotifyWaiters(self, job_id): |
1444 | 1451 |
"""Notifies all jobs waiting for a certain job ID. |
1445 | 1452 |
|
1453 |
@important: Do not call until L{CheckAndRegister} returned a status other |
|
1454 |
than L{self.WAIT} for C{job_id}, or behaviour is undefined |
|
1446 | 1455 |
@type job_id: string |
1447 | 1456 |
@param job_id: Job ID |
1448 | 1457 |
|
1449 | 1458 |
""" |
1450 | 1459 |
assert ht.TString(job_id) |
1451 | 1460 |
|
1452 |
jobs = self._waiters.pop(job_id, None) |
|
1461 |
self._lock.acquire() |
|
1462 |
try: |
|
1463 |
self._RemoveEmptyWaitersUnlocked() |
|
1464 |
|
|
1465 |
jobs = self._waiters.pop(job_id, None) |
|
1466 |
finally: |
|
1467 |
self._lock.release() |
|
1468 |
|
|
1453 | 1469 |
if jobs: |
1454 | 1470 |
# Re-add jobs to workerpool |
1455 | 1471 |
logging.debug("Re-adding %s jobs which were waiting for job %s", |
1456 | 1472 |
len(jobs), job_id) |
1457 | 1473 |
self._enqueue_fn(jobs) |
1458 | 1474 |
|
1459 |
# Remove all jobs without actual waiters |
|
1460 |
for job_id in [job_id for (job_id, waiters) in self._waiters.items() |
|
1461 |
if not waiters]: |
|
1462 |
del self._waiters[job_id] |
|
1463 |
|
|
1464 | 1475 |
|
1465 | 1476 |
def _RequireOpenQueue(fn): |
1466 | 1477 |
"""Decorator for "public" functions. |
b/test/ganeti.jqueue_unittest.py | ||
---|---|---|
1874 | 1874 |
return result |
1875 | 1875 |
|
1876 | 1876 |
def _Enqueue(self, jobs): |
1877 |
self.assertFalse(self.jdm._lock.is_owned(), |
|
1878 |
msg=("Must not own manager lock while re-adding jobs" |
|
1879 |
" (potential deadlock)")) |
|
1877 | 1880 |
self._queue.append(jobs) |
1878 | 1881 |
|
1879 | 1882 |
def testNotFinalizedThenCancel(self): |
Also available in: Unified diff