Revision b95479a5

b/lib/jqueue.py
1 1
#
2 2
#
3 3

  
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 5
#
6 6
# This program is free software; you can redistribute it and/or modify
7 7
# it under the terms of the GNU General Public License as published by
......
34 34
import re
35 35
import time
36 36
import weakref
37
import threading
37 38

  
38 39
try:
39 40
  # pylint: disable-msg=E0611
......
55 56
from ganeti import runtime
56 57
from ganeti import netutils
57 58
from ganeti import compat
59
from ganeti import ht
58 60

  
59 61

  
60 62
JOBQUEUE_THREADS = 25
......
177 179
  # pylint: disable-msg=W0212
178 180
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
179 181
               "received_timestamp", "start_timestamp", "end_timestamp",
180
               "__weakref__"]
182
               "__weakref__", "processor_lock"]
181 183

  
182 184
  def __init__(self, queue, job_id, ops):
183 185
    """Constructor for the _QueuedJob.
......
211 213
    """
212 214
    obj.ops_iter = None
213 215
    obj.cur_opctx = None
216
    obj.processor_lock = threading.Lock()
214 217

  
215 218
  def __repr__(self):
216 219
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
......
804 807
    self.log_prefix = log_prefix
805 808
    self.summary = op.input.Summary()
806 809

  
810
    # Create local copy to modify
811
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
812
      self.jobdeps = op.input.depends[:]
813
    else:
814
      self.jobdeps = None
815

  
807 816
    self._timeout_strategy_factory = timeout_strategy_factory
808 817
    self._ResetTimeoutStrategy()
809 818

  
......
927 936

  
928 937
    return update
929 938

  
939
  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Works through the job dependencies of an opcode.

    Dependencies are read from the front of C{opctx.jobdeps} and passed to
    the dependency manager of the queue. A satisfied dependency is removed
    from the list; the first dependency which is not satisfied ends the
    processing.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    opcode = opctx.op

    while opctx.jobdeps:
      (wanted_job_id, wanted_status) = opctx.jobdeps[0]

      (outcome, message) = queue.depmgr.CheckAndRegister(job, wanted_job_id,
                                                         wanted_status)
      assert ht.TNonEmptyString(message), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, message)

      if outcome == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        return True

      if outcome == _JobDependencyManager.CONTINUE:
        # Dependency is satisfied, drop it and look at the next one
        opctx.jobdeps.pop(0)
        continue

      if outcome == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert opcode.status == constants.OP_STATUS_CANCELING
        return False

      if outcome in (_JobDependencyManager.WRONGSTATUS,
                     _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        opcode.status = constants.OP_STATUS_ERROR
        opcode.result = _EncodeOpError(errors.OpExecError(message))
        return False

      raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                   outcome)

    # No dependencies left (or none to begin with)
    return False
994

  
930 995
  def _ExecOpCodeUnlocked(self, opctx):
