Revision 26d3fd2f test/ganeti.jqueue_unittest.py

b/test/ganeti.jqueue_unittest.py
27 27
import tempfile
28 28
import shutil
29 29
import errno
30
import itertools
30 31

  
31 32
from ganeti import constants
32 33
from ganeti import utils
......
34 35
from ganeti import jqueue
35 36
from ganeti import opcodes
36 37
from ganeti import compat
38
from ganeti import mcpu
37 39

  
38 40
import testutils
39 41

  
......
447 449
    self._before_start = before_start
448 450
    self._after_start = after_start
449 451

  
450
  def __call__(self, op, cbs):
452
  def __call__(self, op, cbs, timeout=None):
451 453
    assert isinstance(op, opcodes.OpTestDummy)
452 454

  
453 455
    if self._before_start:
454
      self._before_start()
456
      self._before_start(timeout)
455 457

  
456 458
    cbs.NotifyStart()
457 459

  
......
464 466
    return op.result
465 467

  
466 468

  
467
class TestJobProcessor(unittest.TestCase):
469
class _JobProcessorTestUtils:
468 470
  def _CreateJob(self, queue, job_id, ops):
469 471
    job = jqueue._QueuedJob(queue, job_id, ops)
470 472
    self.assertFalse(job.start_timestamp)
......
475 477
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
476 478
    return job
477 479

  
480

  
481
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
478 482
  def _GenericCheckJob(self, job):
479 483
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
480 484
                      for op in job.ops)
......
502 506
      # Create job
503 507
      job = self._CreateJob(queue, job_id, ops)
504 508

  
505
      def _BeforeStart():
509
      def _BeforeStart(_):
506 510
        self.assertFalse(queue.IsAcquired())
507 511
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
508 512

  
......
656 660

  
657 661
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
658 662

  
659
    def _BeforeStart():
663
    def _BeforeStart(_):
660 664
      self.assertFalse(queue.IsAcquired())
661 665
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
662 666

  
......
845 849
    # Create job
846 850
    job = self._CreateJob(queue, 29386, ops)
847 851

  
848
    def _BeforeStart():
852
    def _BeforeStart(_):
849 853
      self.assertFalse(queue.IsAcquired())
850 854
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
851 855

  
......
922 926
    self.assertFalse(job.GetLogEntries(count + 3))
923 927

  
924 928

  
929
class _FakeTimeoutStrategy:
930
  def __init__(self, timeouts):
931
    self.timeouts = timeouts
932
    self.attempts = 0
933
    self.last_timeout = None
934

  
935
  def NextAttempt(self):
936
    self.attempts += 1
937
    if self.timeouts:
938
      timeout = self.timeouts.pop(0)
939
    else:
940
      timeout = None
941
    self.last_timeout = timeout
942
    return timeout
943

  
944

  
945
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
946
  def setUp(self):
947
    self.queue = _FakeQueueForProc()
948
    self.job = None
949
    self.curop = None
950
    self.opcounter = None
951
    self.timeout_strategy = None
952
    self.retries = 0
953
    self.prev_tsop = None
954
    self.prev_prio = None
955
    self.gave_lock = None
956
    self.done_lock_before_blocking = False
957

  
958
  def _BeforeStart(self, timeout):
959
    job = self.job
960

  
961
    self.assertFalse(self.queue.IsAcquired())
962
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
963

  
964
    ts = self.timeout_strategy
965

  
966
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
967
    self.assertEqual(timeout, ts.last_timeout)
968

  
969
    self.gave_lock = True
