Revision 26d3fd2f

b/daemons/ganeti-masterd
314 314
    """
315 315
    # Queries don't have a job id
316 316
    proc = mcpu.Processor(self.server.context, None)
317

  
318
    # TODO: Executing an opcode using locks will acquire them in blocking mode.
319
    # Consider using a timeout for retries.
317 320
    return proc.ExecOpCode(op, None)
318 321

  
319 322

  
b/lib/jqueue.py
176 176

  
177 177
  """
178 178
  # pylint: disable-msg=W0212
179
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter",
179
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
180 180
               "received_timestamp", "start_timestamp", "end_timestamp",
181 181
               "__weakref__"]
182 182

  
......
211 211

  
212 212
    """
213 213
    obj.ops_iter = None
214
    obj.cur_opctx = None
214 215

  
215 216
  def __repr__(self):
216 217
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
......
745 746
  return errors.EncodeException(to_encode)
746 747

  
747 748

  
749
class _TimeoutStrategyWrapper:
750
  def __init__(self, fn):
751
    """Initializes this class.
752

  
753
    """
754
    self._fn = fn
755
    self._next = None
756

  
757
  def _Advance(self):
758
    """Gets the next timeout if necessary.
759

  
760
    """
761
    if self._next is None:
762
      self._next = self._fn()
763

  
764
  def Peek(self):
765
    """Returns the next timeout.
766

  
767
    """
768
    self._Advance()
769
    return self._next
770

  
771
  def Next(self):
772
    """Returns the current timeout and advances the internal state.
773

  
774
    """
775
    self._Advance()
776
    result = self._next
777
    self._next = None
778
    return result
779

  
780

  
748 781
class _OpExecContext:
749
  def __init__(self, op, index, log_prefix):
782
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
750 783
    """Initializes this class.
751 784

  
752 785
    """
......
755 788
    self.log_prefix = log_prefix
756 789
    self.summary = op.input.Summary()
757 790

  
791
    self._timeout_strategy_factory = timeout_strategy_factory
792
    self._ResetTimeoutStrategy()
793

  
794
  def _ResetTimeoutStrategy(self):
795
    """Creates a new timeout strategy.
796

  
797
    """
798
    self._timeout_strategy = \
799
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
800

  
801
  def CheckPriorityIncrease(self):
802
    """Checks whether priority can and should be increased.
803

  
804
    Called when locks couldn't be acquired.
805

  
806
    """
807
    op = self.op
808

  
809
    # All timed retries exhausted (the next acquire would be blocking) while
810
    # the priority can still be increased?
811
    if (self._timeout_strategy.Peek() is None and
812
        op.priority > constants.OP_PRIO_HIGHEST):
813
      logging.debug("Increasing priority")
814
      op.priority -= 1
815
      self._ResetTimeoutStrategy()
816
      return True
817

  
818
    return False
819

  
820
  def GetNextLockTimeout(self):
821
    """Returns the next lock acquire timeout.
822

  
823
    """
824
    return self._timeout_strategy.Next()
825

  
758 826

  
759 827
class _JobProcessor(object):
760
  def __init__(self, queue, opexec_fn, job):
828
  def __init__(self, queue, opexec_fn, job,
829
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
761 830
    """Initializes this class.
762 831

  
763 832
    """
764 833
    self.queue = queue
765 834
    self.opexec_fn = opexec_fn
766 835
    self.job = job
836
    self._timeout_strategy_factory = _timeout_strategy_factory
767 837

  
768 838
  @staticmethod
769
  def _FindNextOpcode(job):
839
  def _FindNextOpcode(job, timeout_strategy_factory):
770 840
    """Locates the next opcode to run.
771 841

  
772 842
    @type job: L{_QueuedJob}
773 843
    @param job: Job object
844
    @param timeout_strategy_factory: Callable to create new timeout strategy