931 996
    """Processes one opcode and returns the result.
932 997

  
......
1013 1078
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1014 1079
              op.priority >= constants.OP_PRIO_HIGHEST)
1015 1080

  
1081
      waitjob = None
1082

  
1016 1083
      if op.status != constants.OP_STATUS_CANCELING:
1017 1084
        assert op.status in (constants.OP_STATUS_QUEUED,
1018 1085
                             constants.OP_STATUS_WAITLOCK)
......
1025 1092
        assert op.status == constants.OP_STATUS_WAITLOCK
1026 1093
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1027 1094
        assert job.start_timestamp and op.start_timestamp
1095
        assert waitjob is None
1096

  
1097
        # Check if waiting for a job is necessary
1098
        waitjob = self._CheckDependencies(queue, job, opctx)
1028 1099

  
1029
        logging.info("%s: opcode %s waiting for locks",
1030
                     opctx.log_prefix, opctx.summary)
1100
        assert op.status in (constants.OP_STATUS_WAITLOCK,
1101
                             constants.OP_STATUS_CANCELING,
1102
                             constants.OP_STATUS_ERROR)
1031 1103

  
1032
        queue.release()
1033
        try:
1034
          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1035
        finally:
1036
          queue.acquire(shared=1)
1104
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1105
                                         constants.OP_STATUS_ERROR)):
1106
          logging.info("%s: opcode %s waiting for locks",
1107
                       opctx.log_prefix, opctx.summary)
1037 1108

  
1038
        op.status = op_status
1039
        op.result = op_result
1109
          assert not opctx.jobdeps, "Not all dependencies were removed"
1110

  
1111
          queue.release()
1112
          try:
1113
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1114
          finally:
1115
            queue.acquire(shared=1)
1116

  
1117
          op.status = op_status
1118
          op.result = op_result
1119

  
1120
          assert not waitjob
1040 1121

  
1041 1122
        if op.status == constants.OP_STATUS_WAITLOCK:
1042 1123
          # Couldn't get locks in time
......
1051 1132
          else:
1052 1133
            assert op.status in constants.OPS_FINALIZED
1053 1134

  
1054
      if op.status == constants.OP_STATUS_WAITLOCK:
1135
      if op.status == constants.OP_STATUS_WAITLOCK or waitjob:
1055 1136
        finalize = False
1056 1137

  
1057
        if opctx.CheckPriorityIncrease():
1138
        if not waitjob and opctx.CheckPriorityIncrease():
1058 1139
          # Priority was changed, need to update on-disk file
1059 1140
          queue.UpdateJobUnlocked(job)
1060 1141

  
......
1113 1194
        # allowed. Once the file has been written, it can be archived anytime.
1114 1195
        queue.UpdateJobUnlocked(job)
1115 1196

  
1197
        assert not waitjob
1198

  
1116 1199
        if finalize:
1117 1200
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1201
          # TODO: Check locking
1202
          queue.depmgr.NotifyWaiters(job.id)
1118 1203
          return True
1119 1204

  
1120
      return False
1205
      assert not waitjob or queue.depmgr.JobWaiting(job)
1206

  
1207
      return bool(waitjob)
1121 1208
    finally:
1122 1209
      queue.release()
1123 1210

  
......
1129 1216
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    Runs L{_RunTaskInner} for the job while holding the job's processor lock.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    # A job which registered for a dependency job can be notified by that
    # other job before the first worker is done with it, in which case the job
    # ends up in the tasklist more than once. The per-job lock ensures only
    # one worker is active on a single job.
    per_job_lock = job.processor_lock
    per_job_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      per_job_lock.release()
1231

  
1232
  def _RunTaskInner(self, job):
1233
    """Executes a job.
1234

  
1235
    Must be called with per-job lock acquired.
1236

  
1237
    """
1139 1238
    queue = job.queue
1140 1239
    assert queue == self.pool.queue
1141 1240

  
......
1194 1293
    self.queue = queue
1195 1294

  
1196 1295

  
1296
class _JobDependencyManager:
1297
  """Keeps track of job dependencies.
1298

  
1299
  """
1300
  (WAIT,
1301
   ERROR,
1302
   CANCEL,
1303
   CONTINUE,
1304
   WRONGSTATUS) = range(1, 6)
1305

  
1306
  # TODO: Export waiter information to lock monitor
1307

  
1308
  def __init__(self, getstatus_fn, enqueue_fn):
1309
    """Initializes this class.
1310

  
1311
    """
1312
    self._getstatus_fn = getstatus_fn
1313
    self._enqueue_fn = enqueue_fn
1314

  
1315
    self._waiters = {}
1316
    self._lock = locking.SharedLock("JobDepMgr")
1317

  
1318
  @locking.ssynchronized(_LOCK, shared=1)
1319
  def JobWaiting(self, job):
1320
    """Checks if a job is waiting.
1321

  
1322
    """
1323
    return compat.any(job in jobs
1324
                      for jobs in self._waiters.values())
1325

  
1326
  @locking.ssynchronized(_LOCK)
1327
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1328
    """Checks if a dependency job has the requested status.
1329

  
1330
    If the other job is not yet in a finalized status, the calling job will be
