query: Fix bug when names are specified
[ganeti-local] / test / ganeti.jqueue_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.jqueue"""
23
24 import os
25 import sys
26 import unittest
27 import tempfile
28 import shutil
29 import errno
30 import itertools
31
32 from ganeti import constants
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import jqueue
36 from ganeti import opcodes
37 from ganeti import compat
38 from ganeti import mcpu
39
40 import testutils
41
42
class _FakeJob:
  """Minimal in-memory job double used by the change-checker tests.

  It stores a job ID, a single status value and a list of log entries,
  and exposes the accessors these tests call on a job object.

  """
  def __init__(self, job_id, status):
    """Stores the job ID and initial status; the log starts empty.

    """
    self.id = job_id
    self._status = status
    self._log = []

  def SetStatus(self, status):
    """Overwrites the status later reported by L{CalcStatus}.

    """
    self._status = status

  def AddLogEntry(self, msg):
    """Appends C{(serial, msg)} to the log.

    The serial is the number of entries already present, i.e. it starts
    at zero and grows by one per entry.

    """
    serial = len(self._log)
    self._log.append((serial, msg))

  def CalcStatus(self):
    """Returns the currently stored status.

    """
    return self._status

  def GetInfo(self, fields):
    """Returns one value per requested field name.

    Only C{"status"} is known; any other name raises an exception.

    """
    values = []

    for field in fields:
      if field != "status":
        raise Exception("Unknown field")
      values.append(self._status)

    return values

  def GetLogEntries(self, newer_than):
    """Returns log entries, optionally skipping the first C{newer_than}.

    With C{None} the internal list itself is returned (not a copy);
    otherwise a slice starting at index C{newer_than}.

    """
    assert newer_than is None or newer_than >= 0

    if newer_than is not None:
      return self._log[newer_than:]

    return self._log
76
77
class TestJobChangesChecker(unittest.TestCase):
  """Tests for C{jqueue._JobChangesChecker}, driven by L{_FakeJob}.

  """
  def testStatus(self):
    # Without previous job info every call reports the current status
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.SetStatus(constants.JOB_STATUS_SUCCESS)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))

    # job.id is used by checker
    self.assertEqual(job.id, 9094)

  def testStatusWithPrev(self):
    # Previous info equal to the current status: nothing to report (None)
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"],
                                        [constants.JOB_STATUS_QUEUED], None)
    self.assertTrue(checker(job) is None)

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

  def testFinalStatus(self):
    for status in constants.JOBS_FINALIZED:
      job = _FakeJob(2178711, status)
      checker = jqueue._JobChangesChecker(["status"], [status], None)
      # There won't be any changes in this status, hence it should signal
      # a change immediately
      self.assertEqual(checker(job), ([status], []))

  def testLog(self):
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.AddLogEntry("Hello World")
    (job_info, log_entries) = checker(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
    self.assertEqual(log_entries, [[0, "Hello World"]])

    # A checker that already knows the status and the first log entry must
    # not report anything until something new happens
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
    self.assertTrue(checker2(job) is None)

    job.AddLogEntry("Foo Bar")
    job.SetStatus(constants.JOB_STATUS_ERROR)

    (job_info, log_entries) = checker2(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[1, "Foo Bar"]])

    # A fresh checker without previous information sees the whole log
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
    (job_info, log_entries) = checker3(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
134
135
136 class TestJobChangesWaiter(unittest.TestCase):
137   def setUp(self):
138     self.tmpdir = tempfile.mkdtemp()
139     self.filename = utils.PathJoin(self.tmpdir, "job-1")
140     utils.WriteFile(self.filename, data="")
141
142   def tearDown(self):
143     shutil.rmtree(self.tmpdir)
144
145   def _EnsureNotifierClosed(self, notifier):
146     try:
147       os.fstat(notifier._fd)
148     except EnvironmentError, err:
149       self.assertEqual(err.errno, errno.EBADF)
150     else:
151       self.fail("File descriptor wasn't closed")
152
153   def testClose(self):
154     for wait in [False, True]:
155       waiter = jqueue._JobFileChangesWaiter(self.filename)
156       try:
157         if wait:
158           waiter.Wait(0.001)
159       finally:
160         waiter.Close()
161
162       # Ensure file descriptor was closed
163       self._EnsureNotifierClosed(waiter._notifier)
164
165   def testChangingFile(self):
166     waiter = jqueue._JobFileChangesWaiter(self.filename)
167     try:
168       self.assertFalse(waiter.Wait(0.1))
169       utils.WriteFile(self.filename, data="changed")
170       self.assert_(waiter.Wait(60))
171     finally:
172       waiter.Close()
173
174     self._EnsureNotifierClosed(waiter._notifier)
175
176   def testChangingFile2(self):
177     waiter = jqueue._JobChangesWaiter(self.filename)
178     try:
179       self.assertFalse(waiter._filewaiter)
180       self.assert_(waiter.Wait(0.1))
181       self.assert_(waiter._filewaiter)
182
183       # File waiter is now used, but there have been no changes
184       self.assertFalse(waiter.Wait(0.1))
185       utils.WriteFile(self.filename, data="changed")
186       self.assert_(waiter.Wait(60))
187     finally:
188       waiter.Close()
189
190     self._EnsureNotifierClosed(waiter._filewaiter._notifier)
191
192
class TestWaitForJobChangesHelper(unittest.TestCase):
  """Tests for C{jqueue._WaitForJobChangesHelper}.

  The helper is given a job file path inside a temporary directory and a
  loader callback returning either a L{_FakeJob} or C{None}.

  """
  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _LoadWaitingJob(self):
    # Loader callback: a job that stays in "waitlock" status
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)

  def _LoadLostJob(self):
    # Loader callback: the job could not be loaded
    return None

  def testNoChanges(self):
    wfjc = jqueue._WaitForJobChangesHelper()

    # No change
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
                     constants.JOB_NOTCHANGED)

    # No previous information
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
                          ["status"], None, None, 1.0),
                     ([constants.JOB_STATUS_WAITLOCK], []))

  def testLostJob(self):
    # A loader returning None makes the helper return None as well
    wfjc = jqueue._WaitForJobChangesHelper()
    self.assertTrue(wfjc(self.filename, self._LoadLostJob,
                         ["status"], None, None, 1.0) is None)
225
226
class TestEncodeOpError(unittest.TestCase):
  """Tests for C{jqueue._EncodeOpError}.

  """
  def test(self):
    # (value passed to _EncodeOpError, exception class that
    # errors.MaybeRaise must raise for the encoded result)
    cases = [
      (errors.LockError("Test 1"), errors.LockError),
      (errors.GenericError("Test 2"), errors.GenericError),
      (NotImplementedError("Foo"), errors.OpExecError),
      ("Hello World", errors.OpExecError),
      ]

    for (value, expected_cls) in cases:
      encerr = jqueue._EncodeOpError(value)
      self.assertTrue(isinstance(encerr, tuple))
      self.assertRaises(expected_cls, errors.MaybeRaise, encerr)
244
245
class TestQueuedOpCode(unittest.TestCase):
  """Tests for C{jqueue._QueuedOpCode}, including a serialize/restore cycle.

  """
  def testDefaults(self):
    def _Check(op):
      # State expected for a freshly created queued opcode
      self.assertFalse(hasattr(op.input, "dry_run"))
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
      self.assertFalse(op.log)
      self.assertTrue(op.start_timestamp is None)
      self.assertTrue(op.exec_timestamp is None)
      self.assertTrue(op.end_timestamp is None)
      self.assertTrue(op.result is None)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
    _Check(op1)
    # The restored copy must pass the same checks and serialize identically
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())

  def testPriority(self):
    def _Check(op):
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
             "Default priority equals high priority; test can't work"
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    # The priority given on the input opcode is taken over by the queued
    # opcode and survives a serialize/restore cycle
    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
    op1 = jqueue._QueuedOpCode(inpop)
    _Check(op1)
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())
277
278
class TestQueuedJob(unittest.TestCase):
  """Tests for C{jqueue._QueuedJob}: defaults, priority and status.

  """
  def test(self):
    # Creating a job with an empty opcode list is rejected
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
                      None, 1, [])

  def testDefaults(self):
    job_id = 4260
    ops = [
      opcodes.OpTagsGet(),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      # State expected for a job nothing has happened to yet
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.log_serial, 0)
      self.assertTrue(job.received_timestamp)
      self.assertTrue(job.start_timestamp is None)
      self.assertTrue(job.end_timestamp is None)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
      self.assertTrue(repr(job).startswith("<"))
      self.assertEqual(len(job.ops), len(ops))
      self.assertTrue(compat.all(inp.__getstate__() == op.input.__getstate__()
                                 for (inp, op) in zip(ops, job.ops)))
      self.assertRaises(errors.OpExecError, job.GetInfo,
                        ["unknown-field"])
      self.assertEqual(job.GetInfo(["summary"]),
                       [[op.input.Summary() for op in job.ops]])

    job1 = jqueue._QueuedJob(None, job_id, ops)
    _Check(job1)
    # The restored copy must pass the same checks and serialize identically
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
    _Check(job2)
    self.assertEqual(job1.Serialize(), job2.Serialize())

  def testPriority(self):
    job_id = 4283
    ops = [
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(repr(job).startswith("<"))

    job = jqueue._QueuedJob(None, job_id, ops)
    _Check(job)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase first (a numerically lower value)
    job.ops[0].priority -= 1
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)

    # Mark opcode as finished; its priority no longer counts
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase second
    job.ops[1].priority -= 10
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)

    # Test increasing first (now at default - 1 - 19 = default - 20)
    job.ops[0].status = constants.OP_STATUS_RUNNING
    job.ops[0].priority -= 19
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)

  def testCalcStatus(self):
    # Each helper below puts the ten opcodes of a fresh job into a state
    # for which CalcStatus must report the job status it is listed under
    # in "tests"
    def _Queued(ops):
      # The default status is "queued"
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in ops))

    def _Waitlock1(ops):
      ops[0].status = constants.OP_STATUS_WAITLOCK

    def _Waitlock2(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      ops[2].status = constants.OP_STATUS_WAITLOCK

    def _Running(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_RUNNING
      for op in ops[2:]:
        op.status = constants.OP_STATUS_QUEUED

    def _Canceling1(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      for op in ops[2:]:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceling2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceled(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELED

    def _Error1(ops):
      # First four opcodes succeeded, all later ones failed
      for idx, op in enumerate(ops):
        if idx > 3:
          op.status = constants.OP_STATUS_ERROR
        else:
          op.status = constants.OP_STATUS_SUCCESS

    def _Error2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_ERROR

    def _Success(ops):
      for op in ops:
        op.status = constants.OP_STATUS_SUCCESS

    tests = {
      constants.JOB_STATUS_QUEUED: [_Queued],
      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
      constants.JOB_STATUS_RUNNING: [_Running],
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
      constants.JOB_STATUS_CANCELED: [_Canceled],
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
      constants.JOB_STATUS_SUCCESS: [_Success],
      }

    def _NewJob():
      job = jqueue._QueuedJob(None, 1,
                              [opcodes.OpTestDelay() for _ in range(10)])
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in job.ops))
      return job

    # Every known job status needs at least one scenario; a status missing
    # from "tests" fails with KeyError
    for status in constants.JOB_STATUS_ALL:
      sttests = tests[status]
      assert sttests
      for fn in sttests:
        job = _NewJob()
        fn(job.ops)
        self.assertEqual(job.CalcStatus(), status)
425
426
class _FakeQueueForProc:
  """Queue double that tracks a lock flag and records job updates.

  Updates are stored in call order as C{(job, replicate)} tuples and can
  be consumed one at a time through L{GetNextUpdate}.

  """
  def __init__(self):
    self._acquired = False
    self._updates = []

  def IsAcquired(self):
    """Tells whether the fake lock is currently held.

    """
    return self._acquired

  def GetNextUpdate(self):
    """Removes and returns the oldest recorded update.

    Raises C{IndexError} when no update is pending.

    """
    oldest = self._updates.pop(0)
    return oldest

  def acquire(self, shared=0):
    """Marks the fake lock as held; only C{shared=1} is accepted.

    """
    assert shared == 1
    self._acquired = True

  def release(self):
    """Marks the fake lock as free; it must currently be held.

    """
    assert self._acquired
    self._acquired = False

  def UpdateJobUnlocked(self, job, replicate=True):
    """Records an update for C{job}; the fake lock must be held.

    The C{replicate} flag is stored normalized to a boolean.

    """
    assert self._acquired, "Lock not acquired while updating job"
    entry = (job, bool(replicate))
    self._updates.append(entry)
449
450
class _FakeExecOpCodeForProc:
  """Callable standing in for opcode execution in job processor tests.

  It accepts only C{opcodes.OpTestDummy} opcodes, invokes optional test
  hooks around C{cbs.NotifyStart()} and then either returns the opcode's
  C{result} or raises C{errors.OpExecError}, depending on C{op.fail}.

  """
  def __init__(self, queue, before_start, after_start):
    """Stores the queue and the two optional hooks.

    @param queue: object whose C{IsAcquired} method is checked during the call
    @param before_start: false value or callable taking C{(timeout, priority)}
    @param after_start: false value or callable taking C{(op, cbs)}

    """
    self._queue = queue
    self._before_start = before_start
    self._after_start = after_start

  def __call__(self, op, cbs, timeout=None, priority=None):
    """Pretends to execute C{op}.

    @return: C{op.result} if C{op.fail} is false
    @raise errors.OpExecError: if C{op.fail} is true

    """
    assert isinstance(op, opcodes.OpTestDummy)
    assert not self._queue.IsAcquired(), \
           "Queue lock not released when executing opcode"

    # Hook invoked before the start notification
    if self._before_start:
      self._before_start(timeout, priority)

    cbs.NotifyStart()

    # Hook invoked after the start notification
    if self._after_start:
      self._after_start(op, cbs)

    # Check again after the callbacks
    assert not self._queue.IsAcquired()

    if op.fail:
      raise errors.OpExecError("Error requested (%s)" % op.result)

    return op.result
477
478
class _JobProcessorTestUtils:
  """Mixin with helpers for job processor tests.

  It relies on the C{unittest.TestCase} assertion methods of the class it
  is combined with.

  """
  def _CreateJob(self, queue, job_id, ops):
    # Creates a queued job and verifies its initial state: no start or end
    # timestamp, and one queued opcode per input opcode
    job = jqueue._QueuedJob(queue, job_id, ops)
    self.assertFalse(job.start_timestamp)
    self.assertFalse(job.end_timestamp)
    self.assertEqual(len(ops), len(job.ops))
    self.assertTrue(compat.all(op.input == inp
                               for (op, inp) in zip(job.ops, ops)))
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
    return job
489
490
491 class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
492   def _GenericCheckJob(self, job):
493     assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
494                       for op in job.ops)
495
496     self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
497                      [[op.start_timestamp for op in job.ops],
498                       [op.exec_timestamp for op in job.ops],
499                       [op.end_timestamp for op in job.ops]])
500     self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
501                      [job.received_timestamp,
502                       job.start_timestamp,
503                       job.end_timestamp])
504     self.assert_(job.start_timestamp)
505     self.assert_(job.end_timestamp)
506     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
507
508   def testSuccess(self):
509     queue = _FakeQueueForProc()
510
511     for (job_id, opcount) in [(25351, 1), (6637, 3),
512                               (24644, 10), (32207, 100)]:
513       ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
514              for i in range(opcount)]
515
516       # Create job
517       job = self._CreateJob(queue, job_id, ops)
518
519       def _BeforeStart(timeout, priority):
520         self.assertEqual(queue.GetNextUpdate(), (job, True))
521         self.assertRaises(IndexError, queue.GetNextUpdate)
522         self.assertFalse(queue.IsAcquired())
523         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
524         self.assertFalse(job.cur_opctx)
525
526       def _AfterStart(op, cbs):
527         self.assertEqual(queue.GetNextUpdate(), (job, True))
528         self.assertRaises(IndexError, queue.GetNextUpdate)
529
530         self.assertFalse(queue.IsAcquired())
531         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
532         self.assertFalse(job.cur_opctx)
533
534         # Job is running, cancelling shouldn't be possible
535         (success, _) = job.Cancel()
536         self.assertFalse(success)
537
538       opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
539
540       for idx in range(len(ops)):
541         self.assertRaises(IndexError, queue.GetNextUpdate)
542         result = jqueue._JobProcessor(queue, opexec, job)()
543         self.assertEqual(queue.GetNextUpdate(), (job, True))
544         self.assertRaises(IndexError, queue.GetNextUpdate)
545         if idx == len(ops) - 1:
546           # Last opcode
547           self.assert_(result)
548         else:
549           self.assertFalse(result)
550
551           self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
552           self.assert_(job.start_timestamp)
553           self.assertFalse(job.end_timestamp)
554
555       self.assertRaises(IndexError, queue.GetNextUpdate)
556
557       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
558       self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
559       self.assertEqual(job.GetInfo(["opresult"]),
560                        [[op.input.result for op in job.ops]])
561       self.assertEqual(job.GetInfo(["opstatus"]),
562                        [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
563       self.assert_(compat.all(op.start_timestamp and op.end_timestamp
564                               for op in job.ops))
565
566       self._GenericCheckJob(job)
567
568       # Finished jobs can't be processed any further
569       self.assertRaises(errors.ProgrammerError,
570                         jqueue._JobProcessor(queue, opexec, job))
571
572   def testOpcodeError(self):
573     queue = _FakeQueueForProc()
574
575     testdata = [
576       (17077, 1, 0, 0),
577       (1782, 5, 2, 2),
578       (18179, 10, 9, 9),
579       (4744, 10, 3, 8),
580       (23816, 100, 39, 45),
581       ]
582
583     for (job_id, opcount, failfrom, failto) in testdata:
584       # Prepare opcodes
585       ops = [opcodes.OpTestDummy(result="Res%s" % i,
586                                  fail=(failfrom <= i and
587                                        i <= failto))
588              for i in range(opcount)]
589
590       # Create job
591       job = self._CreateJob(queue, job_id, ops)
592
593       opexec = _FakeExecOpCodeForProc(queue, None, None)
594
595       for idx in range(len(ops)):
596         self.assertRaises(IndexError, queue.GetNextUpdate)
597         result = jqueue._JobProcessor(queue, opexec, job)()
598         # queued to waitlock
599         self.assertEqual(queue.GetNextUpdate(), (job, True))
600         # waitlock to running
601         self.assertEqual(queue.GetNextUpdate(), (job, True))
602         # Opcode result
603         self.assertEqual(queue.GetNextUpdate(), (job, True))
604         self.assertRaises(IndexError, queue.GetNextUpdate)
605
606         if idx in (failfrom, len(ops) - 1):
607           # Last opcode
608           self.assert_(result)
609           break
610
611         self.assertFalse(result)
612
613         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
614
615       self.assertRaises(IndexError, queue.GetNextUpdate)
616
617       # Check job status
618       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
619       self.assertEqual(job.GetInfo(["id"]), [job_id])
620       self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
621
622       # Check opcode status
623       data = zip(job.ops,
624                  job.GetInfo(["opstatus"])[0],
625                  job.GetInfo(["opresult"])[0])
626
627       for idx, (op, opstatus, opresult) in enumerate(data):
628         if idx < failfrom:
629           assert not op.input.fail
630           self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
631           self.assertEqual(opresult, op.input.result)
632         elif idx <= failto:
633           assert op.input.fail
634           self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
635           self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
636         else:
637           assert not op.input.fail
638           self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
639           self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
640
641       self.assert_(compat.all(op.start_timestamp and op.end_timestamp
642                               for op in job.ops[:failfrom]))
643
644       self._GenericCheckJob(job)
645
646       # Finished jobs can't be processed any further
647       self.assertRaises(errors.ProgrammerError,
648                         jqueue._JobProcessor(queue, opexec, job))
649
650   def testCancelWhileInQueue(self):
651     queue = _FakeQueueForProc()
652
653     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
654            for i in range(5)]
655
656     # Create job
657     job_id = 17045
658     job = self._CreateJob(queue, job_id, ops)
659
660     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
661
662     # Mark as cancelled
663     (success, _) = job.Cancel()
664     self.assert_(success)
665
666     self.assertRaises(IndexError, queue.GetNextUpdate)
667
668     self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
669                             for op in job.ops))
670
671     opexec = _FakeExecOpCodeForProc(queue, None, None)
672     self.assert_(jqueue._JobProcessor(queue, opexec, job)())
673
674     # Check result
675     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
676     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
677     self.assertFalse(job.start_timestamp)
678     self.assert_(job.end_timestamp)
679     self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
680                                 for op in job.ops))
681     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
682                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
683                       ["Job canceled by request" for _ in job.ops]])
684
685   def testCancelWhileWaitlockInQueue(self):
686     queue = _FakeQueueForProc()
687
688     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
689            for i in range(5)]
690
691     # Create job
692     job_id = 8645
693     job = self._CreateJob(queue, job_id, ops)
694
695     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
696
697     job.ops[0].status = constants.OP_STATUS_WAITLOCK
698
699     assert len(job.ops) == 5
700
701     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
702
703     # Mark as cancelling
704     (success, _) = job.Cancel()
705     self.assert_(success)
706
707     self.assertRaises(IndexError, queue.GetNextUpdate)
708
709     self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
710                             for op in job.ops))
711
712     opexec = _FakeExecOpCodeForProc(queue, None, None)
713     self.assert_(jqueue._JobProcessor(queue, opexec, job)())
714
715     # Check result
716     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
717     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
718     self.assertFalse(job.start_timestamp)
719     self.assert_(job.end_timestamp)
720     self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
721                                 for op in job.ops))
722     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
723                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
724                       ["Job canceled by request" for _ in job.ops]])
725
726   def testCancelWhileWaitlock(self):
727     queue = _FakeQueueForProc()
728
729     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
730            for i in range(5)]
731
732     # Create job
733     job_id = 11009
734     job = self._CreateJob(queue, job_id, ops)
735
736     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
737
738     def _BeforeStart(timeout, priority):
739       self.assertEqual(queue.GetNextUpdate(), (job, True))
740       self.assertRaises(IndexError, queue.GetNextUpdate)
741       self.assertFalse(queue.IsAcquired())
742       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
743
744       # Mark as cancelled
745       (success, _) = job.Cancel()
746       self.assert_(success)
747
748       self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
749                               for op in job.ops))
750       self.assertRaises(IndexError, queue.GetNextUpdate)
751
752     def _AfterStart(op, cbs):
753       self.assertEqual(queue.GetNextUpdate(), (job, True))
754       self.assertRaises(IndexError, queue.GetNextUpdate)
755       self.assertFalse(queue.IsAcquired())
756       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
757
758     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
759
760     self.assertRaises(IndexError, queue.GetNextUpdate)
761     self.assert_(jqueue._JobProcessor(queue, opexec, job)())
762     self.assertEqual(queue.GetNextUpdate(), (job, True))
763     self.assertRaises(IndexError, queue.GetNextUpdate)
764
765     # Check result
766     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
767     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
768     self.assert_(job.start_timestamp)
769     self.assert_(job.end_timestamp)
770     self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
771                                 for op in job.ops))
772     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
773                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
774                       ["Job canceled by request" for _ in job.ops]])
775
776   def testCancelWhileWaitlockWithTimeout(self):
777     queue = _FakeQueueForProc()
778
779     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
780            for i in range(5)]
781
782     # Create job
783     job_id = 24314
784     job = self._CreateJob(queue, job_id, ops)
785
786     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
787
788     def _BeforeStart(timeout, priority):
789       self.assertFalse(queue.IsAcquired())
790       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
791
792       # Mark as cancelled
793       (success, _) = job.Cancel()
794       self.assert_(success)
795
796       self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
797                               for op in job.ops))
798
799       # Fake an acquire attempt timing out
800       raise mcpu.LockAcquireTimeout()
801
802     def _AfterStart(op, cbs):
803       self.fail("Should not reach this")
804
805     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
806
807     self.assert_(jqueue._JobProcessor(queue, opexec, job)())
808
809     # Check result
810     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
811     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
812     self.assert_(job.start_timestamp)
813     self.assert_(job.end_timestamp)
814     self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
815                                 for op in job.ops))
816     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
817                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
818                       ["Job canceled by request" for _ in job.ops]])
819
820   def testCancelWhileRunning(self):
821     # Tests canceling a job with finished opcodes and more, unprocessed ones
822     queue = _FakeQueueForProc()
823
824     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
825            for i in range(3)]
826
827     # Create job
828     job_id = 28492
829     job = self._CreateJob(queue, job_id, ops)
830
831     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
832
833     opexec = _FakeExecOpCodeForProc(queue, None, None)
834
835     # Run one opcode
836     self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
837
838     # Job goes back to queued
839     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
840     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
841                      [[constants.OP_STATUS_SUCCESS,
842                        constants.OP_STATUS_QUEUED,
843                        constants.OP_STATUS_QUEUED],
844                       ["Res0", None, None]])
845
846     # Mark as cancelled
847     (success, _) = job.Cancel()
848     self.assert_(success)
849
850     # Try processing another opcode (this will actually cancel the job)
851     self.assert_(jqueue._JobProcessor(queue, opexec, job)())
852
853     # Check result
854     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
855     self.assertEqual(job.GetInfo(["id"]), [job_id])
856     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
857     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
858                      [[constants.OP_STATUS_SUCCESS,
859                        constants.OP_STATUS_CANCELED,
860                        constants.OP_STATUS_CANCELED],
861                       ["Res0", "Job canceled by request",
862                        "Job canceled by request"]])
863
864   def testPartiallyRun(self):
865     # Tests calling the processor on a job that's been partially run before the
866     # program was restarted
867     queue = _FakeQueueForProc()
868
869     opexec = _FakeExecOpCodeForProc(queue, None, None)
870
871     for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
872       ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
873              for i in range(10)]
874
875       # Create job
876       job = self._CreateJob(queue, job_id, ops)
877
878       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
879
880       for _ in range(successcount):
881         self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
882
883       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
884       self.assertEqual(job.GetInfo(["opstatus"]),
885                        [[constants.OP_STATUS_SUCCESS
886                          for _ in range(successcount)] +
887                         [constants.OP_STATUS_QUEUED
888                          for _ in range(len(ops) - successcount)]])
889
890       self.assert_(job.ops_iter)
891
892       # Serialize and restore (simulates program restart)
893       newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
894       self.assertFalse(newjob.ops_iter)
895       self._TestPartial(newjob, successcount)
896
897   def _TestPartial(self, job, successcount):
898     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
899     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
900
901     queue = _FakeQueueForProc()
902     opexec = _FakeExecOpCodeForProc(queue, None, None)
903
904     for remaining in reversed(range(len(job.ops) - successcount)):
905       result = jqueue._JobProcessor(queue, opexec, job)()
906
907       if remaining == 0:
908         # Last opcode
909         self.assert_(result)
910         break
911
912       self.assertFalse(result)
913
914       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
915
916     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
917     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
918     self.assertEqual(job.GetInfo(["opresult"]),
919                      [[op.input.result for op in job.ops]])
920     self.assertEqual(job.GetInfo(["opstatus"]),
921                      [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
922     self.assert_(compat.all(op.start_timestamp and op.end_timestamp
923                             for op in job.ops))
924
925     self._GenericCheckJob(job)
926
927     # Finished jobs can't be processed any further
928     self.assertRaises(errors.ProgrammerError,
929                       jqueue._JobProcessor(queue, opexec, job))
930
931     # ... also after being restored
932     job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
933     self.assertRaises(errors.ProgrammerError,
934                       jqueue._JobProcessor(queue, opexec, job2))
935
936   def testProcessorOnRunningJob(self):
937     ops = [opcodes.OpTestDummy(result="result", fail=False)]
938
939     queue = _FakeQueueForProc()
940     opexec = _FakeExecOpCodeForProc(queue, None, None)
941
942     # Create job
943     job = self._CreateJob(queue, 9571, ops)
944
945     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
946
947     job.ops[0].status = constants.OP_STATUS_RUNNING
948
949     assert len(job.ops) == 1
950
951     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
952
953     # Calling on running job must fail
954     self.assertRaises(errors.ProgrammerError,
955                       jqueue._JobProcessor(queue, opexec, job))
956
957   def testLogMessages(self):
958     # Tests the "Feedback" callback function
959     queue = _FakeQueueForProc()
960
961     messages = {
962       1: [
963         (None, "Hello"),
964         (None, "World"),
965         (constants.ELOG_MESSAGE, "there"),
966         ],
967       4: [
968         (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
969         (constants.ELOG_JQUEUE_TEST, ("other", "type")),
970         ],
971       }
972     ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
973                                messages=messages.get(i, []))
974            for i in range(5)]
975
976     # Create job
977     job = self._CreateJob(queue, 29386, ops)
978
979     def _BeforeStart(timeout, priority):
980       self.assertEqual(queue.GetNextUpdate(), (job, True))
981       self.assertRaises(IndexError, queue.GetNextUpdate)
982       self.assertFalse(queue.IsAcquired())
983       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
984
985     def _AfterStart(op, cbs):
986       self.assertEqual(queue.GetNextUpdate(), (job, True))
987       self.assertRaises(IndexError, queue.GetNextUpdate)
988       self.assertFalse(queue.IsAcquired())
989       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
990
991       self.assertRaises(AssertionError, cbs.Feedback,
992                         "too", "many", "arguments")
993
994       for (log_type, msg) in op.messages:
995         self.assertRaises(IndexError, queue.GetNextUpdate)
996         if log_type:
997           cbs.Feedback(log_type, msg)
998         else:
999           cbs.Feedback(msg)
1000         # Check for job update without replication
1001         self.assertEqual(queue.GetNextUpdate(), (job, False))
1002         self.assertRaises(IndexError, queue.GetNextUpdate)
1003
1004     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1005
1006     for remaining in reversed(range(len(job.ops))):
1007       self.assertRaises(IndexError, queue.GetNextUpdate)
1008       result = jqueue._JobProcessor(queue, opexec, job)()
1009       self.assertEqual(queue.GetNextUpdate(), (job, True))
1010       self.assertRaises(IndexError, queue.GetNextUpdate)
1011
1012       if remaining == 0:
1013         # Last opcode
1014         self.assert_(result)
1015         break
1016
1017       self.assertFalse(result)
1018
1019       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1020
1021     self.assertRaises(IndexError, queue.GetNextUpdate)
1022
1023     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1024     self.assertEqual(job.GetInfo(["opresult"]),
1025                      [[op.input.result for op in job.ops]])
1026
1027     logmsgcount = sum(len(m) for m in messages.values())
1028
1029     self._CheckLogMessages(job, logmsgcount)
1030
1031     # Serialize and restore (simulates program restart)
1032     newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
1033     self._CheckLogMessages(newjob, logmsgcount)
1034
1035     # Check each message
1036     prevserial = -1
1037     for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1038       for (serial, timestamp, log_type, msg) in oplog:
1039         (exptype, expmsg) = messages.get(idx).pop(0)
1040         if exptype:
1041           self.assertEqual(log_type, exptype)
1042         else:
1043           self.assertEqual(log_type, constants.ELOG_MESSAGE)
1044         self.assertEqual(expmsg, msg)
1045         self.assert_(serial > prevserial)
1046         prevserial = serial
1047
1048   def _CheckLogMessages(self, job, count):
1049     # Check serial
1050     self.assertEqual(job.log_serial, count)
1051
1052     # No filter
1053     self.assertEqual(job.GetLogEntries(None),
1054                      [entry for entries in job.GetInfo(["oplog"])[0] if entries
1055                       for entry in entries])
1056
1057     # Filter with serial
1058     assert count > 3
1059     self.assert_(job.GetLogEntries(3))
1060     self.assertEqual(job.GetLogEntries(3),
1061                      [entry for entries in job.GetInfo(["oplog"])[0] if entries
1062                       for entry in entries][3:])
1063
1064     # No log message after highest serial
1065     self.assertFalse(job.GetLogEntries(count))
1066     self.assertFalse(job.GetLogEntries(count + 3))
1067
1068
class _FakeTimeoutStrategy:
  """Timeout strategy stub handing out a predefined sequence of timeouts.

  @ivar timeouts: list of timeouts which have not been handed out yet
  @ivar attempts: number of times L{NextAttempt} was called
  @ivar last_timeout: value returned by the most recent L{NextAttempt} call
    (C{None} before the first call)

  """
  def __init__(self, timeouts):
    """Initializes this class.

    @param timeouts: iterable with the timeouts to return, in order

    """
    # Work on a private copy: NextAttempt consumes the list destructively,
    # which must not modify the caller's object. Copying also allows any
    # iterable (tuple, generator, lazy range) to be passed in.
    self.timeouts = list(timeouts)
    self.attempts = 0
    self.last_timeout = None

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @return: the next predefined timeout, or C{None} once all of them have
      been used

    """
    self.attempts += 1
    if self.timeouts:
      timeout = self.timeouts.pop(0)
    else:
      timeout = None
    self.last_timeout = timeout
    return timeout