774 845

  
775 846
    """
776 847
    # Create some sort of a cache to speed up locating next opcode for future
......
791 862
        # Found an opcode already marked as running
792 863
        raise errors.ProgrammerError("Called for job marked as running")
793 864

  
794
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)))
865
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
866
                             timeout_strategy_factory)
795 867

  
796 868
      if op.status == constants.OP_STATUS_CANCELED:
797 869
        # Cancelled jobs are handled by the caller
......
838 910

  
839 911
    assert op.status == constants.OP_STATUS_WAITLOCK
840 912

  
913
    timeout = opctx.GetNextLockTimeout()
914

  
841 915
    try:
842 916
      # Make sure not to hold queue lock while calling ExecOpCode
843 917
      result = self.opexec_fn(op.input,
844
                              _OpExecCallbacks(self.queue, self.job, op))
918
                              _OpExecCallbacks(self.queue, self.job, op),
919
                              timeout=timeout)
920
    except mcpu.LockAcquireTimeout:
921
      assert timeout is not None, "Received timeout for blocking acquire"
922
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
923
      assert op.status == constants.OP_STATUS_WAITLOCK
924
      return (constants.OP_STATUS_QUEUED, None)
845 925
    except CancelJob:
846 926
      logging.exception("%s: Canceling job", opctx.log_prefix)
847 927
      assert op.status == constants.OP_STATUS_CANCELING
......
855 935
                    opctx.log_prefix, opctx.summary)
856 936
      return (constants.OP_STATUS_SUCCESS, result)
857 937

  
858
  def __call__(self):
938
  def __call__(self, _nextop_fn=None):
859 939
    """Continues execution of a job.
860 940

  
941
    @param _nextop_fn: Callback function for tests
861 942
    @rtype: bool
862 943
    @return: True if job is finished, False if processor needs to be called
863 944
             again
......
872 953
    try:
873 954
      opcount = len(job.ops)
874 955

  
875
      opctx = self._FindNextOpcode(job)
956
      # Is a previous opcode still pending?
957
      if job.cur_opctx:
958
        opctx = job.cur_opctx
959
      else:
960
        if __debug__ and _nextop_fn:
961
          _nextop_fn()
962
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
963

  
876 964
      op = opctx.op
877 965

  
878 966
      # Consistency check
......
884 972
                           constants.OP_STATUS_WAITLOCK,
885 973
                           constants.OP_STATUS_CANCELED)
886 974

  
975
      assert (op.priority <= constants.OP_PRIO_LOWEST and
976
              op.priority >= constants.OP_PRIO_HIGHEST)
977

  
887 978
      if op.status != constants.OP_STATUS_CANCELED:
888 979
        # Prepare to start opcode
889 980
        self._MarkWaitlock(job, op)
......
903 994
        finally:
904 995
          queue.acquire(shared=1)
905 996

  
906
        # Finalize opcode
907
        op.end_timestamp = TimeStampNow()
908 997
        op.status = op_status
909 998
        op.result = op_result
910 999

  
911
        if op.status == constants.OP_STATUS_CANCELING:
912
          assert not compat.any(i.status != constants.OP_STATUS_CANCELING
913
                                for i in job.ops[opctx.index:])
1000
        if op.status == constants.OP_STATUS_QUEUED:
1001
          # Couldn't get locks in time
1002
          assert not op.end_timestamp
914 1003
        else:
915
          assert op.status in constants.OPS_FINALIZED
1004
          # Finalize opcode
1005
          op.end_timestamp = TimeStampNow()
916 1006

  
917
      # Ensure all opcodes so far have been successful
918
      assert (opctx.index == 0 or
919
              compat.all(i.status == constants.OP_STATUS_SUCCESS
920
                         for i in job.ops[:opctx.index]))
1007
          if op.status == constants.OP_STATUS_CANCELING:
1008
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1009
                                  for i in job.ops[opctx.index:])
1010
          else:
1011
            assert op.status in constants.OPS_FINALIZED
921 1012

  
922
      if op.status == constants.OP_STATUS_SUCCESS:
1013
      if op.status == constants.OP_STATUS_QUEUED:
923 1014
        finalize = False
924 1015

  
925
      elif op.status == constants.OP_STATUS_ERROR:
926
        # Ensure failed opcode has an exception as its result