1331
    notified (re-added to the workerpool) at a later point.
1332

  
1333
    @type job: L{_QueuedJob}
1334
    @param job: Job object
1335
    @type dep_job_id: string
1336
    @param dep_job_id: ID of dependency job
1337
    @type dep_status: list
1338
    @param dep_status: Required status
1339

  
1340
    """
1341
    assert ht.TString(job.id)
1342
    assert ht.TString(dep_job_id)
1343
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1344

  
1345
    if job.id == dep_job_id:
1346
      return (self.ERROR, "Job can't depend on itself")
1347

  
1348
    # Get status of dependency job
1349
    try:
1350
      status = self._getstatus_fn(dep_job_id)
1351
    except errors.JobLost, err:
1352
      return (self.ERROR, "Dependency error: %s" % err)
1353

  
1354
    assert status in constants.JOB_STATUS_ALL
1355

  
1356
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1357

  
1358
    if status not in constants.JOBS_FINALIZED:
1359
      # Register for notification and wait for job to finish
1360
      job_id_waiters.add(job)
1361
      return (self.WAIT,
1362
              "Need to wait for job %s, wanted status '%s'" %
1363
              (dep_job_id, dep_status))
1364

  
1365
    # Remove from waiters list
1366
    if job in job_id_waiters:
1367
      job_id_waiters.remove(job)
1368

  
1369
    if (status == constants.JOB_STATUS_CANCELED and
1370
        constants.JOB_STATUS_CANCELED not in dep_status):
1371
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1372

  
1373
    elif not dep_status or status in dep_status:
1374
      return (self.CONTINUE,
1375
              "Dependency job %s finished with status '%s'" %
1376
              (dep_job_id, status))
1377

  
1378
    else:
1379
      return (self.WRONGSTATUS,
1380
              "Dependency job %s finished with status '%s',"
1381
              " not one of '%s' as required" %
1382
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1383

  
1384
  @locking.ssynchronized(_LOCK)
1385
  def NotifyWaiters(self, job_id):
1386
    """Notifies all jobs waiting for a certain job ID.
1387

  
1388
    @type job_id: string
1389
    @param job_id: Job ID
1390

  
1391
    """
1392
    assert ht.TString(job_id)
1393

  
1394
    jobs = self._waiters.pop(job_id, None)
1395
    if jobs:
1396
      # Re-add jobs to workerpool
1397
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1398
                    len(jobs), job_id)
1399
      self._enqueue_fn(jobs)
1400

  
1401
    # Remove all jobs without actual waiters
1402
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1403
                   if not waiters]:
1404
      del self._waiters[job_id]
1405

  
1406

  
1197 1407
def _RequireOpenQueue(fn):
1198 1408
  """Decorator for "public" functions.
1199 1409

  
......
1277 1487
    self._UpdateQueueSizeUnlocked()
1278 1488
    self._drained = jstore.CheckDrainFlag()
1279 1489

  
1490
    # Job dependencies
1491
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1492
                                        self._EnqueueJobs)
1493

  
1280 1494
    # Setup worker pool
1281 1495
    self._wpool = _JobQueueWorkerPool(self)
1282 1496
    try:
......
1808 2022
    self._wpool.AddManyTasks([(job, ) for job in jobs],
1809 2023
                             priority=[job.CalcPriority() for job in jobs])
1810 2024

  
2025
  def _GetJobStatusForDependencies(self, job_id):
    """Looks up the status of a job on behalf of the dependency manager.

    A job ID which is not a string is run through L{_FormatJobID} before the
    job is loaded.

    @type job_id: string
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    if isinstance(job_id, basestring):
      wanted_id = job_id
    else:
      wanted_id = self._FormatJobID(job_id)

    # Not using in-memory cache as doing so would require an exclusive lock,
    # hence the job is loaded from disk
    loaded_job = self.SafeLoadJobFromDisk(wanted_id, True)

    if not loaded_job:
      raise errors.JobLost("Job %s not found" % wanted_id)

    return loaded_job.CalcStatus()
2045

  
1811 2046
  @_RequireOpenQueue
1812 2047
  def UpdateJobUnlocked(self, job, replicate=True):
1813 2048
    """Update a job's on disk storage.
