Revision 20571a26 lib/jqueue.py

b/lib/jqueue.py
628 628

  
629 629
    # TODO: Check consistency across nodes
630 630

  
631
    self._queue_size = 0
632
    self._UpdateQueueSizeUnlocked()
633
    self._drained = self._IsQueueMarkedDrain()
634

  
631 635
    # Setup worker pool
632 636
    self._wpool = _JobQueueWorkerPool(self)
633 637
    try:
......
997 1001
    """
998 1002
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
999 1003

  
1000
  @staticmethod
1001
  def SetDrainFlag(drain_flag):
1004
  def _UpdateQueueSizeUnlocked(self):
1005
    """Update the cached queue size from the current list of job IDs.
1006

  
1007
    """
1008
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1009

  
1010
  @utils.LockedMethod
1011
  @_RequireOpenQueue
1012
  def SetDrainFlag(self, drain_flag):
1002 1013
    """Sets the drain flag for the queue.
1003 1014

  
1004 1015
    @type drain_flag: boolean
......
1009 1020
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1010 1021
    else:
1011 1022
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1023

  
1024
    self._drained = drain_flag
1025

  
1012 1026
    return True
1013 1027

  
1014 1028
  @_RequireOpenQueue
......
1027 1041
    @raise errors.JobQueueDrainError: if the job is marked for draining
1028 1042

  
1029 1043
    """
1030
    if self._IsQueueMarkedDrain():
1044
    # Ok when sharing the big job queue lock, as the drain file is created when
1045
    # the lock is exclusive.
1046
    if self._drained:
1031 1047
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1032 1048

  
1033
    # Check job queue size
1034
    size = len(self._GetJobIDsUnlocked(sort=False))
1035
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
1036
      # TODO: Autoarchive jobs. Make sure it's not done on every job
1037
      # submission, though.
1038
      #size = ...
1039
      pass
1040

  
1041
    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1049
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1042 1050
      raise errors.JobQueueFull()
1043 1051

  
1044 1052
    job = _QueuedJob(self, job_id, ops)
......
1046 1054
    # Write to disk
1047 1055
    self.UpdateJobUnlocked(job)
1048 1056

  
1057
    self._queue_size += 1
1058

  
1049 1059
    logging.debug("Adding new job %s to the cache", job_id)
1050 1060
    self._memcache[job_id] = job
1051 1061

  
......
1250 1260
    logging.debug("Successfully archived job(s) %s",
1251 1261
                  utils.CommaJoin(job.id for job in archive_jobs))
1252 1262

  
1263
    # Since we haven't quite checked, above, if we succeeded or failed renaming
1264
    # the files, we update the cached queue size from the filesystem. When we
1265
    # get around to fix the TODO: above, we can use the number of actually
1266
    # archived jobs to fix this.
1267
    self._UpdateQueueSizeUnlocked()
1253 1268
    return len(archive_jobs)
1254 1269

  
1255 1270
  @utils.LockedMethod

Also available in: Unified diff