927
        assert errors.GetEncodedError(job.ops[opctx.index].result)
1016
        opctx.CheckPriorityIncrease()
928 1017

  
929
        to_encode = errors.OpExecError("Preceding opcode failed")
930
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
931
                              _EncodeOpError(to_encode))
932
        finalize = True
1018
        # Keep around for another round
1019
        job.cur_opctx = opctx
933 1020

  
934
        # Consistency check
935
        assert compat.all(i.status == constants.OP_STATUS_ERROR and
936
                          errors.GetEncodedError(i.result)
937
                          for i in job.ops[opctx.index:])
1021
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1022
                op.priority >= constants.OP_PRIO_HIGHEST)
938 1023

  
939
      elif op.status == constants.OP_STATUS_CANCELING:
940
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
941
                              "Job canceled by request")
942
        finalize = True
1024
        # In no case must the status be finalized here
1025
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
943 1026

  
944
      elif op.status == constants.OP_STATUS_CANCELED:
945
        finalize = True
1027
        queue.UpdateJobUnlocked(job)
946 1028

  
947 1029
      else:
948
        raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1030
        # Ensure all opcodes so far have been successful
1031
        assert (opctx.index == 0 or
1032
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1033
                           for i in job.ops[:opctx.index]))
1034

  
1035
        # Reset context
1036
        job.cur_opctx = None
1037

  
1038
        if op.status == constants.OP_STATUS_SUCCESS:
1039
          finalize = False
1040

  
1041
        elif op.status == constants.OP_STATUS_ERROR:
1042
          # Ensure failed opcode has an exception as its result
1043
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1044

  
1045
          to_encode = errors.OpExecError("Preceding opcode failed")
1046
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1047
                                _EncodeOpError(to_encode))
1048
          finalize = True
949 1049

  
950
      # Finalizing or last opcode?
951
      if finalize or opctx.index == (opcount - 1):
952
        # All opcodes have been run, finalize job
953
        job.end_timestamp = TimeStampNow()
1050
          # Consistency check
1051
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1052
                            errors.GetEncodedError(i.result)
1053
                            for i in job.ops[opctx.index:])
954 1054

  
955
      # Write to disk. If the job status is final, this is the final write
956
      # allowed. Once the file has been written, it can be archived anytime.
957
      queue.UpdateJobUnlocked(job)
1055
        elif op.status == constants.OP_STATUS_CANCELING:
1056
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1057
                                "Job canceled by request")
1058
          finalize = True
1059

  
1060
        elif op.status == constants.OP_STATUS_CANCELED:
1061
          finalize = True
1062

  
1063
        else:
1064
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1065

  
1066
        # Finalizing or last opcode?
1067
        if finalize or opctx.index == (opcount - 1):
1068
          # All opcodes have been run, finalize job
1069
          job.end_timestamp = TimeStampNow()
1070

  
1071
        # Write to disk. If the job status is final, this is the final write
1072
        # allowed. Once the file has been written, it can be archived anytime.
1073
        queue.UpdateJobUnlocked(job)
958 1074

  
959
      if finalize or opctx.index == (opcount - 1):
960
        logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
961
        return True
1075
        if finalize or opctx.index == (opcount - 1):
1076
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1077
          return True
962 1078

  
963 1079
      return False
964 1080
    finally:
......
988 1104

  
989 1105
    if not _JobProcessor(queue, proc.ExecOpCode, job)():
990 1106
      # Schedule again
991
      raise workerpool.DeferTask()
1107
      raise workerpool.DeferTask(priority=job.CalcPriority())
992 1108

  
993 1109

  
994 1110
class _JobQueueWorkerPool(workerpool.WorkerPool):
b/test/ganeti.jqueue_unittest.py
27 27
import tempfile
28 28
import shutil
29 29
import errno
30
import itertools
30 31

  
31 32
from ganeti import constants
32 33
from ganeti import utils
......
34 35
from ganeti import jqueue
35 36
from ganeti import opcodes
36 37
from ganeti import compat
38
from ganeti import mcpu
37 39

  
38 40
import testutils
39 41

  
......
447 449
    self._before_start = before_start
