Revision 75d81fc8

b/lib/jqueue.py
863 863

  
864 864

  
865 865
class _JobProcessor(object):
866
  (DEFER,
867
   WAITDEP,
868
   FINISHED) = range(1, 4)
869

  
866 870
  def __init__(self, queue, opexec_fn, job,
867 871
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
868 872
    """Initializes this class.
......
1050 1054
    """Continues execution of a job.
1051 1055

  
1052 1056
    @param _nextop_fn: Callback function for tests
1053
    @rtype: bool
1054
    @return: True if job is finished, False if processor needs to be called
1055
             again
1057
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1058
      be deferred and C{WAITDEP} if the dependency manager
1059
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1056 1060

  
1057 1061
    """
1058 1062
    queue = self.queue
......
1068 1072

  
1069 1073
      # Don't do anything for finalized jobs
1070 1074
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1071
        return True
1075
        return self.FINISHED
1072 1076

  
1073 1077
      # Is a previous opcode still pending?
1074 1078
      if job.cur_opctx:
......
1213 1217

  
1214 1218
        if finalize:
1215 1219
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1216
          # TODO: Check locking
1217
          queue.depmgr.NotifyWaiters(job.id)
1218
          return True
1220
          return self.FINISHED
1219 1221

  
1220 1222
      assert not waitjob or queue.depmgr.JobWaiting(job)
1221 1223

  
1222
      return bool(waitjob)
1224
      if waitjob:
1225
        return self.WAITDEP
1226
      else:
1227
        return self.DEFER
1223 1228
    finally:
1224 1229
      assert job.writable, "Job became read-only while being processed"
1225 1230
      queue.release()
......
1265 1270
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1266 1271
                                    proc.ExecOpCode)
1267 1272

  
1268
    if not _JobProcessor(queue, wrap_execop_fn, job)():
1273
    result = _JobProcessor(queue, wrap_execop_fn, job)()
1274

  
1275
    if result == _JobProcessor.FINISHED:
1276
      # Notify waiting jobs
1277
      queue.depmgr.NotifyWaiters(job.id)
1278

  
1279
    elif result == _JobProcessor.DEFER:
1269 1280
      # Schedule again
1270 1281
      raise workerpool.DeferTask(priority=job.CalcPriority())
1271 1282

  
1283
    elif result == _JobProcessor.WAITDEP:
1284
      # No-op, dependency manager will re-schedule
1285
      pass
1286

  
1287
    else:
1288
      raise errors.ProgrammerError("Job processor returned unknown status %s" %
1289
                                   (result, ))
1290

  
1272 1291
  @staticmethod
1273 1292
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1274 1293
    """Updates the worker thread name to include a short summary of the opcode.
......
1570 1589

  
1571 1590
    if restartjobs:
1572 1591
      logging.info("Restarting %s jobs", len(restartjobs))
1573
      self._EnqueueJobs(restartjobs)
1592
      self._EnqueueJobsUnlocked(restartjobs)
1574 1593

  
1575 1594
    logging.info("Job queue inspection finished")
1576 1595

  
......
2015 2034

  
2016 2035
    """
2017 2036
    (job_id, ) = self._NewSerialsUnlocked(1)
2018
    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
2037
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2019 2038
    return job_id
2020 2039

  
2021 2040
  @locking.ssynchronized(_LOCK)
......
2031 2050
    (results, added_jobs) = \
2032 2051
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2033 2052

  
2034
    self._EnqueueJobs(added_jobs)
2053
    self._EnqueueJobsUnlocked(added_jobs)
2035 2054

  
2036 2055
    return results
2037 2056

  
......
2112 2131

  
2113 2132
    return (results, added_jobs)
2114 2133

  
2134
  @locking.ssynchronized(_LOCK)
2115 2135
  def _EnqueueJobs(self, jobs):
2116 2136
    """Helper function to add jobs to worker pool's queue.
2117 2137

  
......
2119 2139
    @param jobs: List of all jobs
2120 2140

  
2121 2141
    """
2142
    return self._EnqueueJobsUnlocked(jobs)
2143

  
2144
  def _EnqueueJobsUnlocked(self, jobs):
2145
    """Helper function to add jobs to worker pool's queue.
2146

  
2147
    @type jobs: list
2148
    @param jobs: List of all jobs
2149

  
2150
    """
2151
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2122 2152
    self._wpool.AddManyTasks([(job, ) for job in jobs],
2123 2153
                             priority=[job.CalcPriority() for job in jobs])
2124 2154

  
b/test/ganeti.jqueue_unittest.py
626 626
        self.assertRaises(IndexError, queue.GetNextUpdate)
627 627
        if idx == len(ops) - 1:
628 628
          # Last opcode
629
          self.assert_(result)
629
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
630 630
        else:
631
          self.assertFalse(result)
631
          self.assertEqual(result, jqueue._JobProcessor.DEFER)
632 632

  
633 633
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
634 634
          self.assert_(job.start_timestamp)
......
648 648
      self._GenericCheckJob(job)
649 649

  
650 650
      # Calling the processor on a finished job should be a no-op
651
      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
651
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
652
                       jqueue._JobProcessor.FINISHED)
652 653
      self.assertRaises(IndexError, queue.GetNextUpdate)
653 654

  
654 655
  def testOpcodeError(self):
......
687 688

  
688 689
        if idx in (failfrom, len(ops) - 1):
689 690
          # Last opcode
690
          self.assert_(result)
691
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
691 692
          break
692 693

  
693
        self.assertFalse(result)
694
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
694 695

  
695 696
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
696 697

  
......
726 727
      self._GenericCheckJob(job)
727 728

  
728 729
      # Calling the processor on a finished job should be a no-op
729
      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
730
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
731
                       jqueue._JobProcessor.FINISHED)
730 732
      self.assertRaises(IndexError, queue.GetNextUpdate)
731 733

  
732 734
  def testCancelWhileInQueue(self):
......
757 759

  
758 760
    # Simulate processor called in workerpool
759 761
    opexec = _FakeExecOpCodeForProc(queue, None, None)
760
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
762
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
763
                     jqueue._JobProcessor.FINISHED)
761 764

  
762 765
    # Check result
763 766
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
......
802 805
                            for op in job.ops))
803 806

  
804 807
    opexec = _FakeExecOpCodeForProc(queue, None, None)
805
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
808
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
809
                     jqueue._JobProcessor.FINISHED)
806 810

  
807 811
    # Check result
808 812
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
......
850 854
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
851 855

  
852 856
    self.assertRaises(IndexError, queue.GetNextUpdate)
853
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
857
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
858
                     jqueue._JobProcessor.FINISHED)
854 859
    self.assertEqual(queue.GetNextUpdate(), (job, True))
855 860
    self.assertRaises(IndexError, queue.GetNextUpdate)
856 861

  
......
896 901

  
897 902
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
898 903

  
899
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
904
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
905
                     jqueue._JobProcessor.FINISHED)
900 906

  
901 907
    # Check result
902 908
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
......
925 931
    opexec = _FakeExecOpCodeForProc(queue, None, None)
926 932

  
927 933
    # Run one opcode
928
    self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
934
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
935
                     jqueue._JobProcessor.DEFER)
929 936

  
930 937
    # Job goes back to queued
931 938
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
......
940 947
    self.assert_(success)
941 948

  
942 949
    # Try processing another opcode (this will actually cancel the job)
943
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
950
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
951
                     jqueue._JobProcessor.FINISHED)
944 952

  
945 953
    # Check result
946 954
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
......
970 978
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
971 979

  
972 980
      for _ in range(successcount):
973
        self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
981
        self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
982
                         jqueue._JobProcessor.DEFER)
974 983

  
975 984
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
976 985
      self.assertEqual(job.GetInfo(["opstatus"]),
......
1002 1011

  
1003 1012
      if remaining == 0:
1004 1013
        # Last opcode
1005
        self.assert_(result)
1014
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1006 1015
        break
1007 1016

  
1008
      self.assertFalse(result)
1017
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1009 1018

  
1010 1019
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1011 1020

  
......
1022 1031
    self._GenericCheckJob(job)
1023 1032

  
1024 1033
    # Calling the processor on a finished job should be a no-op
1025
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1034
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1035
                     jqueue._JobProcessor.FINISHED)
1026 1036
    self.assertRaises(IndexError, queue.GetNextUpdate)
1027 1037

  
1028 1038
    # ... also after being restored
1029 1039
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
1030 1040
    # Calling the processor on a finished job should be a no-op
1031
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
1041
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
1042
                     jqueue._JobProcessor.FINISHED)
1032 1043
    self.assertRaises(IndexError, queue.GetNextUpdate)
1033 1044

  
1034 1045
  def testProcessorOnRunningJob(self):
......
1109 1120

  
1110 1121
      if remaining == 0:
1111 1122
        # Last opcode
1112
        self.assert_(result)
1123
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1113 1124
        break
1114 1125

  
1115
      self.assertFalse(result)
1126
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1116 1127

  
1117 1128
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1118 1129

  
......
1217 1228
      self.assertRaises(IndexError, queue.GetNextUpdate)
1218 1229
      if idx == len(ops) - 1:
1219 1230
        # Last opcode
1220
        self.assert_(result)
1231
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1221 1232
      else:
1222
        self.assertFalse(result)
1233
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
1223 1234

  
1224 1235
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1225 1236
        self.assert_(job.start_timestamp)
......
1244 1255
    self._GenericCheckJob(job)
1245 1256

  
1246 1257
    # Calling the processor on a finished job should be a no-op
1247
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1258
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1259
                     jqueue._JobProcessor.FINISHED)
1248 1260
    self.assertRaises(IndexError, queue.GetNextUpdate)
1249 1261

  
1250 1262
  def testJobDependency(self):
......
1331 1343

  
1332 1344
      if attempt < 5:
1333 1345
        # Simulate waiting for other job
1334
        self.assertTrue(result)
1346
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1335 1347
        self.assertTrue(job.cur_opctx)
1336 1348
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1337 1349
        self.assertRaises(IndexError, depmgr.GetNextNotification)
......
1339 1351
        self.assertFalse(job.end_timestamp)
1340 1352
        continue
1341 1353

  
1342
      if result:
1354
      if result == jqueue._JobProcessor.FINISHED:
1343 1355
        # Last opcode
1344 1356
        self.assertFalse(job.cur_opctx)
1345
        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
1346 1357
        break
1347 1358

  
1348 1359
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1349 1360

  
1350
      self.assertFalse(result)
1361
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1351 1362
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1352 1363
      self.assert_(job.start_timestamp)
1353 1364
      self.assertFalse(job.end_timestamp)
......
1369 1380
    self.assertFalse(depmgr.CountWaitingJobs())
1370 1381

  
1371 1382
    # Calling the processor on a finished job should be a no-op
1372
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1383
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1384
                     jqueue._JobProcessor.FINISHED)
1373 1385
    self.assertRaises(IndexError, queue.GetNextUpdate)
1374 1386

  
1375 1387
  def testJobDependencyCancel(self):
......
1449 1461

  
1450 1462
      if attempt > 0 and attempt < 4:
1451 1463
        # Simulate waiting for other job
1452
        self.assertTrue(result)
1464
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1453 1465
        self.assertTrue(job.cur_opctx)
1454 1466
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1455 1467
        self.assertRaises(IndexError, depmgr.GetNextNotification)
......
1457 1469
        self.assertFalse(job.end_timestamp)
1458 1470
        continue
1459 1471

  
1460
      if result:
1472
      if result == jqueue._JobProcessor.FINISHED:
1461 1473
        # Last opcode
1462 1474
        self.assertFalse(job.cur_opctx)
1463
        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
1464 1475
        break
1465 1476

  
1466 1477
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1467 1478

  
1468
      self.assertFalse(result)
1479
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1469 1480
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1470 1481
      self.assert_(job.start_timestamp)
1471 1482
      self.assertFalse(job.end_timestamp)
......
1486 1497
    self.assertFalse(depmgr.CountPendingResults())
1487 1498

  
1488 1499
    # Calling the processor on a finished job should be a no-op
1489
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1500
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1501
                     jqueue._JobProcessor.FINISHED)
1490 1502
    self.assertRaises(IndexError, queue.GetNextUpdate)
1491 1503

  
1492 1504
  def testJobDependencyWrongstatus(self):
......
1566 1578

  
1567 1579
      if attempt > 0 and attempt < 4:
1568 1580
        # Simulate waiting for other job
1569
        self.assertTrue(result)
1581
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1570 1582
        self.assertTrue(job.cur_opctx)
1571 1583
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1572 1584
        self.assertRaises(IndexError, depmgr.GetNextNotification)
......
1574 1586
        self.assertFalse(job.end_timestamp)
1575 1587
        continue
1576 1588

  
1577
      if result:
1589
      if result == jqueue._JobProcessor.FINISHED:
1578 1590
        # Last opcode
1579 1591
        self.assertFalse(job.cur_opctx)
1580
        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
1581 1592
        break
1582 1593

  
1583 1594
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1584 1595

  
1585
      self.assertFalse(result)
1596
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1586 1597
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1587 1598
      self.assert_(job.start_timestamp)
1588 1599
      self.assertFalse(job.end_timestamp)
......
1607 1618
    self.assertFalse(depmgr.CountPendingResults())
1608 1619

  
1609 1620
    # Calling the processor on a finished job should be a no-op
1610
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1621
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1622
                     jqueue._JobProcessor.FINISHED)
1611 1623
    self.assertRaises(IndexError, queue.GetNextUpdate)
1612 1624

  
1613 1625

  
......
1784 1796
      result = proc(_nextop_fn=self._NextOpcode)
1785 1797
      assert self.curop is not None
1786 1798

  
1787
      if result or self.gave_lock:
1799
      if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
1788 1800
        # Got lock and/or job is done, result must've been written
1789 1801
        self.assertFalse(job.cur_opctx)
1790 1802
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
......
1792 1804
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
1793 1805
        self.assert_(job.ops[self.curop].exec_timestamp)
1794 1806

  
1795
      if result:
1807
      if result == jqueue._JobProcessor.FINISHED:
1796 1808
        self.assertFalse(job.cur_opctx)
1797 1809
        break
1798 1810

  
1799
      self.assertFalse(result)
1811
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1800 1812

  
1801 1813
      if self.curop == 0:
1802 1814
        self.assertEqual(job.ops[self.curop].start_timestamp,
......
1839 1851
                            for op in job.ops))
1840 1852

  
1841 1853
    # Calling the processor on a finished job should be a no-op
1842
    self.assertTrue(jqueue._JobProcessor(self.queue, opexec, job)())
1854
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
1855
                     jqueue._JobProcessor.FINISHED)
1843 1856
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1844 1857

  
1845 1858

  

Also available in: Unified diff