Revision db37da70
b/lib/jqueue.py | ||
---|---|---|
251 | 251 |
class JobQueue(object): |
252 | 252 |
_RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE) |
253 | 253 |
|
254 |
def _RequireOpenQueue(fn): |
|
255 |
"""Decorator for "public" functions. |
|
256 |
|
|
257 |
This function should be used for all "public" functions. That is, functions |
|
258 |
usually called from other classes. |
|
259 |
|
|
260 |
Important: Use this decorator only after utils.LockedMethod! |
|
261 |
|
|
262 |
Example: |
|
263 |
@utils.LockedMethod |
|
264 |
@_RequireOpenQueue |
|
265 |
def Example(self): |
|
266 |
pass |
|
267 |
|
|
268 |
""" |
|
269 |
def wrapper(self, *args, **kwargs): |
|
270 |
assert self.lock_fd, "Queue should be open" |
|
271 |
return fn(self, *args, **kwargs) |
|
272 |
return wrapper |
|
273 |
|
|
254 | 274 |
def __init__(self, context): |
255 | 275 |
self.context = context |
256 | 276 |
self._memcache = {} |
... | ... | |
352 | 372 |
return serial |
353 | 373 |
|
354 | 374 |
def _InitQueueUnlocked(self): |
355 |
assert self.lock_fd, "Queue should be open" |
|
356 |
|
|
357 | 375 |
utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE, |
358 | 376 |
data="%s\n" % constants.JOB_QUEUE_VERSION) |
359 | 377 |
if self._ReadSerial() is None: |
... | ... | |
376 | 394 |
Returns: A string representing the job identifier. |
377 | 395 |
|
378 | 396 |
""" |
379 |
assert self.lock_fd, "Queue should be open" |
|
380 |
|
|
381 | 397 |
# New number |
382 | 398 |
serial = self._last_serial + 1 |
383 | 399 |
|
... | ... | |
433 | 449 |
return jlist |
434 | 450 |
|
435 | 451 |
def _ListJobFiles(self): |
436 |
assert self.lock_fd, "Queue should be open" |
|
437 |
|
|
438 | 452 |
return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR) |
439 | 453 |
if self._RE_JOB_FILE.match(name)] |
440 | 454 |
|
441 | 455 |
def _LoadJobUnlocked(self, job_id): |
442 |
assert self.lock_fd, "Queue should be open" |
|
443 |
|
|
444 | 456 |
if job_id in self._memcache: |
445 | 457 |
logging.debug("Found job %s in memcache", job_id) |
446 | 458 |
return self._memcache[job_id] |
... | ... | |
470 | 482 |
return [self._LoadJobUnlocked(job_id) for job_id in job_ids] |
471 | 483 |
|
472 | 484 |
@utils.LockedMethod |
485 |
@_RequireOpenQueue |
|
473 | 486 |
def SubmitJob(self, ops, nodes): |
474 | 487 |
"""Create and store a new job. |
475 | 488 |
|
... | ... | |
483 | 496 |
distributed. |
484 | 497 |
|
485 | 498 |
""" |
486 |
assert self.lock_fd, "Queue should be open" |
|
487 |
|
|
488 | 499 |
# Get job identifier |
489 | 500 |
job_id = self._NewSerialUnlocked(nodes) |
490 | 501 |
job = _QueuedJob(self, job_id, ops) |
... | ... | |
500 | 511 |
|
501 | 512 |
return job.id |
502 | 513 |
|
514 |
@_RequireOpenQueue |
|
503 | 515 |
def UpdateJobUnlocked(self, job): |
504 |
assert self.lock_fd, "Queue should be open" |
|
505 |
|
|
506 | 516 |
filename = self._GetJobPath(job.id) |
507 | 517 |
logging.debug("Writing job %s to %s", job.id, filename) |
508 | 518 |
utils.WriteFile(filename, |
... | ... | |
530 | 540 |
pass |
531 | 541 |
|
532 | 542 |
@utils.LockedMethod |
543 |
@_RequireOpenQueue |
|
533 | 544 |
def CancelJob(self, job_id): |
534 | 545 |
"""Cancels a job. |
535 | 546 |
|
... | ... | |
556 | 567 |
self.UpdateJobUnlocked(job) |
557 | 568 |
|
558 | 569 |
@utils.LockedMethod |
570 |
@_RequireOpenQueue |
|
559 | 571 |
def ArchiveJob(self, job_id): |
560 | 572 |
"""Archives a job. |
561 | 573 |
|
... | ... | |
618 | 630 |
return row |
619 | 631 |
|
620 | 632 |
@utils.LockedMethod |
633 |
@_RequireOpenQueue |
|
621 | 634 |
def QueryJobs(self, job_ids, fields): |
622 | 635 |
"""Returns a list of jobs in queue. |
623 | 636 |
|
... | ... | |
637 | 650 |
return jobs |
638 | 651 |
|
639 | 652 |
@utils.LockedMethod |
653 |
@_RequireOpenQueue |
|
640 | 654 |
def Shutdown(self): |
641 | 655 |
"""Stops the job queue. |
642 | 656 |
|
643 | 657 |
""" |
644 |
assert self.lock_fd, "Queue should be open" |
|
645 |
|
|
646 | 658 |
self._wpool.TerminateWorkers() |
647 | 659 |
|
648 | 660 |
self.lock_fd.close() |
Also available in: Unified diff