448 450
    self._after_start = after_start
449 451

  
450
  def __call__(self, op, cbs):
452
  def __call__(self, op, cbs, timeout=None):
451 453
    assert isinstance(op, opcodes.OpTestDummy)
452 454

  
453 455
    if self._before_start:
454
      self._before_start()
456
      self._before_start(timeout)
455 457

  
456 458
    cbs.NotifyStart()
457 459

  
......
464 466
    return op.result
465 467

  
466 468

  
467
class TestJobProcessor(unittest.TestCase):
469
class _JobProcessorTestUtils:
468 470
  def _CreateJob(self, queue, job_id, ops):
469 471
    job = jqueue._QueuedJob(queue, job_id, ops)
470 472
    self.assertFalse(job.start_timestamp)
......
475 477
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
476 478
    return job
477 479

  
480

  
481
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
478 482
  def _GenericCheckJob(self, job):
479 483
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
480 484
                      for op in job.ops)
......
502 506
      # Create job
503 507
      job = self._CreateJob(queue, job_id, ops)
504 508

  
505
      def _BeforeStart():
509
      def _BeforeStart(_):
506 510
        self.assertFalse(queue.IsAcquired())
507 511
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
508 512

  
......
656 660

  
657 661
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
658 662

  
659
    def _BeforeStart():
663
    def _BeforeStart(_):
660 664
      self.assertFalse(queue.IsAcquired())
661 665
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
662 666

  
......
845 849
    # Create job
846 850
    job = self._CreateJob(queue, 29386, ops)
847 851

  
848
    def _BeforeStart():
852
    def _BeforeStart(_):
849 853
      self.assertFalse(queue.IsAcquired())
850 854
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
851 855

  
......
922 926
    self.assertFalse(job.GetLogEntries(count + 3))
923 927

  
924 928

  
929
class _FakeTimeoutStrategy:
930
  def __init__(self, timeouts):
931
    self.timeouts = timeouts
932
    self.attempts = 0
933
    self.last_timeout = None
934

  
935
  def NextAttempt(self):
936
    self.attempts += 1
937
    if self.timeouts:
938
      timeout = self.timeouts.pop(0)
939
    else:
940
      timeout = None
941
    self.last_timeout = timeout
942
    return timeout
943

  
944

  
945
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
946
  def setUp(self):
947
    self.queue = _FakeQueueForProc()
948
    self.job = None
949
    self.curop = None
950
    self.opcounter = None
951
    self.timeout_strategy = None
952
    self.retries = 0
953
    self.prev_tsop = None
954
    self.prev_prio = None
955
    self.gave_lock = None
956
    self.done_lock_before_blocking = False
957

  
958
  def _BeforeStart(self, timeout):
959
    job = self.job
960

  
961
    self.assertFalse(self.queue.IsAcquired())
962
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
963

  
964
    ts = self.timeout_strategy
965

  
966
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
967
    self.assertEqual(timeout, ts.last_timeout)
968

  
969
    self.gave_lock = True