b/lib/opcodes.py
151 151
  "INSTANCE_": "I_",
152 152
  }
153 153

  
154
#: Attribute name for dependencies
155
DEPEND_ATTR = "depends"
156

  
154 157

  
155 158
def _NameToId(name):
156 159
  """Convert an opcode class name to an OP_ID.
......
422 425
    ("debug_level", None, ht.TOr(ht.TNone, ht.TPositiveInt), "Debug level"),
423 426
    ("priority", constants.OP_PRIO_DEFAULT,
424 427
     ht.TElemOf(constants.OP_PRIO_SUBMIT_VALID), "Opcode priority"),
425
    ("depends", None, ht.TOr(ht.TNone, ht.TListOf(_T_JOB_DEP)),
428
    (DEPEND_ATTR, None, ht.TOr(ht.TNone, ht.TListOf(_T_JOB_DEP)),
426 429
     "Job dependencies"),
427 430
    ]
428 431

  
b/test/ganeti.jqueue_unittest.py
424 424
        self.assertEqual(job.CalcStatus(), status)
425 425

  
426 426

  
427
class _FakeQueueForProc:
427
class _FakeDependencyManager:
  """Scripted replacement for L{jqueue._JobDependencyManager}.

  Results for L{CheckAndRegister} are queued with L{AddCheckResult} and
  consumed in order; calls to L{NotifyWaiters} are recorded and can be read
  back with L{GetNextNotification}.

  """
  def __init__(self):
    # Expected calls and their canned results, in call order
    self._checks = []
    # Job IDs passed to NotifyWaiters, oldest first
    self._notifications = []
    # Jobs whose most recent check result told them to wait
    self._waiting = set()

  def AddCheckResult(self, job, dep_job_id, dep_status, result):
    """Queues the expected arguments and the result of the next check.

    """
    self._checks.append((job, dep_job_id, dep_status, result))

  def CountPendingResults(self):
    """Returns the number of queued results not yet consumed.

    """
    return len(self._checks)

  def CountWaitingJobs(self):
    """Returns the number of jobs currently marked as waiting.

    """
    return len(self._waiting)

  def GetNextNotification(self):
    """Returns the oldest recorded notification.

    @raise IndexError: If no notification is recorded

    """
    return self._notifications.pop(0)

  def JobWaiting(self, job):
    """Checks whether a job is marked as waiting.

    """
    return job in self._waiting

  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Returns the next queued result after verifying the arguments.

    @raise IndexError: If no result was queued

    """
    (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)

    assert exp_job == job
    assert exp_dep_job_id == dep_job_id
    assert exp_dep_status == dep_status

    (result_status, _) = result

    if result_status == jqueue._JobDependencyManager.WAIT:
      self._waiting.add(job)
    elif result_status in (jqueue._JobDependencyManager.CONTINUE,
                           jqueue._JobDependencyManager.CANCEL,
                           jqueue._JobDependencyManager.WRONGSTATUS):
      # The dependency job is finalized, the job no longer waits for it.
      # "discard" is used as the job might never have been told to wait.
      self._waiting.discard(job)

    return result

  def NotifyWaiters(self, job_id):
    """Records a notification for the given job ID.

    """
    self._notifications.append(job_id)

  
467

  
468
class _DisabledFakeDependencyManager:
469
  def JobWaiting(self, _):
470
    return False
471

  
472
  def CheckAndRegister(self, *args):
473
    assert False, "Should not be called"
474

  
475
  def NotifyWaiters(self, _):
476
    pass
477

  
478

  
479
class _FakeQueueForProc:
480
  def __init__(self, depmgr=None):
429 481
    self._acquired = False
430 482
    self._updates = []
431 483
    self._submitted = []
432 484

  
433 485
    self._submit_count = itertools.count(1000)
434 486

  
487
    if depmgr:
488
      self.depmgr = depmgr
489
    else:
490
      self.depmgr = _DisabledFakeDependencyManager()
491

  
435 492
  def IsAcquired(self):
436 493
    return self._acquired
437 494

  
......
960 1017

  
961 1018
    # ... also after being restored
962 1019
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
1020
    # Calling the processor on a finished job should be a no-op
963 1021
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
964 1022
    self.assertRaises(IndexError, queue.GetNextUpdate)
965 1023

  
......
1179 1237
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1180 1238
    self.assertRaises(IndexError, queue.GetNextUpdate)
1181 1239

  
1240
  def testJobDependency(self):
    """Runs a job whose first opcode depends on two other jobs.

    The fake dependency manager lets the job wait on the first dependency,
    then on the second one, before both are reported as satisfied and the
    job runs to completion.

    """
    depmgr = _FakeDependencyManager()
    queue = _FakeQueueForProc(depmgr=depmgr)

    self.assertEqual(queue.depmgr, depmgr)

    prev_job_id = 22113
    prev_job_id2 = 28102
    job_id = 29929
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False,
                          depends=[
                            [prev_job_id2, None],
                            [prev_job_id, None],
                            ]),
      opcodes.OpTestDummy(result="Res1", fail=False),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    # "attempt" is bound by the loop below before the callbacks are invoked
    def _BeforeStart(timeout, priority):
      if attempt == 0 or attempt > 5:
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    counter = itertools.count()
    while True:
      attempt = counter.next()

      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertRaises(IndexError, depmgr.GetNextNotification)

      # Script the answers of the dependency manager for this attempt
      if attempt < 2:
        depmgr.AddCheckResult(job, prev_job_id2, None,
                              (jqueue._JobDependencyManager.WAIT, "wait2"))
      elif attempt == 2:
        depmgr.AddCheckResult(job, prev_job_id2, None,
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
        # The processor will ask for the next dependency immediately
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt < 5:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt == 5:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
      if attempt == 2:
        self.assertEqual(depmgr.CountPendingResults(), 2)
      elif attempt > 5:
        self.assertEqual(depmgr.CountPendingResults(), 0)
      else:
        self.assertEqual(depmgr.CountPendingResults(), 1)

      result = jqueue._JobProcessor(queue, opexec, job)()
      if attempt == 0 or attempt >= 5:
        # Job should only be updated if there was an actual change
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(depmgr.CountPendingResults())

      if attempt < 5:
        # Simulate waiting for other job
        self.assertTrue(result)
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
        self.assertRaises(IndexError, depmgr.GetNextNotification)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        continue

      if result:
        # Last opcode
        self.assertFalse(job.cur_opctx)
        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
        break

      self.assertRaises(IndexError, depmgr.GetNextNotification)

      self.assertFalse(result)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    self._GenericCheckJob(job)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)
    self.assertFalse(depmgr.CountPendingResults())
    self.assertFalse(depmgr.CountWaitingJobs())

    # Calling the processor on a finished job should be a no-op
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
    self.assertRaises(IndexError, queue.GetNextUpdate)
