Revision 942e2262 lib/jqueue.py

b/lib/jqueue.py
1 1
#
2 2
#
3 3

  
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 5
#
6 6
# This program is free software; you can redistribute it and/or modify
7 7
# it under the terms of the GNU General Public License as published by
......
75 75
  """
76 76

  
77 77

  
78
class QueueShutdown(Exception):
79
  """Special exception to abort a job when the job queue is shutting down.
80

  
81
  """
82

  
83

  
78 84
def TimeStampNow():
79 85
  """Returns the current timestamp.
80 86

  
......
487 493
      logging.debug("Canceling opcode")
488 494
      raise CancelJob()
489 495

  
496
    # See if queue is shutting down
497
    if not self._queue.AcceptingJobsUnlocked():
498
      logging.debug("Queue is shutting down")
499
      raise QueueShutdown()
500

  
490 501
  @locking.ssynchronized(_QUEUE, shared=1)
491 502
  def NotifyStart(self):
492 503
    """Mark the opcode as running, not lock-waiting.
......
1029 1040
      if op.status == constants.OP_STATUS_CANCELING:
1030 1041
        return (constants.OP_STATUS_CANCELING, None)
1031 1042

  
1043
      # Queue is shutting down, return to queued
1044
      if not self.queue.AcceptingJobsUnlocked():
1045
        return (constants.OP_STATUS_QUEUED, None)
1046

  
1032 1047
      # Stay in waitlock while trying to re-acquire lock
1033 1048
      return (constants.OP_STATUS_WAITING, None)
1034 1049
    except CancelJob:
1035 1050
      logging.exception("%s: Canceling job", opctx.log_prefix)
1036 1051
      assert op.status == constants.OP_STATUS_CANCELING
1037 1052
      return (constants.OP_STATUS_CANCELING, None)
1053

  
1054
    except QueueShutdown:
1055
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1056

  
1057
      assert op.status == constants.OP_STATUS_WAITING
1058

  
1059
      # Job hadn't been started yet, so it should return to the queue
1060
      return (constants.OP_STATUS_QUEUED, None)
1061

  
1038 1062
    except Exception, err: # pylint: disable=W0703
1039 1063
      logging.exception("%s: Caught exception in %s",
1040 1064
                        opctx.log_prefix, opctx.summary)
......
1132 1156

  
1133 1157
          assert not waitjob
1134 1158

  
1135
        if op.status == constants.OP_STATUS_WAITING:
1136
          # Couldn't get locks in time
1159
        if op.status in (constants.OP_STATUS_WAITING,
1160
                         constants.OP_STATUS_QUEUED):
1161
          # waiting: Couldn't get locks in time
1162
          # queued: Queue is shutting down
1137 1163
          assert not op.end_timestamp
1138 1164
        else:
1139 1165
          # Finalize opcode
......
1145 1171
          else:
1146 1172
            assert op.status in constants.OPS_FINALIZED
1147 1173

  
1148
      if op.status == constants.OP_STATUS_WAITING or waitjob:
1174
      if op.status == constants.OP_STATUS_QUEUED:
1175
        # Queue is shutting down
1176
        assert not waitjob
1177

  
1178
        finalize = False
1179

  
1180
        # Reset context
1181
        job.cur_opctx = None
1182

  
1183
        # In no case must the status be finalized here
1184
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1185

  
1186
      elif op.status == constants.OP_STATUS_WAITING or waitjob:
1149 1187
        finalize = False
1150 1188

  
1151 1189
        if not waitjob and opctx.CheckPriorityIncrease():
......
2513 2551

  
2514 2552
    return self._wpool.HasRunningTasks()
2515 2553

  
2554
  def AcceptingJobsUnlocked(self):
2555
    """Returns whether jobs are accepted.
2556

  
2557
    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
2558
    queue is shutting down.
2559

  
2560
    @rtype: bool
2561

  
2562
    """
2563
    return self._accepting_jobs
2564

  
2516 2565
  @locking.ssynchronized(_LOCK)
2517 2566
  @_RequireOpenQueue
2518 2567
  def Shutdown(self):

Also available in: Unified diff