970

  
971
    if (self.curop == 3 and
972
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
973
      # Give locks before running into blocking acquire
974
      assert self.retries == 7
975
      self.retries = 0
976
      self.done_lock_before_blocking = True
977
      return
978

  
979
    if self.retries > 0:
980
      self.assert_(timeout is not None)
981
      self.retries -= 1
982
      self.gave_lock = False
983
      raise mcpu.LockAcquireTimeout()
984

  
985
    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
986
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
987
      assert not ts.timeouts
988
      self.assert_(timeout is None)
989

  
990
  def _AfterStart(self, op, cbs):
991
    job = self.job
992

  
993
    self.assertFalse(self.queue.IsAcquired())
994
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
995

  
996
    # Job is running, cancelling shouldn't be possible
997
    (success, _) = job.Cancel()
998
    self.assertFalse(success)
999

  
1000
  def _NextOpcode(self):
1001
    self.curop = self.opcounter.next()
1002
    self.prev_prio = self.job.ops[self.curop].priority
1003

  
1004
  def _NewTimeoutStrategy(self):
1005
    job = self.job
1006

  
1007
    self.assertEqual(self.retries, 0)
1008

  
1009
    if self.prev_tsop == self.curop:
1010
      # Still on the same opcode, priority must've been increased
1011
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
1012

  
1013
    if self.curop == 1:
1014
      # Normal retry
1015
      timeouts = range(10, 31, 10)
1016
      self.retries = len(timeouts) - 1
1017

  
1018
    elif self.curop == 2:
1019
      # Let this run into a blocking acquire
1020
      timeouts = range(11, 61, 12)
1021
      self.retries = len(timeouts)
1022

  
1023
    elif self.curop == 3:
1024
      # Wait for priority to increase, but give lock before blocking acquire
1025
      timeouts = range(12, 100, 14)
1026
      self.retries = len(timeouts)
1027

  
1028
      self.assertFalse(self.done_lock_before_blocking)
1029

  
1030
    elif self.curop == 4:
1031
      self.assert_(self.done_lock_before_blocking)
1032

  
1033
      # Timeouts, but no need to retry
1034
      timeouts = range(10, 31, 10)
1035
      self.retries = 0
1036

  
1037
    elif self.curop == 5:
1038
      # Normal retry
1039
      timeouts = range(19, 100, 11)
1040
      self.retries = len(timeouts)
1041

  
1042
    else:
1043
      timeouts = []
1044
      self.retries = 0
1045

  
1046
    assert len(job.ops) == 10
1047
    assert self.retries <= len(timeouts)
1048

  
1049
    ts = _FakeTimeoutStrategy(timeouts)
1050

  
1051
    self.timeout_strategy = ts
1052
    self.prev_tsop = self.curop
1053
    self.prev_prio = job.ops[self.curop].priority
1054

  
1055
    return ts
1056

  
1057
  def testTimeout(self):
1058
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1059
           for i in range(10)]
1060

  
1061
    # Create job
1062
    job_id = 15801
1063
    job = self._CreateJob(self.queue, job_id, ops)
1064
    self.job = job
1065

  
1066
    self.opcounter = itertools.count(0)
1067

  
1068
    opexec = _FakeExecOpCodeForProc(self._BeforeStart, self._AfterStart)
1069
    tsf = self._NewTimeoutStrategy
1070

  
1071
    self.assertFalse(self.done_lock_before_blocking)
1072

  
1073
    for i in itertools.count(0):
1074
      proc = jqueue._JobProcessor(self.queue, opexec, job,
1075
                                  _timeout_strategy_factory=tsf)
1076

  
1077
      result = proc(_nextop_fn=self._NextOpcode)
1078
      if result:
1079
        self.assertFalse(job.cur_opctx)
1080
        break
1081

  
1082
      self.assertFalse(result)
1083

  
1084
      if self.gave_lock:
1085
        self.assertFalse(job.cur_opctx)
1086
      else:
1087
        self.assert_(job.cur_opctx)
1088
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1089
                         self.timeout_strategy.NextAttempt)
1090

  
1091
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1092
      self.assert_(job.start_timestamp)
1093
      self.assertFalse(job.end_timestamp)
1094

  
1095
    self.assertEqual(self.curop, len(job.ops) - 1)
1096
    self.assertEqual(self.job, job)
1097
    self.assertEqual(self.opcounter.next(), len(job.ops))
1098
    self.assert_(self.done_lock_before_blocking)
1099

  
1100
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1101
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1102
    self.assertEqual(job.GetInfo(["opresult"]),
1103
                     [[op.input.result for op in job.ops]])
1104
    self.assertEqual(job.GetInfo(["opstatus"]),
1105
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1106
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1107
                            for op in job.ops))
1108

  
1109
    # Finished jobs can't be processed any further
1110
    self.assertRaises(errors.ProgrammerError,
1111
                      jqueue._JobProcessor(self.queue, opexec, job))
1112

  
1113

  
925 1114
if __name__ == "__main__":
926 1115
  testutils.GanetiTestProgram()

Also available in: Unified diff