1364

  
1365
  def testJobDependencyCancel(self):
    """Runs a job whose second opcode depends on a job which gets cancelled.

    The fake dependency manager lets the job wait a few times and then
    reports the dependency as cancelled; the remaining opcodes must end up
    cancelled as well.

    """
    depmgr = _FakeDependencyManager()
    queue = _FakeQueueForProc(depmgr=depmgr)

    self.assertEqual(queue.depmgr, depmgr)

    prev_job_id = 13623
    job_id = 30876
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False),
      opcodes.OpTestDummy(result="Res1", fail=False,
                          depends=[
                            [prev_job_id, None],
                            ]),
      opcodes.OpTestDummy(result="Res2", fail=False),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    # "attempt" is bound by the loop below before the callbacks are invoked
    def _BeforeStart(timeout, priority):
      if attempt == 0 or attempt > 5:
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    counter = itertools.count()
    while True:
      attempt = counter.next()

      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertRaises(IndexError, depmgr.GetNextNotification)

      # Script the answers of the dependency manager for this attempt
      if attempt == 0:
        # This will handle the first opcode
        pass
      elif attempt < 4:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt == 4:
        # Other job was cancelled
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.CANCEL, "cancel"))

      if attempt == 0:
        self.assertEqual(depmgr.CountPendingResults(), 0)
      else:
        self.assertEqual(depmgr.CountPendingResults(), 1)

      result = jqueue._JobProcessor(queue, opexec, job)()
      if attempt <= 1 or attempt >= 4:
        # Job should only be updated if there was an actual change
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(depmgr.CountPendingResults())

      if attempt > 0 and attempt < 4:
        # Simulate waiting for other job
        self.assertTrue(result)
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
        self.assertRaises(IndexError, depmgr.GetNextNotification)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        continue

      if result:
        # Last opcode
        self.assertFalse(job.cur_opctx)
        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
        break

      self.assertRaises(IndexError, depmgr.GetNextNotification)

      self.assertFalse(result)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_CANCELED,
                       constants.OP_STATUS_CANCELED],
                      ["Res0", "Job canceled by request",
                       "Job canceled by request"]])

    self._GenericCheckJob(job)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)
    self.assertFalse(depmgr.CountPendingResults())

    # Calling the processor on a finished job should be a no-op
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
    self.assertRaises(IndexError, queue.GetNextUpdate)
1481

  
1482
  def testJobDependencyWrongstatus(self):
    """Runs a job whose second opcode depends on a job with a wrong status.

    The fake dependency manager lets the job wait a few times and then
    reports the dependency as finished with an unwanted status; the
    remaining opcodes must fail.

    """
    depmgr = _FakeDependencyManager()
    queue = _FakeQueueForProc(depmgr=depmgr)

    self.assertEqual(queue.depmgr, depmgr)

    prev_job_id = 9741
    job_id = 11763
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False),
      opcodes.OpTestDummy(result="Res1", fail=False,
                          depends=[
                            [prev_job_id, None],
                            ]),
      opcodes.OpTestDummy(result="Res2", fail=False),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    # "attempt" is bound by the loop below before the callbacks are invoked
    def _BeforeStart(timeout, priority):
      if attempt == 0 or attempt > 5:
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    counter = itertools.count()
    while True:
      attempt = counter.next()

      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertRaises(IndexError, depmgr.GetNextNotification)

      # Script the answers of the dependency manager for this attempt
      if attempt == 0:
        # This will handle the first opcode
        pass
      elif attempt < 4:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt == 4:
        # Other job failed
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WRONGSTATUS, "w"))

      if attempt == 0:
        self.assertEqual(depmgr.CountPendingResults(), 0)
      else:
        self.assertEqual(depmgr.CountPendingResults(), 1)

      result = jqueue._JobProcessor(queue, opexec, job)()
      if attempt <= 1 or attempt >= 4:
        # Job should only be updated if there was an actual change
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(depmgr.CountPendingResults())

      if attempt > 0 and attempt < 4:
        # Simulate waiting for other job
        self.assertTrue(result)
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
        self.assertRaises(IndexError, depmgr.GetNextNotification)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        continue

      if result:
        # Last opcode
        self.assertFalse(job.cur_opctx)
        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
        break

      self.assertRaises(IndexError, depmgr.GetNextNotification)

      self.assertFalse(result)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_ERROR,
                       constants.OP_STATUS_ERROR]])

    # The first opcode succeeded, the other two carry an encoded error
    (opresult, ) = job.GetInfo(["opresult"])
    self.assertEqual(len(opresult), len(ops))
    self.assertEqual(opresult[0], "Res0")
    self.assertTrue(errors.GetEncodedError(opresult[1]))
    self.assertTrue(errors.GetEncodedError(opresult[2]))

    self._GenericCheckJob(job)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)
    self.assertFalse(depmgr.CountPendingResults())

    # Calling the processor on a finished job should be a no-op
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
    self.assertRaises(IndexError, queue.GetNextUpdate)