970

  
971
    if (self.curop == 3 and
972
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
973
      # Give locks before running into blocking acquire
974
      assert self.retries == 7
975
      self.retries = 0
976
      self.done_lock_before_blocking = True
977
      return
978

  
979
    if self.retries > 0:
980
      self.assert_(timeout is not None)
981
      self.retries -= 1
982
      self.gave_lock = False
983
      raise mcpu.LockAcquireTimeout()
984

  
985
    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
986
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
987
      assert not ts.timeouts
988
      self.assert_(timeout is None)
989

  
990
  def _AfterStart(self, op, cbs):
991
    job = self.job
992

  
993
    self.assertFalse(self.queue.IsAcquired())
994
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
995

  
996
    # Job is running, cancelling shouldn't be possible
997
    (success, _) = job.Cancel()
998
    self.assertFalse(success)
999

  
1000
  def _NextOpcode(self):
1001
    self.curop = self.opcounter.next()
1002
    self.prev_prio = self.job.ops[self.curop].priority
1003

  
1004
  def _NewTimeoutStrategy(self):
1005
    job = self.job
1006

  
1007
    self.assertEqual(self.retries, 0)
1008

  
1009
    if self.prev_tsop == self.curop:
1010
      # Still on the same opcode, priority must've been increased
1011
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
1012

  
1013
    if self.curop == 1:
1014
      # Normal retry
1015
      timeouts = range(10, 31, 10)
1016
      self.retries = len(timeouts) - 1
1017

  
1018
    elif self.curop == 2:
1019
      # Let this run into a blocking acquire
1020
      timeouts = range(11, 61, 12)
1021
      self.retries = len(timeouts)
1022

  
1023
    elif self.curop == 3:
1024
      # Wait for priority to increase, but give lock before blocking acquire
1025
      timeouts = range(12, 100, 14)
1026
      self.retries = len(timeouts)
1027

  
1028
      self.assertFalse(self.done_lock_before_blocking)
1029

  
1030
    elif self.curop == 4:
1031
      self.assert_(self.done_lock_before_blocking)
1032

  
1033
      # Timeouts, but no need to retry
1034
      timeouts = range(10, 31, 10)
1035
      self.retries = 0
1036

  
1037
    elif self.curop == 5:
1038
      # Normal retry
1039
      timeouts = range(19, 100, 11)
1040
      self.retries = len(timeouts)
1041

  
1042
    else:
1043
      timeouts = []
1044
      self.retries = 0
1045

  
1046
    assert len(job.ops) == 10
1047
    assert self.retries <= len(timeouts)
1048

  
1049
    ts = _FakeTimeoutStrategy(timeouts)
1050

  
1051
    self.timeout_strategy = ts
1052
    self.prev_tsop = self.curop
1053
    self.prev_prio = job.ops[self.curop].priority
1054

  
1055
    return ts
1056

  
1057
  def testTimeout(self):
1058
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1059
           for i in range(10)]
1060

  
1061
    # Create job
1062
    job_id = 15801
1063
    job = self._CreateJob(self.queue, job_id, ops)
1064
    self.job = job
1065

  
1066
    self.opcounter = itertools.count(0)
1067

  
1068
    opexec = _FakeExecOpCodeForProc(self._BeforeStart, self._AfterStart)
1069
    tsf = self._NewTimeoutStrategy
1070

  
1071
    self.assertFalse(self.done_lock_before_blocking)
1072

  
1073
    for i in itertools.count(0):
1074
      proc = jqueue._JobProcessor(self.queue, opexec, job,
1075
                                  _timeout_strategy_factory=tsf)
1076

  
1077
      result = proc(_nextop_fn=self._NextOpcode)
1078
      if result:
1079
        self.assertFalse(job.cur_opctx)
1080
        break
1081

  
1082
      self.assertFalse(result)
1083

  
1084
      if self.gave_lock:
1085
        self.assertFalse(job.cur_opctx)
1086
      else:
1087
        self.assert_(job.cur_opctx)
1088
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1089
                         self.timeout_strategy.NextAttempt)
1090

  
1091
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1092
      self.assert_(job.start_timestamp)
1093
      self.assertFalse(job.end_timestamp)
1094

  
1095
    self.assertEqual(self.curop, len(job.ops) - 1)
1096
    self.assertEqual(self.job, job)
1097
    self.assertEqual(self.opcounter.next(), len(job.ops))
1098
    self.assert_(self.done_lock_before_blocking)
1099

  
1100
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1101
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1102
    self.assertEqual(job.GetInfo(["opresult"]),
1103
                     [[op.input.result for op in job.ops]])
1104
    self.assertEqual(job.GetInfo(["opstatus"]),
1105
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1106
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1107
                            for op in job.ops))
1108

  
1109
    # Finished jobs can't be processed any further
1110
    self.assertRaises(errors.ProgrammerError,
1111
                      jqueue._JobProcessor(self.queue, opexec, job))
1112

  
1113

  
925 1114
if __name__ == "__main__":
926 1115
  testutils.GanetiTestProgram()

Also available in: Unified diff