Revision be760ba8 test/ganeti.jqueue_unittest.py

b/test/ganeti.jqueue_unittest.py
347 347
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
348 348

  
349 349

  
350
class _FakeQueueForProc:
  """Minimal queue stand-in handed to the job processor in these tests.

  It only records whether it is currently acquired, so that callbacks run
  during opcode execution can check the acquisition state.

  """
  def __init__(self):
    self._acquired = False

  def IsAcquired(self):
    """Returns whether the fake queue is currently acquired.

    """
    return self._acquired

  def acquire(self, shared=0):
    """Marks the fake queue as acquired.

    Asserts that the caller passes C{shared=1}; the default value therefore
    triggers an C{AssertionError}.

    """
    assert shared == 1
    self._acquired = True

  def release(self):
    """Marks the fake queue as released; asserts it was acquired before.

    """
    assert self._acquired
    self._acquired = False

  def UpdateJobUnlocked(self, job, replicate=None):
    """Accepts a job update and intentionally does nothing with it.

    """
    # TODO: Ensure job is updated at the correct places
    pass
368

  
369

  
370
class _FakeExecOpCodeForProc:
  """Callable used in place of the real opcode executor in these tests.

  Calling an instance runs the optional C{before_start} hook, notifies the
  callbacks object that execution started, runs the optional C{after_start}
  hook and finally either raises or returns, depending on the opcode.

  """
  def __init__(self, before_start, after_start):
    """Stores the optional hooks.

    @param before_start: callable taking no arguments, invoked before
        C{cbs.NotifyStart()}; a false value (e.g. C{None}) disables it
    @param after_start: callable taking C{(op, cbs)}, invoked after
        C{cbs.NotifyStart()}; a false value (e.g. C{None}) disables it

    """
    self._before_start = before_start
    self._after_start = after_start

  def __call__(self, op, cbs):
    """Pretends to execute an opcode.

    @param op: opcode to "execute"; must be an L{opcodes.OpTestDummy}
    @param cbs: callbacks object; its C{NotifyStart} method is called
    @return: C{op.result} if C{op.fail} is false
    @raise errors.OpExecError: if C{op.fail} is true

    """
    assert isinstance(op, opcodes.OpTestDummy)

    if self._before_start:
      self._before_start()

    cbs.NotifyStart()

    if self._after_start:
      self._after_start(op, cbs)

    if op.fail:
      raise errors.OpExecError("Error requested (%s)" % op.result)

    return op.result