1602

  
1182 1603

  
1183 1604
class _FakeTimeoutStrategy:
1184 1605
  def __init__(self, timeouts):
......
1412 1833
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1413 1834

  
1414 1835

  
1836
class TestJobDependencyManager(unittest.TestCase):
  """Tests for L{jqueue._JobDependencyManager}.

  The manager under test is built with two callbacks provided by this
  class: L{_GetStatus}, which replays job statuses queued by the test in
  C{self._status}, and L{_Enqueue}, which records everything the manager
  asks to be enqueued in C{self._queue}.

  """
  class _FakeJob:
    """Stand-in for a queued job; only carries a string C{id}.

    """
    def __init__(self, job_id):
      self.id = str(job_id)

  def setUp(self):
    # Scripted (job ID, status) answers consumed in order by _GetStatus
    self._status = []
    # Everything passed to _Enqueue by the manager
    self._queue = []
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)

  def _GetStatus(self, job_id):
    """Status callback; returns the next scripted status.

    Fails the test if the manager asks for a job other than the one the
    next scripted entry was prepared for.

    """
    (exp_job_id, result) = self._status.pop(0)
    self.assertEqual(exp_job_id, job_id)
    return result

  def _Enqueue(self, jobs):
    """Enqueue callback; remembers the jobs handed over by the manager.

    """
    self._queue.append(jobs)

  def _CheckDep(self, job, dep_job_id, dep_status, cur_status, expected):
    """Scripts one status answer and runs a single dependency check.

    @param job: Job (fake) which depends on another job
    @param dep_job_id: ID of the job being depended upon
    @param dep_status: List of acceptable statuses passed to the manager
    @param cur_status: Status reported for C{dep_job_id} during this check
    @param expected: Result code the manager is expected to return

    """
    self._status.append((dep_job_id, cur_status))
    (outcome, _) = self.jdm.CheckAndRegister(job, dep_job_id, dep_status)
    self.assertEqual(outcome, expected)
    # The scripted status must have been consumed, nothing enqueued
    self.assertFalse(self._status)
    self.assertFalse(self._queue)

  def _AssertRegistered(self, job, dep_job_id):
    """Verifies that C{job} is the only waiter, registered on C{dep_job_id}.

    """
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      dep_job_id: set([job]),
      })

  def testNotFinalizedThenCancel(self):
    waiter = self._FakeJob(17697)
    dep_id = str(28625)

    self._CheckDep(waiter, dep_id, [], constants.JOB_STATUS_RUNNING,
                   self.jdm.WAIT)
    self._AssertRegistered(waiter, dep_id)

    self._CheckDep(waiter, dep_id, [], constants.JOB_STATUS_CANCELED,
                   self.jdm.CANCEL)
    self.assertFalse(self.jdm.JobWaiting(waiter))

  def testRequireCancel(self):
    waiter = self._FakeJob(5278)
    dep_id = str(9610)
    accepted = [constants.JOB_STATUS_CANCELED]

    self._CheckDep(waiter, dep_id, accepted, constants.JOB_STATUS_WAITLOCK,
                   self.jdm.WAIT)
    self._AssertRegistered(waiter, dep_id)

    self._CheckDep(waiter, dep_id, accepted, constants.JOB_STATUS_CANCELED,
                   self.jdm.CONTINUE)
    self.assertFalse(self.jdm.JobWaiting(waiter))

  def testRequireError(self):
    waiter = self._FakeJob(21459)
    dep_id = str(25519)
    accepted = [constants.JOB_STATUS_ERROR]

    self._CheckDep(waiter, dep_id, accepted, constants.JOB_STATUS_WAITLOCK,
                   self.jdm.WAIT)
    self._AssertRegistered(waiter, dep_id)

    self._CheckDep(waiter, dep_id, accepted, constants.JOB_STATUS_ERROR,
                   self.jdm.CONTINUE)
    self.assertFalse(self.jdm.JobWaiting(waiter))

  def testRequireMultiple(self):
    accepted = list(constants.JOBS_FINALIZED)

    # Every finalized status is acceptable, so each one must let the
    # waiting job continue
    for final_status in accepted:
      waiter = self._FakeJob(21343)
      dep_id = str(14609)

      self._CheckDep(waiter, dep_id, accepted, constants.JOB_STATUS_WAITLOCK,
                     self.jdm.WAIT)
      self._AssertRegistered(waiter, dep_id)

      self._CheckDep(waiter, dep_id, accepted, final_status,
                     self.jdm.CONTINUE)
      self.assertFalse(self.jdm.JobWaiting(waiter))

  def testNotify(self):
    waiter = self._FakeJob(8227)
    dep_id = str(4113)

    self._CheckDep(waiter, dep_id, [], constants.JOB_STATUS_RUNNING,
                   self.jdm.WAIT)
    self._AssertRegistered(waiter, dep_id)

    # Notification must hand the waiter to the enqueue callback and drop
    # the registration
    self.jdm.NotifyWaiters(dep_id)
    self.assertFalse(self._status)
    self.assertFalse(self.jdm._waiters)
    self.assertFalse(self.jdm.JobWaiting(waiter))
    self.assertEqual(self._queue, [set([waiter])])

  def testWrongStatus(self):
    waiter = self._FakeJob(10102)
    dep_id = str(1271)

    self._CheckDep(waiter, dep_id, [constants.JOB_STATUS_SUCCESS],
                   constants.JOB_STATUS_QUEUED, self.jdm.WAIT)
    self._AssertRegistered(waiter, dep_id)

    self._CheckDep(waiter, dep_id, [constants.JOB_STATUS_SUCCESS],
                   constants.JOB_STATUS_ERROR, self.jdm.WRONGSTATUS)
    self.assertFalse(self.jdm.JobWaiting(waiter))

  def testCorrectStatus(self):
    waiter = self._FakeJob(24273)
    dep_id = str(23885)

    self._CheckDep(waiter, dep_id, [constants.JOB_STATUS_SUCCESS],
                   constants.JOB_STATUS_QUEUED, self.jdm.WAIT)
    self._AssertRegistered(waiter, dep_id)

    self._CheckDep(waiter, dep_id, [constants.JOB_STATUS_SUCCESS],
                   constants.JOB_STATUS_SUCCESS, self.jdm.CONTINUE)
    self.assertFalse(self.jdm.JobWaiting(waiter))

  def testFinalizedRightAway(self):
    waiter = self._FakeJob(224)
    dep_id = str(3081)

    self._CheckDep(waiter, dep_id, [constants.JOB_STATUS_SUCCESS],
                   constants.JOB_STATUS_SUCCESS, self.jdm.CONTINUE)
    self.assertFalse(self.jdm.JobWaiting(waiter))
    # An empty waiter set is left behind for the dependency
    self.assertEqual(self.jdm._waiters, {
      dep_id: set(),
      })

    # Force cleanup
    self.jdm.NotifyWaiters("0")
    self.assertFalse(self.jdm._waiters)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)

  def testSelfDependency(self):
    waiter = self._FakeJob(18937)

    # A job depending on itself must be reported as an error
    self._status.append((waiter.id, constants.JOB_STATUS_SUCCESS))
    (outcome, _) = self.jdm.CheckAndRegister(waiter, waiter.id, [])
    self.assertEqual(outcome, self.jdm.ERROR)

  def testJobDisappears(self):
    waiter = self._FakeJob(30540)
    dep_id = str(23769)

    def _FakeStatus(_):
      raise errors.JobLost("#msg#")

    # Separate manager whose status callback always reports a lost job
    lost_jdm = jqueue._JobDependencyManager(_FakeStatus, None)
    (outcome, _) = lost_jdm.CheckAndRegister(waiter, dep_id, [])
    self.assertEqual(outcome, self.jdm.ERROR)
    self.assertFalse(lost_jdm.JobWaiting(waiter))
2048

  
2049

  
1415 2050
# Hand control to the test program from testutils when run as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()

Also available in: Unified diff