Revision be760ba8

b/lib/jqueue.py
176 176

  
177 177
  """
178 178
  # pylint: disable-msg=W0212
179
  __slots__ = ["queue", "id", "ops", "log_serial",
179
  __slots__ = ["queue", "id", "ops", "log_serial", "current_op",
180 180
               "received_timestamp", "start_timestamp", "end_timestamp",
181 181
               "__weakref__"]
182 182

  
......
203 203
    self.start_timestamp = None
204 204
    self.end_timestamp = None
205 205

  
206
    self.current_op = None
207

  
206 208
  def __repr__(self):
207 209
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
208 210
              "id=%s" % self.id,
......
237 239
        obj.log_serial = max(obj.log_serial, log_entry[0])
238 240
      obj.ops.append(op)
239 241

  
242
    obj.current_op = None
243

  
240 244
    return obj
241 245

  
242 246
  def Serialize(self):
......
734 738
  return errors.EncodeException(to_encode)
735 739

  
736 740

  
741
class _JobProcessor(object):
742
  def __init__(self, queue, opexec_fn, job):
743
    """Initializes this class.
744

  
745
    """
746
    self.queue = queue
747
    self.opexec_fn = opexec_fn
748
    self.job = job
749

  
750
  @staticmethod
751
  def _FindNextOpcode(job):
752
    """Locates the next opcode to run.
753

  
754
    @type job: L{_QueuedJob}
755
    @param job: Job object
756

  
757
    """
758
    # Cache the iteration state to speed up locating the next opcode in
759
    # future lookups
760
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
761
    # pending and one for processed ops.
762
    if job.current_op is None:
763
      job.current_op = enumerate(job.ops)
764

  
765
    # Find next opcode to run
766
    while True:
767
      try:
768
        (idx, op) = job.current_op.next()
769
      except StopIteration:
770
        raise errors.ProgrammerError("Called for a finished job")
771

  
772
      if op.status == constants.OP_STATUS_RUNNING:
773
        # Found an opcode already marked as running
774
        raise errors.ProgrammerError("Called for job marked as running")
775

  
776
      log_prefix = "Op %s/%s" % (idx + 1, len(job.ops))
777
      summary = op.input.Summary()
778

  
779
      if op.status == constants.OP_STATUS_CANCELED:
780
        # Cancelled jobs are handled by the caller
781
        assert not compat.any(i.status != constants.OP_STATUS_CANCELED
782
                              for i in job.ops[idx:])
783

  
784
      elif op.status in constants.OPS_FINALIZED:
785
        # This is a job that was partially completed before master daemon
786
        # shutdown, so it can be expected that some opcodes are already
787
        # completed successfully (if any did error out, then the whole job
788
        # should have been aborted and not resubmitted for processing).
789
        logging.info("%s: opcode %s already processed, skipping",
790
                     log_prefix, summary)
791
        continue
792

  
793
      return (idx, op, log_prefix, summary)
794

  
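The cursor stored in job.current_op is simply a saved enumerate iterator, so every call into the processor resumes scanning where the previous call stopped instead of walking over already finalized opcodes again. A minimal sketch of the same pattern, using made-up stand-ins rather than the real _QueuedJob/_QueuedOpCode objects:

  # Illustrative only: resume scanning a list across separate calls
  class FakeJob(object):
    def __init__(self, ops):
      self.ops = ops
      self.current_op = None          # cursor created lazily, as above

  def find_next(job):
    if job.current_op is None:
      job.current_op = enumerate(job.ops)
    for (idx, op) in job.current_op:
      if not op.get("done"):
        return (idx, op)
    raise IndexError("no pending opcodes left")

  job = FakeJob([{"done": True}, {"done": False}, {"done": False}])
  assert find_next(job) == (1, {"done": False})
  assert find_next(job) == (2, {"done": False})
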
795
  @staticmethod
796
  def _MarkWaitlock(job, op):
797
    """Marks an opcode as waiting for locks.
798

  
799
    The job's start timestamp is also set if necessary.
800

  
801
    @type job: L{_QueuedJob}
802
    @param job: Job object
803
    @type op: L{_QueuedOpCode}
804
    @param op: Opcode object
805

  
806
    """
807
    assert op in job.ops
808

  
809
    op.status = constants.OP_STATUS_WAITLOCK
810
    op.result = None
811
    op.start_timestamp = TimeStampNow()
812

  
813
    if job.start_timestamp is None:
814
      job.start_timestamp = op.start_timestamp
815

  
816
  def _ExecOpCodeUnlocked(self, log_prefix, op, summary):
817
    """Processes one opcode and returns the result.
818

  
819
    """
820
    assert op.status == constants.OP_STATUS_WAITLOCK
821

  
822
    try:
823
      # Make sure not to hold queue lock while calling ExecOpCode
824
      result = self.opexec_fn(op.input,
825
                              _OpExecCallbacks(self.queue, self.job, op))
826
    except CancelJob:
827
      logging.exception("%s: Canceling job", log_prefix)
828
      assert op.status == constants.OP_STATUS_CANCELING
829
      return (constants.OP_STATUS_CANCELING, None)
830
    except Exception, err: # pylint: disable-msg=W0703
831
      logging.exception("%s: Caught exception in %s", log_prefix, summary)
832
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
833
    else:
834
      logging.debug("%s: %s successful", log_prefix, summary)
835
      return (constants.OP_STATUS_SUCCESS, result)
836

  
837
  def __call__(self):
838
    """Continues execution of a job.
839

  
840
    @rtype: bool
841
    @return: True if job is finished, False if processor needs to be called
842
             again
843

  
844
    """
845
    queue = self.queue
846
    job = self.job
847

  
848
    logging.debug("Processing job %s", job.id)
849

  
850
    queue.acquire(shared=1)
851
    try:
852
      opcount = len(job.ops)
853

  
854
      (opidx, op, log_prefix, op_summary) = self._FindNextOpcode(job)
855

  
856
      # Consistency check
857
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
858
                                     constants.OP_STATUS_CANCELED)
859
                        for i in job.ops[opidx:])
860

  
861
      assert op.status in (constants.OP_STATUS_QUEUED,
862
                           constants.OP_STATUS_WAITLOCK,
863
                           constants.OP_STATUS_CANCELED)
864

  
865
      if op.status != constants.OP_STATUS_CANCELED:
866
        # Prepare to start opcode
867
        self._MarkWaitlock(job, op)
868

  
869
        assert op.status == constants.OP_STATUS_WAITLOCK
870
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
871

  
872
        # Write to disk
873
        queue.UpdateJobUnlocked(job)
874

  
875
        logging.info("%s: opcode %s waiting for locks", log_prefix, op_summary)
876

  
877
        queue.release()
878
        try:
879
          (op_status, op_result) = \
880
            self._ExecOpCodeUnlocked(log_prefix, op, op_summary)
881
        finally:
882
          queue.acquire(shared=1)
883

  
884
        # Finalize opcode
885
        op.end_timestamp = TimeStampNow()
886
        op.status = op_status
887
        op.result = op_result
888

  
889
        if op.status == constants.OP_STATUS_CANCELING:
890
          assert not compat.any(i.status != constants.OP_STATUS_CANCELING
891
                                for i in job.ops[opidx:])
892
        else:
893
          assert op.status in constants.OPS_FINALIZED
894

  
895
      # Ensure all opcodes so far have been successful
896
      assert (opidx == 0 or
897
              compat.all(i.status == constants.OP_STATUS_SUCCESS
898
                         for i in job.ops[:opidx]))
899

  
900
      if op.status == constants.OP_STATUS_SUCCESS:
901
        finalize = False
902

  
903
      elif op.status == constants.OP_STATUS_ERROR:
904
        # Ensure failed opcode has an exception as its result
905
        assert errors.GetEncodedError(job.ops[opidx].result)
906

  
907
        to_encode = errors.OpExecError("Preceding opcode failed")
908
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
909
                              _EncodeOpError(to_encode))
910
        finalize = True
911

  
912
        # Consistency check
913
        assert compat.all(i.status == constants.OP_STATUS_ERROR and
914
                          errors.GetEncodedError(i.result)
915
                          for i in job.ops[opidx:])
916

  
917
      elif op.status == constants.OP_STATUS_CANCELING:
918
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
919
                              "Job canceled by request")
920
        finalize = True
921

  
922
      elif op.status == constants.OP_STATUS_CANCELED:
923
        finalize = True
924

  
925
      else:
926
        raise errors.ProgrammerError("Unknown status '%s'" % op.status)
927

  
928
      # Finalizing or last opcode?
929
      if finalize or opidx == (opcount - 1):
930
        # All opcodes have been run, finalize job
931
        job.end_timestamp = TimeStampNow()
932

  
933
      # Write to disk. If the job status is final, this is the final write
934
      # allowed. Once the file has been written, it can be archived anytime.
935
      queue.UpdateJobUnlocked(job)
936

  
937
      if finalize or opidx == (opcount - 1):
938
        logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
939
        return True
940

  
941
      return False
942
    finally:
943
      queue.release()
944

  
945

  
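_JobProcessor.__call__ executes at most one opcode per invocation and uses its boolean return value to say whether the job has reached a final state, so callers are expected to invoke it repeatedly (the reworked _JobQueueWorker below does this by raising DeferTask to get rescheduled). A minimal single-threaded driving loop, assuming a queue, an opcode execution function and a job are already available, could look like this:

  # Sketch only: drive a job to completion by calling the processor
  # until it reports the job as finished.
  def run_job_to_completion(queue, exec_fn, job):
    while True:
      if _JobProcessor(queue, exec_fn, job)():
        # Final status reached (success, error or canceled)
        return job.CalcStatus()
      # Otherwise another opcode is still pending; call again
      # (a real caller would reschedule instead of spinning)
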
737 946
class _JobQueueWorker(workerpool.BaseWorker):
738 947
  """The actual job workers.
739 948

  
......
741 950
  def RunTask(self, job): # pylint: disable-msg=W0221
742 951
    """Job executor.
743 952

  
744
    This functions processes a job. It is closely tied to the _QueuedJob and
745
    _QueuedOpCode classes.
953
    This function processes a job. It is closely tied to the L{_QueuedJob} and
954
    L{_QueuedOpCode} classes.
746 955

  
747 956
    @type job: L{_QueuedJob}
748 957
    @param job: the job to be processed
749 958

  
750 959
    """
960
    queue = job.queue
961
    assert queue == self.pool.queue
962

  
751 963
    self.SetTaskName("Job%s" % job.id)
752 964

  
753
    logging.info("Processing job %s", job.id)
754
    proc = mcpu.Processor(self.pool.queue.context, job.id)
755
    queue = job.queue
756
    try:
757
      try:
758
        count = len(job.ops)
759
        for idx, op in enumerate(job.ops):
760
          op_summary = op.input.Summary()
761
          if op.status == constants.OP_STATUS_SUCCESS:
762
            # this is a job that was partially completed before master
763
            # daemon shutdown, so it can be expected that some opcodes
764
            # are already completed successfully (if any did error
765
            # out, then the whole job should have been aborted and not
766
            # resubmitted for processing)
767
            logging.info("Op %s/%s: opcode %s already processed, skipping",
768
                         idx + 1, count, op_summary)
769
            continue
770
          try:
771
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
772
                         op_summary)
773

  
774
            queue.acquire(shared=1)
775
            try:
776
              if op.status == constants.OP_STATUS_CANCELED:
777
                logging.debug("Canceling opcode")
778
                raise CancelJob()
779
              assert op.status == constants.OP_STATUS_QUEUED
780
              logging.debug("Opcode %s/%s waiting for locks",
781
                            idx + 1, count)
782
              op.status = constants.OP_STATUS_WAITLOCK
783
              op.result = None
784
              op.start_timestamp = TimeStampNow()
785
              if idx == 0: # first opcode
786
                job.start_timestamp = op.start_timestamp
787
              queue.UpdateJobUnlocked(job)
788

  
789
              input_opcode = op.input
790
            finally:
791
              queue.release()
792

  
793
            # Make sure not to hold queue lock while calling ExecOpCode
794
            result = proc.ExecOpCode(input_opcode,
795
                                     _OpExecCallbacks(queue, job, op))
796

  
797
            queue.acquire(shared=1)
798
            try:
799
              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
800
              op.status = constants.OP_STATUS_SUCCESS
801
              op.result = result
802
              op.end_timestamp = TimeStampNow()
803
              if idx == count - 1:
804
                job.end_timestamp = TimeStampNow()
805

  
806
                # Consistency check
807
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
808
                                  for i in job.ops)
809

  
810
              queue.UpdateJobUnlocked(job)
811
            finally:
812
              queue.release()
813

  
814
            logging.info("Op %s/%s: Successfully finished opcode %s",
815
                         idx + 1, count, op_summary)
816
          except CancelJob:
817
            # Will be handled further up
818
            raise
819
          except Exception, err:
820
            queue.acquire(shared=1)
821
            try:
822
              try:
823
                logging.debug("Opcode %s/%s failed", idx + 1, count)
824
                op.status = constants.OP_STATUS_ERROR
825
                op.result = _EncodeOpError(err)
826
                op.end_timestamp = TimeStampNow()
827
                logging.info("Op %s/%s: Error in opcode %s: %s",
828
                             idx + 1, count, op_summary, err)
829

  
830
                to_encode = errors.OpExecError("Preceding opcode failed")
831
                job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
832
                                      _EncodeOpError(to_encode))
833

  
834
                # Consistency check
835
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
836
                                  for i in job.ops[:idx])
837
                assert compat.all(i.status == constants.OP_STATUS_ERROR and
838
                                  errors.GetEncodedError(i.result)
839
                                  for i in job.ops[idx:])
840
              finally:
841
                job.end_timestamp = TimeStampNow()
842
                queue.UpdateJobUnlocked(job)
843
            finally:
844
              queue.release()
845
            raise
846

  
847
      except CancelJob:
848
        queue.acquire(shared=1)
849
        try:
850
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
851
                                "Job canceled by request")
852
          job.end_timestamp = TimeStampNow()
853
          queue.UpdateJobUnlocked(job)
854
        finally:
855
          queue.release()
856
      except errors.GenericError, err:
857
        logging.exception("Ganeti exception")
858
      except:
859
        logging.exception("Unhandled exception")
860
    finally:
861
      status = job.CalcStatus()
862
      logging.info("Finished job %s, status = %s", job.id, status)
965
    proc = mcpu.Processor(queue.context, job.id)
966

  
967
    if not _JobProcessor(queue, proc.ExecOpCode, job)():
968
      # Schedule again
969
      raise workerpool.DeferTask()
863 970

  
864 971

  
865 972
class _JobQueueWorkerPool(workerpool.WorkerPool):
b/lib/opcodes.py
802 802
    ]
803 803

  
804 804

  
805
class OpTestDummy(OpCode):
806
  """Utility opcode used by unittests.
807

  
808
  """
809
  OP_ID = "OP_TEST_DUMMY"
810
  __slots__ = [
811
    "result",
812
    "messages",
813
    "fail",
814
    ]
815

  
816

  
805 817
OP_MAPPING = dict([(v.OP_ID, v) for v in globals().values()
806 818
                   if (isinstance(v, type) and issubclass(v, OpCode) and
807 819
                       hasattr(v, "OP_ID"))])
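Since OpTestDummy subclasses OpCode and defines an OP_ID, the existing OP_MAPPING expression picks it up automatically, which is what allows the unit tests below to push it through the regular job machinery. A small illustration, assuming the ganeti package is on the path:

  # Sketch: the dummy opcode is registered like any real one
  from ganeti import opcodes

  op = opcodes.OpTestDummy(result="Res0", fail=False, messages=[])
  assert op.OP_ID == "OP_TEST_DUMMY"
  assert opcodes.OP_MAPPING["OP_TEST_DUMMY"] is opcodes.OpTestDummy
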
b/test/ganeti.jqueue_unittest.py
347 347
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
348 348

  
349 349

  
350
class _FakeQueueForProc:
351
  def __init__(self):
352
    self._acquired = False
353

  
354
  def IsAcquired(self):
355
    return self._acquired
356

  
357
  def acquire(self, shared=0):
358
    assert shared == 1
359
    self._acquired = True
360

  
361
  def release(self):
362
    assert self._acquired
363
    self._acquired = False
364

  
365
  def UpdateJobUnlocked(self, job, replicate=None):
366
    # TODO: Ensure job is updated at the correct places
367
    pass
368

  
369

  
370
class _FakeExecOpCodeForProc:
371
  def __init__(self, before_start, after_start):
372
    self._before_start = before_start
373
    self._after_start = after_start
374

  
375
  def __call__(self, op, cbs):
376
    assert isinstance(op, opcodes.OpTestDummy)
377

  
378
    if self._before_start:
379
      self._before_start()
380

  
381
    cbs.NotifyStart()
382

  
383
    if self._after_start:
384
      self._after_start(op, cbs)
385

  
386
    if op.fail:
387
      raise errors.OpExecError("Error requested (%s)" % op.result)
388

  
389
    return op.result
390

  
391

  
392
class TestJobProcessor(unittest.TestCase):
393
  def _CreateJob(self, queue, job_id, ops):
394
    job = jqueue._QueuedJob(queue, job_id, ops)
395
    self.assertFalse(job.start_timestamp)
396
    self.assertFalse(job.end_timestamp)
397
    self.assertEqual(len(ops), len(job.ops))
398
    self.assert_(compat.all(op.input == inp
399
                            for (op, inp) in zip(job.ops, ops)))
400
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
401
    return job
402

  
403
  def _GenericCheckJob(self, job):
404
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
405
                      for op in job.ops)
406

  
407
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
408
                     [[op.start_timestamp for op in job.ops],
409
                      [op.exec_timestamp for op in job.ops],
410
                      [op.end_timestamp for op in job.ops]])
411
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
412
                     [job.received_timestamp,
413
                      job.start_timestamp,
414
                      job.end_timestamp])
415
    self.assert_(job.start_timestamp)
416
    self.assert_(job.end_timestamp)
417
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
418

  
419
  def testSuccess(self):
420
    queue = _FakeQueueForProc()
421

  
422
    for (job_id, opcount) in [(25351, 1), (6637, 3),
423
                              (24644, 10), (32207, 100)]:
424
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
425
             for i in range(opcount)]
426

  
427
      # Create job
428
      job = self._CreateJob(queue, job_id, ops)
429

  
430
      def _BeforeStart():
431
        self.assertFalse(queue.IsAcquired())
432
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
433

  
434
      def _AfterStart(op, cbs):
435
        self.assertFalse(queue.IsAcquired())
436
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
437

  
438
        # Job is running, cancelling shouldn't be possible
439
        (success, _) = job.Cancel()
440
        self.assertFalse(success)
441

  
442
      opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
443

  
444
      for idx in range(len(ops)):
445
        result = jqueue._JobProcessor(queue, opexec, job)()
446
        if idx == len(ops) - 1:
447
          # Last opcode
448
          self.assert_(result)
449
        else:
450
          self.assertFalse(result)
451

  
452
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
453
          self.assert_(job.start_timestamp)
454
          self.assertFalse(job.end_timestamp)
455

  
456
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
457
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
458
      self.assertEqual(job.GetInfo(["opresult"]),
459
                       [[op.input.result for op in job.ops]])
460
      self.assertEqual(job.GetInfo(["opstatus"]),
461
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
462
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
463
                              for op in job.ops))
464

  
465
      self._GenericCheckJob(job)
466

  
467
      # Finished jobs can't be processed any further
468
      self.assertRaises(errors.ProgrammerError,
469
                        jqueue._JobProcessor(queue, opexec, job))
470

  
471
  def testOpcodeError(self):
472
    queue = _FakeQueueForProc()
473

  
474
    testdata = [
475
      (17077, 1, 0, 0),
476
      (1782, 5, 2, 2),
477
      (18179, 10, 9, 9),
478
      (4744, 10, 3, 8),
479
      (23816, 100, 39, 45),
480
      ]
481

  
482
    for (job_id, opcount, failfrom, failto) in testdata:
483
      # Prepare opcodes
484
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
485
                                 fail=(failfrom <= i and
486
                                       i <= failto))
487
             for i in range(opcount)]
488

  
489
      # Create job
490
      job = self._CreateJob(queue, job_id, ops)
491

  
492
      opexec = _FakeExecOpCodeForProc(None, None)
493

  
494
      for idx in range(len(ops)):
495
        result = jqueue._JobProcessor(queue, opexec, job)()
496

  
497
        if idx in (failfrom, len(ops) - 1):
498
          # Last opcode
499
          self.assert_(result)
500
          break
501

  
502
        self.assertFalse(result)
503

  
504
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
505

  
506
      # Check job status
507
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
508
      self.assertEqual(job.GetInfo(["id"]), [job_id])
509
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
510

  
511
      # Check opcode status
512
      data = zip(job.ops,
513
                 job.GetInfo(["opstatus"])[0],
514
                 job.GetInfo(["opresult"])[0])
515

  
516
      for idx, (op, opstatus, opresult) in enumerate(data):
517
        if idx < failfrom:
518
          assert not op.input.fail
519
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
520
          self.assertEqual(opresult, op.input.result)
521
        elif idx <= failto:
522
          assert op.input.fail
523
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
524
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
525
        else:
526
          assert not op.input.fail
527
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
528
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
529

  
530
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
531
                              for op in job.ops[:failfrom]))
532

  
533
      self._GenericCheckJob(job)
534

  
535
      # Finished jobs can't be processed any further
536
      self.assertRaises(errors.ProgrammerError,
537
                        jqueue._JobProcessor(queue, opexec, job))
538

  
539
  def testCancelWhileInQueue(self):
540
    queue = _FakeQueueForProc()
541

  
542
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
543
           for i in range(5)]
544

  
545
    # Create job
546
    job_id = 17045
547
    job = self._CreateJob(queue, job_id, ops)
548

  
549
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
550

  
551
    # Mark as cancelled
552
    (success, _) = job.Cancel()
553
    self.assert_(success)
554

  
555
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
556
                            for op in job.ops))
557

  
558
    opexec = _FakeExecOpCodeForProc(None, None)
559
    jqueue._JobProcessor(queue, opexec, job)()
560

  
561
    # Check result
562
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
563
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
564
    self.assertFalse(job.start_timestamp)
565
    self.assert_(job.end_timestamp)
566
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
567
                                for op in job.ops))
568
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
569
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
570
                      ["Job canceled by request" for _ in job.ops]])
571

  
572
  def testCancelWhileWaitlock(self):
573
    queue = _FakeQueueForProc()
574

  
575
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
576
           for i in range(5)]
577

  
578
    # Create job
579
    job_id = 11009
580
    job = self._CreateJob(queue, job_id, ops)
581

  
582
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
583

  
584
    def _BeforeStart():
585
      self.assertFalse(queue.IsAcquired())
586
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
587

  
588
      # Mark as cancelled
589
      (success, _) = job.Cancel()
590
      self.assert_(success)
591

  
592
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
593
                              for op in job.ops))
594

  
595
    def _AfterStart(op, cbs):
596
      self.assertFalse(queue.IsAcquired())
597
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
598

  
599
    opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
600

  
601
    jqueue._JobProcessor(queue, opexec, job)()
602

  
603
    # Check result
604
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
605
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
606
    self.assert_(job.start_timestamp)
607
    self.assert_(job.end_timestamp)
608
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
609
                                for op in job.ops))
610
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
611
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
612
                      ["Job canceled by request" for _ in job.ops]])
613

  
614
  def testCancelWhileRunning(self):
615
    # Tests canceling a job with finished opcodes and more, unprocessed ones
616
    queue = _FakeQueueForProc()
617

  
618
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
619
           for i in range(3)]
620

  
621
    # Create job
622
    job_id = 28492
623
    job = self._CreateJob(queue, job_id, ops)
624

  
625
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
626

  
627
    opexec = _FakeExecOpCodeForProc(None, None)
628

  
629
    # Run one opcode
630
    self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
631

  
632
    # Job goes back to queued
633
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
634
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
635
                     [[constants.OP_STATUS_SUCCESS,
636
                       constants.OP_STATUS_QUEUED,
637
                       constants.OP_STATUS_QUEUED],
638
                      ["Res0", None, None]])
639

  
640
    # Mark as cancelled
641
    (success, _) = job.Cancel()
642
    self.assert_(success)
643

  
644
    # Try processing another opcode (this will actually cancel the job)
645
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
646

  
647
    # Check result
648
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
649
    self.assertEqual(job.GetInfo(["id"]), [job_id])
650
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
651
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
652
                     [[constants.OP_STATUS_SUCCESS,
653
                       constants.OP_STATUS_CANCELED,
654
                       constants.OP_STATUS_CANCELED],
655
                      ["Res0", "Job canceled by request",
656
                       "Job canceled by request"]])
657

  
658
  def testPartiallyRun(self):
659
    # Tests calling the processor on a job that's been partially run before the
660
    # program was restarted
661
    queue = _FakeQueueForProc()
662

  
663
    opexec = _FakeExecOpCodeForProc(None, None)
664

  
665
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
666
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
667
             for i in range(10)]
668

  
669
      # Create job
670
      job = self._CreateJob(queue, job_id, ops)
671

  
672
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
673

  
674
      for _ in range(successcount):
675
        self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
676

  
677
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
678
      self.assertEqual(job.GetInfo(["opstatus"]),
679
                       [[constants.OP_STATUS_SUCCESS
680
                         for _ in range(successcount)] +
681
                        [constants.OP_STATUS_QUEUED
682
                         for _ in range(len(ops) - successcount)]])
683

  
684
      self.assert_(job.current_op)
685

  
686
      # Serialize and restore (simulates program restart)
687
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
688
      self.assertFalse(newjob.current_op)
689
      self._TestPartial(newjob, successcount)
690

  
691
  def _TestPartial(self, job, successcount):
692
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
693
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
694

  
695
    queue = _FakeQueueForProc()
696
    opexec = _FakeExecOpCodeForProc(None, None)
697

  
698
    for remaining in reversed(range(len(job.ops) - successcount)):
699
      result = jqueue._JobProcessor(queue, opexec, job)()
700

  
701
      if remaining == 0:
702
        # Last opcode
703
        self.assert_(result)
704
        break
705

  
706
      self.assertFalse(result)
707

  
708
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
709

  
710
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
711
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
712
    self.assertEqual(job.GetInfo(["opresult"]),
713
                     [[op.input.result for op in job.ops]])
714
    self.assertEqual(job.GetInfo(["opstatus"]),
715
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
716
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
717
                            for op in job.ops))
718

  
719
    self._GenericCheckJob(job)
720

  
721
    # Finished jobs can't be processed any further
722
    self.assertRaises(errors.ProgrammerError,
723
                      jqueue._JobProcessor(queue, opexec, job))
724

  
725
    # ... also after being restored
726
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
727
    self.assertRaises(errors.ProgrammerError,
728
                      jqueue._JobProcessor(queue, opexec, job2))
729

  
730
  def testProcessorOnRunningJob(self):
731
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
732

  
733
    queue = _FakeQueueForProc()
734
    opexec = _FakeExecOpCodeForProc(None, None)
735

  
736
    # Create job
737
    job = self._CreateJob(queue, 9571, ops)
738

  
739
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
740

  
741
    job.ops[0].status = constants.OP_STATUS_RUNNING
742

  
743
    assert len(job.ops) == 1
744

  
745
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
746

  
747
    # Calling on running job must fail
748
    self.assertRaises(errors.ProgrammerError,
749
                      jqueue._JobProcessor(queue, opexec, job))
750

  
751
  def testLogMessages(self):
752
    # Tests the "Feedback" callback function
753
    queue = _FakeQueueForProc()
754

  
755
    messages = {
756
      1: [
757
        (None, "Hello"),
758
        (None, "World"),
759
        (constants.ELOG_MESSAGE, "there"),
760
        ],
761
      4: [
762
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
763
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
764
        ],
765
      }
766
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
767
                               messages=messages.get(i, []))
768
           for i in range(5)]
769

  
770
    # Create job
771
    job = self._CreateJob(queue, 29386, ops)
772

  
773
    def _BeforeStart():
774
      self.assertFalse(queue.IsAcquired())
775
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
776

  
777
    def _AfterStart(op, cbs):
778
      self.assertFalse(queue.IsAcquired())
779
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
780

  
781
      self.assertRaises(AssertionError, cbs.Feedback,
782
                        "too", "many", "arguments")
783

  
784
      for (log_type, msg) in op.messages:
785
        if log_type:
786
          cbs.Feedback(log_type, msg)
787
        else:
788
          cbs.Feedback(msg)
789

  
790
    opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
791

  
792
    for remaining in reversed(range(len(job.ops))):
793
      result = jqueue._JobProcessor(queue, opexec, job)()
794

  
795
      if remaining == 0:
796
        # Last opcode
797
        self.assert_(result)
798
        break
799

  
800
      self.assertFalse(result)
801

  
802
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
803

  
804
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
805
    self.assertEqual(job.GetInfo(["opresult"]),
806
                     [[op.input.result for op in job.ops]])
807

  
808
    logmsgcount = sum(len(m) for m in messages.values())
809

  
810
    self._CheckLogMessages(job, logmsgcount)
811

  
812
    # Serialize and restore (simulates program restart)
813
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
814
    self._CheckLogMessages(newjob, logmsgcount)
815

  
816
    # Check each message
817
    prevserial = -1
818
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
819
      for (serial, timestamp, log_type, msg) in oplog:
820
        (exptype, expmsg) = messages.get(idx).pop(0)
821
        if exptype:
822
          self.assertEqual(log_type, exptype)
823
        else:
824
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
825
        self.assertEqual(expmsg, msg)
826
        self.assert_(serial > prevserial)
827
        prevserial = serial
828

  
829
  def _CheckLogMessages(self, job, count):
830
    # Check serial
831
    self.assertEqual(job.log_serial, count)
832

  
833
    # No filter
834
    self.assertEqual(job.GetLogEntries(None),
835
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
836
                      for entry in entries])
837

  
838
    # Filter with serial
839
    assert count > 3
840
    self.assert_(job.GetLogEntries(3))
841
    self.assertEqual(job.GetLogEntries(3),
842
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
843
                      for entry in entries][3:])
844

  
845
    # No log message after highest serial
846
    self.assertFalse(job.GetLogEntries(count))
847
    self.assertFalse(job.GetLogEntries(count + 3))
848

  
849

  
350 850
if __name__ == "__main__":
351 851
  testutils.GanetiTestProgram()