390

  
391

  
392
class TestJobProcessor(unittest.TestCase):
  """Tests for L{jqueue._JobProcessor} driven by fake queue and executor.

  Each call of a processor instance is expected to handle one opcode and to
  return a true value only once the job is finished.

  """
  def _CreateJob(self, queue, job_id, ops):
    # Builds a queued job and verifies its pristine state: no timestamps yet
    # and the opcodes stored unchanged and in order
    job = jqueue._QueuedJob(queue, job_id, ops)
    self.assertFalse(job.start_timestamp)
    self.assertFalse(job.end_timestamp)
    self.assertEqual(len(ops), len(job.ops))
    self.assertTrue(compat.all(op.input == inp
                               for (op, inp) in zip(job.ops, ops)))
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
    return job

  def _GenericCheckJob(self, job):
    # Checks shared by all finished jobs: timestamp queries must agree with
    # the attributes and the job must have start and end timestamps
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
                      for op in job.ops)

    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
                     [[op.start_timestamp for op in job.ops],
                      [op.exec_timestamp for op in job.ops],
                      [op.end_timestamp for op in job.ops]])
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
                     [job.received_timestamp,
                      job.start_timestamp,
                      job.end_timestamp])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)

  def testSuccess(self):
    # Tests jobs of various lengths whose opcodes all succeed
    queue = _FakeQueueForProc()

    for (job_id, opcount) in [(25351, 1), (6637, 3),
                              (24644, 10), (32207, 100)]:
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
             for i in range(opcount)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      def _BeforeStart():
        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

      def _AfterStart(op, cbs):
        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

        # Job is running, cancelling shouldn't be possible
        (success, _) = job.Cancel()
        self.assertFalse(success)

      opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)

      for idx in range(len(ops)):
        result = jqueue._JobProcessor(queue, opexec, job)()
        if idx == len(ops) - 1:
          # Last opcode
          self.assertTrue(result)
        else:
          self.assertFalse(result)

          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
          self.assertTrue(job.start_timestamp)
          self.assertFalse(job.end_timestamp)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
      self.assertEqual(job.GetInfo(["opresult"]),
                       [[op.input.result for op in job.ops]])
      self.assertEqual(job.GetInfo(["opstatus"]),
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
      self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                                 for op in job.ops))

      self._GenericCheckJob(job)

      # Finished jobs can't be processed any further
      self.assertRaises(errors.ProgrammerError,
                        jqueue._JobProcessor(queue, opexec, job))

  def testOpcodeError(self):
    # Tests jobs in which the opcodes with index failfrom..failto (inclusive)
    # are configured to fail
    queue = _FakeQueueForProc()

    testdata = [
      (17077, 1, 0, 0),
      (1782, 5, 2, 2),
      (18179, 10, 9, 9),
      (4744, 10, 3, 8),
      (23816, 100, 39, 45),
      ]

    for (job_id, opcount, failfrom, failto) in testdata:
      # Prepare opcodes
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
                                 fail=(failfrom <= i and
                                       i <= failto))
             for i in range(opcount)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      opexec = _FakeExecOpCodeForProc(None, None)

      for idx in range(len(ops)):
        result = jqueue._JobProcessor(queue, opexec, job)()

        if idx in (failfrom, len(ops) - 1):
          # First failing opcode or last opcode: job must be finished
          self.assertTrue(result)
          break

        self.assertFalse(result)

        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

      # Check job status
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
      self.assertEqual(job.GetInfo(["id"]), [job_id])
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])

      # Check opcode status
      data = zip(job.ops,
                 job.GetInfo(["opstatus"])[0],
                 job.GetInfo(["opresult"])[0])

      for idx, (op, opstatus, opresult) in enumerate(data):
        if idx < failfrom:
          # Opcodes before the first failure must have succeeded
          assert not op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
          self.assertEqual(opresult, op.input.result)
        elif idx <= failto:
          assert op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
        else:
          # Opcodes after the failing range are expected to be marked as
          # failed, too, even though they were not configured to fail
          assert not op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)

      self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                                 for op in job.ops[:failfrom]))

      self._GenericCheckJob(job)

      # Finished jobs can't be processed any further
      self.assertRaises(errors.ProgrammerError,
                        jqueue._JobProcessor(queue, opexec, job))

  def testCancelWhileInQueue(self):
    # Tests cancelling a job before any of its opcodes was processed
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 17045
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Mark as cancelled
    (success, _) = job.Cancel()
    self.assertTrue(success)

    self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELED
                               for op in job.ops))

    opexec = _FakeExecOpCodeForProc(None, None)
    jqueue._JobProcessor(queue, opexec, job)()

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])

  def testCancelWhileWaitlock(self):
    # Tests cancelling a job from the hook run before the opcode start
    # notification, i.e. while the job reports the "waitlock" status
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 11009
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    def _BeforeStart():
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

      # Mark as cancelled
      (success, _) = job.Cancel()
      self.assertTrue(success)

      self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELING
                                 for op in job.ops))

    def _AfterStart(op, cbs):
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)

    jqueue._JobProcessor(queue, opexec, job)()

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])

  def testCancelWhileRunning(self):
    # Tests canceling a job with finished opcodes and more, unprocessed ones
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(3)]

    # Create job
    job_id = 28492
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    opexec = _FakeExecOpCodeForProc(None, None)

    # Run one opcode
    self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())

    # Job goes back to queued
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_QUEUED,
                       constants.OP_STATUS_QUEUED],
                      ["Res0", None, None]])

    # Mark as cancelled
    (success, _) = job.Cancel()
    self.assertTrue(success)

    # Try processing another opcode (this will actually cancel the job)
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["id"]), [job_id])
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_CANCELED,
                       constants.OP_STATUS_CANCELED],
                      ["Res0", "Job canceled by request",
                       "Job canceled by request"]])

  def testPartiallyRun(self):
    # Tests calling the processor on a job that's been partially run before the
    # program was restarted
    queue = _FakeQueueForProc()

    opexec = _FakeExecOpCodeForProc(None, None)

    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
             for i in range(10)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

      for _ in range(successcount):
        self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.GetInfo(["opstatus"]),
                       [[constants.OP_STATUS_SUCCESS
                         for _ in range(successcount)] +
                        [constants.OP_STATUS_QUEUED
                         for _ in range(len(ops) - successcount)]])

      self.assertTrue(job.current_op)

      # Serialize and restore (simulates program restart)
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
      self.assertFalse(newjob.current_op)
      self._TestPartial(newjob, successcount)

  def _TestPartial(self, job, successcount):
    # Runs the remaining opcodes of a job whose first "successcount" opcodes
    # already succeeded and checks the final state
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)

    queue = _FakeQueueForProc()
    opexec = _FakeExecOpCodeForProc(None, None)

    for remaining in reversed(range(len(job.ops) - successcount)):
      result = jqueue._JobProcessor(queue, opexec, job)()

      if remaining == 0:
        # Last opcode
        self.assertTrue(result)
        break

      self.assertFalse(result)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    self._GenericCheckJob(job)

    # Finished jobs can't be processed any further
    self.assertRaises(errors.ProgrammerError,
                      jqueue._JobProcessor(queue, opexec, job))

    # ... also after being restored
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
    self.assertRaises(errors.ProgrammerError,
                      jqueue._JobProcessor(queue, opexec, job2))

  def testProcessorOnRunningJob(self):
    # Tests that the processor refuses a job whose opcode is already marked
    # as running
    ops = [opcodes.OpTestDummy(result="result", fail=False)]

    queue = _FakeQueueForProc()
    opexec = _FakeExecOpCodeForProc(None, None)

    # Create job
    job = self._CreateJob(queue, 9571, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    job.ops[0].status = constants.OP_STATUS_RUNNING

    assert len(job.ops) == 1

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Calling on running job must fail
    self.assertRaises(errors.ProgrammerError,
                      jqueue._JobProcessor(queue, opexec, job))

  def testLogMessages(self):
    # Tests the "Feedback" callback function
    queue = _FakeQueueForProc()

    # Expected log messages, keyed by opcode index; a log type of None means
    # the message is sent without an explicit type
    messages = {
      1: [
        (None, "Hello"),
        (None, "World"),
        (constants.ELOG_MESSAGE, "there"),
        ],
      4: [
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
        ],
      }
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
                               messages=messages.get(i, []))
           for i in range(5)]

    # Create job
    job = self._CreateJob(queue, 29386, ops)

    def _BeforeStart():
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

    def _AfterStart(op, cbs):
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

      self.assertRaises(AssertionError, cbs.Feedback,
                        "too", "many", "arguments")

      for (log_type, msg) in op.messages:
        if log_type:
          cbs.Feedback(log_type, msg)
        else:
          cbs.Feedback(msg)

    opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)

    for remaining in reversed(range(len(job.ops))):
      result = jqueue._JobProcessor(queue, opexec, job)()

      if remaining == 0:
        # Last opcode
        self.assertTrue(result)
        break

      self.assertFalse(result)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])

    logmsgcount = sum(len(m) for m in messages.values())

    self._CheckLogMessages(job, logmsgcount)

    # Serialize and restore (simulates program restart)
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
    self._CheckLogMessages(newjob, logmsgcount)

    # Check each message against the expectation for its opcode; the
    # expectations are only read, not consumed, and a mismatch in the number
    # of entries is reported as a regular test failure
    prevserial = -1
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
      expected = messages.get(idx, [])
      self.assertEqual(len(oplog), len(expected))
      for ((serial, _, log_type, msg), (exptype, expmsg)) in zip(oplog,
                                                                 expected):
        if exptype:
          self.assertEqual(log_type, exptype)
        else:
          # Messages sent without a type must be logged as plain messages
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
        self.assertEqual(expmsg, msg)
        # Serial numbers must be strictly increasing across all opcodes
        self.assertTrue(serial > prevserial)
        prevserial = serial

  def _CheckLogMessages(self, job, count):
    # Verifies the log serial and the serial-based filtering of log entries
    # for a job which logged "count" messages (must be more than three)

    # Check serial
    self.assertEqual(job.log_serial, count)

    # All log entries of all opcodes, in opcode order
    all_entries = [entry for entries in job.GetInfo(["oplog"])[0] if entries
                   for entry in entries]

    # No filter
    self.assertEqual(job.GetLogEntries(None), all_entries)

    # Filter with serial
    assert count > 3
    self.assertTrue(job.GetLogEntries(3))
    self.assertEqual(job.GetLogEntries(3), all_entries[3:])

    # No log message after highest serial
    self.assertFalse(job.GetLogEntries(count))
    self.assertFalse(job.GetLogEntries(count + 3))
848

  
849

  
350 850
# Only invoke the test program helper when this file is executed directly,
# not when it is imported
if __name__ == "__main__":
  testutils.GanetiTestProgram()

Also available in: Unified diff