Revision df5a5730

b/lib/jqueue.py
1237 1237
      queue.release()
1238 1238

  
1239 1239

  
1240
def _EvaluateJobProcessorResult(depmgr, job, result):
1241
  """Looks at a result from L{_JobProcessor} for a job.
1242

  
1243
  To be used in a L{_JobQueueWorker}.
1244

  
1245
  """
1246
  if result == _JobProcessor.FINISHED:
1247
    # Notify waiting jobs
1248
    depmgr.NotifyWaiters(job.id)
1249

  
1250
  elif result == _JobProcessor.DEFER:
1251
    # Schedule again
1252
    raise workerpool.DeferTask(priority=job.CalcPriority())
1253

  
1254
  elif result == _JobProcessor.WAITDEP:
1255
    # No-op, dependency manager will re-schedule
1256
    pass
1257

  
1258
  else:
1259
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
1260
                                 (result, ))
1261

  
1262

  
1240 1263
class _JobQueueWorker(workerpool.BaseWorker):
1241 1264
  """The actual job workers.
1242 1265

  
......
1277 1300
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1278 1301
                                    proc.ExecOpCode)
1279 1302

  
1280
    result = _JobProcessor(queue, wrap_execop_fn, job)()
1281

  
1282
    if result == _JobProcessor.FINISHED:
1283
      # Notify waiting jobs
1284
      queue.depmgr.NotifyWaiters(job.id)
1285

  
1286
    elif result == _JobProcessor.DEFER:
1287
      # Schedule again
1288
      raise workerpool.DeferTask(priority=job.CalcPriority())
1289

  
1290
    elif result == _JobProcessor.WAITDEP:
1291
      # No-op, dependency manager will re-schedule
1292
      pass
1293

  
1294
    else:
1295
      raise errors.ProgrammerError("Job processor returned unknown status %s" %
1296
                                   (result, ))
1303
    _EvaluateJobProcessorResult(queue.depmgr, job,
1304
                                _JobProcessor(queue, wrap_execop_fn, job)())
1297 1305

  
1298 1306
  @staticmethod
1299 1307
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
b/test/ganeti.jqueue_unittest.py
38 38
from ganeti import compat
39 39
from ganeti import mcpu
40 40
from ganeti import query
41
from ganeti import workerpool
41 42

  
42 43
import testutils
43 44

  
......
1625 1626
    self.assertRaises(IndexError, queue.GetNextUpdate)
1626 1627

  
1627 1628

  
1629
class TestEvaluateJobProcessorResult(unittest.TestCase):
1630
  def testFinished(self):
1631
    depmgr = _FakeDependencyManager()
1632
    job = _IdOnlyFakeJob(30953)
1633
    jqueue._EvaluateJobProcessorResult(depmgr, job,
1634
                                       jqueue._JobProcessor.FINISHED)
1635
    self.assertEqual(depmgr.GetNextNotification(), job.id)
1636
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1637

  
1638
  def testDefer(self):
1639
    depmgr = _FakeDependencyManager()
1640
    job = _IdOnlyFakeJob(11326, priority=5463)
1641
    try:
1642
      jqueue._EvaluateJobProcessorResult(depmgr, job,
1643
                                         jqueue._JobProcessor.DEFER)
1644
    except workerpool.DeferTask, err:
1645
      self.assertEqual(err.priority, 5463)
1646
    else:
1647
      self.fail("Didn't raise exception")
1648
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1649

  
1650
  def testWaitdep(self):
1651
    depmgr = _FakeDependencyManager()
1652
    job = _IdOnlyFakeJob(21317)
1653
    jqueue._EvaluateJobProcessorResult(depmgr, job,
1654
                                       jqueue._JobProcessor.WAITDEP)
1655
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1656

  
1657
  def testOther(self):
1658
    depmgr = _FakeDependencyManager()
1659
    job = _IdOnlyFakeJob(5813)
1660
    self.assertRaises(errors.ProgrammerError,
1661
                      jqueue._EvaluateJobProcessorResult,
1662
                      depmgr, job, "Other result")
1663
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1664

  
1665

  
1628 1666
class _FakeTimeoutStrategy:
1629 1667
  def __init__(self, timeouts):
1630 1668
    self.timeouts = timeouts

Also available in: Unified diff