Revision 942817f2
b/lib/jqueue/__init__.py | ||
---|---|---|
67 | 67 |
from ganeti import vcluster |
68 | 68 |
|
69 | 69 |
|
70 |
JOBQUEUE_THREADS = 25
|
|
70 |
JOBQUEUE_THREADS = 1
|
|
71 | 71 |
|
72 | 72 |
# member lock names to be passed to @ssynchronized decorator |
73 | 73 |
_LOCK = "_lock" |
... | ... | |
1631 | 1631 |
self._enqueue_fn(jobs) |
1632 | 1632 |
|
1633 | 1633 |
|
1634 |
def _RequireOpenQueue(fn): |
|
1635 |
"""Decorator for "public" functions. |
|
1636 |
|
|
1637 |
This function should be used for all 'public' functions. That is, |
|
1638 |
functions usually called from other classes. Note that this should |
|
1639 |
be applied only to methods (not plain functions), since it expects |
|
1640 |
that the decorated function is called with a first argument that has |
|
1641 |
a '_queue_filelock' argument. |
|
1642 |
|
|
1643 |
@warning: Use this decorator only after locking.ssynchronized |
|
1644 |
|
|
1645 |
Example:: |
|
1646 |
@locking.ssynchronized(_LOCK) |
|
1647 |
@_RequireOpenQueue |
|
1648 |
def Example(self): |
|
1649 |
pass |
|
1650 |
|
|
1651 |
""" |
|
1652 |
def wrapper(self, *args, **kwargs): |
|
1653 |
# pylint: disable=W0212 |
|
1654 |
assert self._queue_filelock is not None, "Queue should be open" |
|
1655 |
return fn(self, *args, **kwargs) |
|
1656 |
return wrapper |
|
1657 |
|
|
1658 |
|
|
1659 | 1634 |
def _RequireNonDrainedQueue(fn): |
1660 | 1635 |
"""Decorator checking for a non-drained queue. |
1661 | 1636 |
|
... | ... | |
1715 | 1690 |
# Accept jobs by default |
1716 | 1691 |
self._accepting_jobs = True |
1717 | 1692 |
|
1718 |
# Initialize the queue, and acquire the filelock. |
|
1719 |
# This ensures no other process is working on the job queue. |
|
1720 |
self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True) |
|
1721 |
|
|
1722 | 1693 |
# Read serial file |
1723 | 1694 |
self._last_serial = jstore.ReadSerial() |
1724 | 1695 |
assert self._last_serial is not None, ("Serial file was modified between" |
... | ... | |
1796 | 1767 |
return rpc.JobQueueRunner(self.context, address_list) |
1797 | 1768 |
|
1798 | 1769 |
@locking.ssynchronized(_LOCK) |
1799 |
@_RequireOpenQueue |
|
1800 | 1770 |
def AddNode(self, node): |
1801 | 1771 |
"""Register a new node with the queue. |
1802 | 1772 |
|
... | ... | |
1852 | 1822 |
self._nodes[node_name] = node.primary_ip |
1853 | 1823 |
|
1854 | 1824 |
@locking.ssynchronized(_LOCK) |
1855 |
@_RequireOpenQueue |
|
1856 | 1825 |
def RemoveNode(self, node_name): |
1857 | 1826 |
"""Callback called when removing nodes from the cluster. |
1858 | 1827 |
|
... | ... | |
2137 | 2106 |
self._queue_size = len(self._GetJobIDsUnlocked(sort=False)) |
2138 | 2107 |
|
2139 | 2108 |
@locking.ssynchronized(_LOCK) |
2140 |
@_RequireOpenQueue |
|
2141 | 2109 |
def SetDrainFlag(self, drain_flag): |
2142 | 2110 |
"""Sets the drain flag for the queue. |
2143 | 2111 |
|
... | ... | |
2264 | 2232 |
|
2265 | 2233 |
raise errors.JobLost("Job %s not found" % job_id) |
2266 | 2234 |
|
2267 |
@_RequireOpenQueue |
|
2268 | 2235 |
def UpdateJobUnlocked(self, job, replicate=True): |
2269 | 2236 |
"""Update a job's on disk storage. |
2270 | 2237 |
|
... | ... | |
2339 | 2306 |
return None |
2340 | 2307 |
|
2341 | 2308 |
@locking.ssynchronized(_LOCK) |
2342 |
@_RequireOpenQueue |
|
2343 | 2309 |
def CancelJob(self, job_id): |
2344 | 2310 |
"""Cancels a job. |
2345 | 2311 |
|
... | ... | |
2354 | 2320 |
return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel()) |
2355 | 2321 |
|
2356 | 2322 |
@locking.ssynchronized(_LOCK) |
2357 |
@_RequireOpenQueue |
|
2358 | 2323 |
def ChangeJobPriority(self, job_id, priority): |
2359 | 2324 |
"""Changes a job's priority. |
2360 | 2325 |
|
... | ... | |
2411 | 2376 |
|
2412 | 2377 |
return (success, msg) |
2413 | 2378 |
|
2414 |
@_RequireOpenQueue |
|
2415 | 2379 |
def _ArchiveJobsUnlocked(self, jobs): |
2416 | 2380 |
"""Archives jobs. |
2417 | 2381 |
|
... | ... | |
2451 | 2415 |
return len(archive_jobs) |
2452 | 2416 |
|
2453 | 2417 |
@locking.ssynchronized(_LOCK) |
2454 |
@_RequireOpenQueue |
|
2455 | 2418 |
def ArchiveJob(self, job_id): |
2456 | 2419 |
"""Archives a job. |
2457 | 2420 |
|
... | ... | |
2473 | 2436 |
return self._ArchiveJobsUnlocked([job]) == 1 |
2474 | 2437 |
|
2475 | 2438 |
@locking.ssynchronized(_LOCK) |
2476 |
@_RequireOpenQueue |
|
2477 | 2439 |
def AutoArchiveJobs(self, age, timeout): |
2478 | 2440 |
"""Archives all jobs based on age. |
2479 | 2441 |
|
... | ... | |
2625 | 2587 |
return self._accepting_jobs |
2626 | 2588 |
|
2627 | 2589 |
@locking.ssynchronized(_LOCK) |
2628 |
@_RequireOpenQueue |
|
2629 | 2590 |
def Shutdown(self): |
2630 | 2591 |
"""Stops the job queue. |
2631 | 2592 |
|
... | ... | |
2633 | 2594 |
|
2634 | 2595 |
""" |
2635 | 2596 |
self._wpool.TerminateWorkers() |
2636 |
|
|
2637 |
self._queue_filelock.Close() |
|
2638 |
self._queue_filelock = None |
Also available in: Unified diff