Revision 5fd6b694 test/ganeti.jqueue_unittest.py

b/test/ganeti.jqueue_unittest.py
521 521
        self.assertRaises(IndexError, queue.GetNextUpdate)
522 522
        self.assertFalse(queue.IsAcquired())
523 523
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
524
        self.assertFalse(job.cur_opctx)
524 525

  
525 526
      def _AfterStart(op, cbs):
526 527
        self.assertEqual(queue.GetNextUpdate(), (job, True))
......
528 529

  
529 530
        self.assertFalse(queue.IsAcquired())
530 531
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
532
        self.assertFalse(job.cur_opctx)
531 533

  
532 534
        # Job is running, cancelling shouldn't be possible
533 535
        (success, _) = job.Cancel()
......
1049 1051
    self.retries = 0
1050 1052
    self.prev_tsop = None
1051 1053
    self.prev_prio = None
1054
    self.prev_status = None
1055
    self.lock_acq_prio = None
1052 1056
    self.gave_lock = None
1053 1057
    self.done_lock_before_blocking = False
1054 1058

  
1055 1059
  def _BeforeStart(self, timeout, priority):
1056 1060
    job = self.job
1057 1061

  
1058
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1062
    # If status has changed, job must've been written
1063
    if self.prev_status != self.job.ops[self.curop].status:
1064
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1059 1065
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1066

  
1060 1067
    self.assertFalse(self.queue.IsAcquired())
1061 1068
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1062 1069

  
......
1067 1074
    self.assertEqual(priority, job.ops[self.curop].priority)
1068 1075

  
1069 1076
    self.gave_lock = True
1077
    self.lock_acq_prio = priority
1070 1078

  
1071 1079
    if (self.curop == 3 and
1072 1080
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
......
1090 1098
  def _AfterStart(self, op, cbs):
1091 1099
    job = self.job
1092 1100

  
1101
    # Setting to "running" requires an update
1093 1102
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1094 1103
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1104

  
1095 1105
    self.assertFalse(self.queue.IsAcquired())
1096 1106
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1097 1107

  
......
1102 1112
  def _NextOpcode(self):
1103 1113
    self.curop = self.opcounter.next()
1104 1114
    self.prev_prio = self.job.ops[self.curop].priority
1115
    self.prev_status = self.job.ops[self.curop].status
1105 1116

  
1106 1117
  def _NewTimeoutStrategy(self):
1107 1118
    job = self.job
......
1173 1184

  
1174 1185
    self.assertFalse(self.done_lock_before_blocking)
1175 1186

  
1176
    for i in itertools.count(0):
1187
    while True:
1177 1188
      proc = jqueue._JobProcessor(self.queue, opexec, job,
1178 1189
                                  _timeout_strategy_factory=tsf)
1179 1190

  
1180 1191
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1192

  
1193
      if self.curop is not None:
1194
        self.prev_status = self.job.ops[self.curop].status
1195

  
1196
      self.lock_acq_prio = None
1197

  
1181 1198
      result = proc(_nextop_fn=self._NextOpcode)
1182
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1183
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1199
      assert self.curop is not None
1200

  
1201
      if result or self.gave_lock:
1202
        # Got lock and/or job is done, result must've been written
1203
        self.assertFalse(job.cur_opctx)
1204
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1205
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
1206
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
1207
        self.assert_(job.ops[self.curop].exec_timestamp)
1208

  
1184 1209
      if result:
1185 1210
        self.assertFalse(job.cur_opctx)
1186 1211
        break
1187 1212

  
1188 1213
      self.assertFalse(result)
1189 1214

  
1215
      if self.curop == 0:
1216
        self.assertEqual(job.ops[self.curop].start_timestamp,
1217
                         job.start_timestamp)
1218

  
1190 1219
      if self.gave_lock:
1191
        self.assertFalse(job.cur_opctx)
1220
        # Opcode finished, but job not yet done
1221
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1192 1222
      else:
1223
        # Did not get locks
1193 1224
        self.assert_(job.cur_opctx)
1194 1225
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1195 1226
                         self.timeout_strategy.NextAttempt)
1227
        self.assertFalse(job.ops[self.curop].exec_timestamp)
1228
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1229

  
1230
        # If priority has changed since acquiring locks, the job must've been
1231
        # updated
1232
        if self.lock_acq_prio != job.ops[self.curop].priority:
1233
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1234

  
1235
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1196 1236

  
1197
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1198 1237
      self.assert_(job.start_timestamp)
1199 1238
      self.assertFalse(job.end_timestamp)
1200 1239

  

Also available in: Unified diff