1083
1084
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
  """Tests the job processor's behaviour when acquiring locks times out.

  The callbacks in this class simulate lock acquisition for a job with ten
  opcodes. Depending on the opcode index, L{_NewTimeoutStrategy} prepares a
  number of timeouts and L{_BeforeStart} raises C{mcpu.LockAcquireTimeout}
  until the configured retries are used up. L{testTimeout} drives the
  processor and checks the job's state after every call.

  """
  def setUp(self):
    """Creates a fake queue and resets the bookkeeping attributes."""
    self.queue = _FakeQueueForProc()
    # Job under test, set by testTimeout
    self.job = None
    # Index of the opcode currently being processed (see _NextOpcode)
    self.curop = None
    # Iterator providing opcode indices, set by testTimeout
    self.opcounter = None
    # Fake timeout strategy created most recently by _NewTimeoutStrategy
    self.timeout_strategy = None
    # Number of lock acquire timeouts _BeforeStart still has to simulate
    self.retries = 0
    # Opcode index for which the last timeout strategy was created
    self.prev_tsop = None
    # Opcode priority recorded by _NextOpcode/_NewTimeoutStrategy
    self.prev_prio = None
    # Opcode status recorded before the processor is called
    self.prev_status = None
    # Priority passed to the most recent _BeforeStart call
    self.lock_acq_prio = None
    # Whether the most recent _BeforeStart call let the acquire succeed
    self.gave_lock = None
    # Set once opcode 3 got its locks before reaching the highest priority
    self.done_lock_before_blocking = False

  def _BeforeStart(self, timeout, priority):
    """Callback simulating the acquisition of locks for the current opcode.

    Passed as the first callback to C{_FakeExecOpCodeForProc}; presumably
    invoked before the opcode is executed (confirm against that class).

    @param timeout: timeout for this attempt, C{None} or a number
    @param priority: priority used for this attempt
    @raise mcpu.LockAcquireTimeout: while simulated retries are left

    """
    job = self.job

    # If status has changed, job must've been written
    if self.prev_status != self.job.ops[self.curop].status:
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

    ts = self.timeout_strategy

    # The values passed in must match the fake strategy and the opcode
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
    self.assertEqual(timeout, ts.last_timeout)
    self.assertEqual(priority, job.ops[self.curop].priority)

    # Assume success; reset further down if a timeout is simulated
    self.gave_lock = True
    self.lock_acq_prio = priority

    if (self.curop == 3 and
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
      # Give locks before running into blocking acquire
      assert self.retries == 7
      self.retries = 0
      self.done_lock_before_blocking = True
      return

    if self.retries > 0:
      # Simulate a timeout; only possible for non-blocking attempts
      self.assert_(timeout is not None)
      self.retries -= 1
      self.gave_lock = False
      raise mcpu.LockAcquireTimeout()

    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
      # At the highest priority the acquire must be a blocking one
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
      assert not ts.timeouts
      self.assert_(timeout is None)

  def _AfterStart(self, op, cbs):
    """Callback checking the job's state once an opcode is running.

    Passed as the second callback to C{_FakeExecOpCodeForProc}.

    @param op: opcode being executed (unused)
    @param cbs: processor callbacks (unused)

    """
    job = self.job

    # Setting to "running" requires an update
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Job is running, cancelling shouldn't be possible
    (success, _) = job.Cancel()
    self.assertFalse(success)

  def _NextOpcode(self):
    """Advances to the next opcode index and records its priority and status.

    Passed to the processor as C{_nextop_fn}.

    """
    self.curop = self.opcounter.next()
    self.prev_prio = self.job.ops[self.curop].priority
    self.prev_status = self.job.ops[self.curop].status

  def _NewTimeoutStrategy(self):
    """Creates the fake timeout strategy for the current opcode.

    Passed to the processor as C{_timeout_strategy_factory}. The timeouts and
    the number of simulated retries depend on the index of the current
    opcode.

    @return: new L{_FakeTimeoutStrategy} instance

    """
    job = self.job

    # All retries of the previous strategy must have been used
    self.assertEqual(self.retries, 0)

    if self.prev_tsop == self.curop:
      # Still on the same opcode, priority must've been increased
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)

    if self.curop == 1:
      # Normal retry
      timeouts = range(10, 31, 10)
      self.retries = len(timeouts) - 1

    elif self.curop == 2:
      # Let this run into a blocking acquire
      timeouts = range(11, 61, 12)
      self.retries = len(timeouts)

    elif self.curop == 3:
      # Wait for priority to increase, but give lock before blocking acquire
      timeouts = range(12, 100, 14)
      self.retries = len(timeouts)

      self.assertFalse(self.done_lock_before_blocking)

    elif self.curop == 4:
      self.assert_(self.done_lock_before_blocking)

      # Timeouts, but no need to retry
      timeouts = range(10, 31, 10)
      self.retries = 0

    elif self.curop == 5:
      # Normal retry
      timeouts = range(19, 100, 11)
      self.retries = len(timeouts)

    else:
      # All other opcodes get their locks on the first attempt
      timeouts = []
      self.retries = 0

    assert len(job.ops) == 10
    assert self.retries <= len(timeouts)

    ts = _FakeTimeoutStrategy(timeouts)

    # Remember for which opcode and priority this strategy was created
    self.timeout_strategy = ts
    self.prev_tsop = self.curop
    self.prev_prio = job.ops[self.curop].priority

    return ts

  def testTimeout(self):
    """Processes a job with ten opcodes while simulating lock timeouts."""
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(10)]

    # Create job
    job_id = 15801
    job = self._CreateJob(self.queue, job_id, ops)
    self.job = job

    self.opcounter = itertools.count(0)

    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
                                    self._AfterStart)
    tsf = self._NewTimeoutStrategy

    self.assertFalse(self.done_lock_before_blocking)

    # Each iteration is one processor call; the loop ends once the processor
    # reports the job as finished
    while True:
      proc = jqueue._JobProcessor(self.queue, opexec, job,
                                  _timeout_strategy_factory=tsf)

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      # Record the opcode's status so _BeforeStart can detect a change
      if self.curop is not None:
        self.prev_status = self.job.ops[self.curop].status

      self.lock_acq_prio = None

      result = proc(_nextop_fn=self._NextOpcode)
      assert self.curop is not None

      if result or self.gave_lock:
        # Got lock and/or job is done, result must've been written
        self.assertFalse(job.cur_opctx)
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
        self.assert_(job.ops[self.curop].exec_timestamp)

      if result:
        self.assertFalse(job.cur_opctx)
        break

      self.assertFalse(result)

      if self.curop == 0:
        self.assertEqual(job.ops[self.curop].start_timestamp,
                         job.start_timestamp)

      if self.gave_lock:
        # Opcode finished, but job not yet done
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      else:
        # Did not get locks
        self.assert_(job.cur_opctx)
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
                         self.timeout_strategy.NextAttempt)
        self.assertFalse(job.ops[self.curop].exec_timestamp)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

        # If priority has changed since acquiring locks, the job must've been
        # updated
        if self.lock_acq_prio != job.ops[self.curop].priority:
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      self.assert_(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    # All opcodes must have been processed exactly once
    self.assertEqual(self.curop, len(job.ops) - 1)
    self.assertEqual(self.job, job)
    self.assertEqual(self.opcounter.next(), len(job.ops))
    self.assert_(self.done_lock_before_blocking)

    # Check the final state of the job
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
                            for op in job.ops))

    # Finished jobs can't be processed any further
    self.assertRaises(errors.ProgrammerError,
                      jqueue._JobProcessor(self.queue, opexec, job))
1299
1300
# Hand over to the test program from the testutils module when this file is
# executed as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()