Revision db37da70

b/lib/jqueue.py
251 251
class JobQueue(object):
252 252
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
253 253

  
254
  def _RequireOpenQueue(fn):
255
    """Decorator for "public" functions.
256

  
257
    This function should be used for all "public" functions. That is, functions
258
    usually called from other classes.
259

  
260
    Important: Use this decorator only after utils.LockedMethod!
261

  
262
    Example:
263
      @utils.LockedMethod
264
      @_RequireOpenQueue
265
      def Example(self):
266
        pass
267

  
268
    """
269
    def wrapper(self, *args, **kwargs):
270
      assert self.lock_fd, "Queue should be open"
271
      return fn(self, *args, **kwargs)
272
    return wrapper
273

  
254 274
  def __init__(self, context):
255 275
    self.context = context
256 276
    self._memcache = {}
......
352 372
    return serial
353 373

  
354 374
  def _InitQueueUnlocked(self):
355
    assert self.lock_fd, "Queue should be open"
356

  
357 375
    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
358 376
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
359 377
    if self._ReadSerial() is None:
......
376 394
    Returns: A string representing the job identifier.
377 395

  
378 396
    """
379
    assert self.lock_fd, "Queue should be open"
380

  
381 397
    # New number
382 398
    serial = self._last_serial + 1
383 399

  
......
433 449
    return jlist
434 450

  
435 451
  def _ListJobFiles(self):
436
    assert self.lock_fd, "Queue should be open"
437

  
438 452
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
439 453
            if self._RE_JOB_FILE.match(name)]
440 454

  
441 455
  def _LoadJobUnlocked(self, job_id):
442
    assert self.lock_fd, "Queue should be open"
443

  
444 456
    if job_id in self._memcache:
445 457
      logging.debug("Found job %s in memcache", job_id)
446 458
      return self._memcache[job_id]
......
470 482
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
471 483

  
472 484
  @utils.LockedMethod
485
  @_RequireOpenQueue
473 486
  def SubmitJob(self, ops, nodes):
474 487
    """Create and store a new job.
475 488

  
......
483 496
                  distributed.
484 497

  
485 498
    """
486
    assert self.lock_fd, "Queue should be open"
487

  
488 499
    # Get job identifier
489 500
    job_id = self._NewSerialUnlocked(nodes)
490 501
    job = _QueuedJob(self, job_id, ops)
......
500 511

  
501 512
    return job.id
502 513

  
514
  @_RequireOpenQueue
503 515
  def UpdateJobUnlocked(self, job):
504
    assert self.lock_fd, "Queue should be open"
505

  
506 516
    filename = self._GetJobPath(job.id)
507 517
    logging.debug("Writing job %s to %s", job.id, filename)
508 518
    utils.WriteFile(filename,
......
530 540
          pass
531 541

  
532 542
  @utils.LockedMethod
543
  @_RequireOpenQueue
533 544
  def CancelJob(self, job_id):
534 545
    """Cancels a job.
535 546

  
......
556 567
      self.UpdateJobUnlocked(job)
557 568

  
558 569
  @utils.LockedMethod
570
  @_RequireOpenQueue
559 571
  def ArchiveJob(self, job_id):
560 572
    """Archives a job.
561 573

  
......
618 630
    return row
619 631

  
620 632
  @utils.LockedMethod
633
  @_RequireOpenQueue
621 634
  def QueryJobs(self, job_ids, fields):
622 635
    """Returns a list of jobs in queue.
623 636

  
......
637 650
    return jobs
638 651

  
639 652
  @utils.LockedMethod
653
  @_RequireOpenQueue
640 654
  def Shutdown(self):
641 655
    """Stops the job queue.
642 656

  
643 657
    """
644
    assert self.lock_fd, "Queue should be open"
645

  
646 658
    self._wpool.TerminateWorkers()
647 659

  
648 660
    self.lock_fd.close()

Also available in: Unified diff