Implement chained jobs
[ganeti-local] / test / ganeti.jqueue_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.jqueue"""
23
24 import os
25 import sys
26 import unittest
27 import tempfile
28 import shutil
29 import errno
30 import itertools
31
32 from ganeti import constants
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import jqueue
36 from ganeti import opcodes
37 from ganeti import compat
38 from ganeti import mcpu
39
40 import testutils
41
42
43 class _FakeJob:
44   def __init__(self, job_id, status):
45     self.id = job_id
46     self._status = status
47     self._log = []
48
49   def SetStatus(self, status):
50     self._status = status
51
52   def AddLogEntry(self, msg):
53     self._log.append((len(self._log), msg))
54
55   def CalcStatus(self):
56     return self._status
57
58   def GetInfo(self, fields):
59     result = []
60
61     for name in fields:
62       if name == "status":
63         result.append(self._status)
64       else:
65         raise Exception("Unknown field")
66
67     return result
68
69   def GetLogEntries(self, newer_than):
70     assert newer_than is None or newer_than >= 0
71
72     if newer_than is None:
73       return self._log
74
75     return self._log[newer_than:]
76
77
class TestJobChangesChecker(unittest.TestCase):
  """Tests for jqueue._JobChangesChecker, driven by _FakeJob objects."""

  def testStatus(self):
    """Without previous information every call reports the current status."""
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.SetStatus(constants.JOB_STATUS_SUCCESS)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))

    # job.id is used by checker
    self.assertEqual(job.id, 9094)

  def testStatusWithPrev(self):
    """With matching previous information nothing is reported until a change."""
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"],
                                        [constants.JOB_STATUS_QUEUED], None)
    self.assertTrue(checker(job) is None)

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

  def testFinalStatus(self):
    """Finalized statuses are reported even if equal to the previous info."""
    for status in constants.JOBS_FINALIZED:
      job = _FakeJob(2178711, status)
      checker = jqueue._JobChangesChecker(["status"], [status], None)
      # There won't be any changes in this status, hence it should signal
      # a change immediately
      self.assertEqual(checker(job), ([status], []))

  def testLog(self):
    """Log entries are reported relative to the previous log serial."""
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.AddLogEntry("Hello World")
    (job_info, log_entries) = checker(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
    self.assertEqual(log_entries, [[0, "Hello World"]])

    # A checker primed with the state just seen must not report anything
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
    self.assertTrue(checker2(job) is None)

    job.AddLogEntry("Foo Bar")
    job.SetStatus(constants.JOB_STATUS_ERROR)

    (job_info, log_entries) = checker2(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[1, "Foo Bar"]])

    # A checker without previous state sees the complete log
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
    (job_info, log_entries) = checker3(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
134
135
136 class TestJobChangesWaiter(unittest.TestCase):
137   def setUp(self):
138     self.tmpdir = tempfile.mkdtemp()
139     self.filename = utils.PathJoin(self.tmpdir, "job-1")
140     utils.WriteFile(self.filename, data="")
141
142   def tearDown(self):
143     shutil.rmtree(self.tmpdir)
144
145   def _EnsureNotifierClosed(self, notifier):
146     try:
147       os.fstat(notifier._fd)
148     except EnvironmentError, err:
149       self.assertEqual(err.errno, errno.EBADF)
150     else:
151       self.fail("File descriptor wasn't closed")
152
153   def testClose(self):
154     for wait in [False, True]:
155       waiter = jqueue._JobFileChangesWaiter(self.filename)
156       try:
157         if wait:
158           waiter.Wait(0.001)
159       finally:
160         waiter.Close()
161
162       # Ensure file descriptor was closed
163       self._EnsureNotifierClosed(waiter._notifier)
164
165   def testChangingFile(self):
166     waiter = jqueue._JobFileChangesWaiter(self.filename)
167     try:
168       self.assertFalse(waiter.Wait(0.1))
169       utils.WriteFile(self.filename, data="changed")
170       self.assert_(waiter.Wait(60))
171     finally:
172       waiter.Close()
173
174     self._EnsureNotifierClosed(waiter._notifier)
175
176   def testChangingFile2(self):
177     waiter = jqueue._JobChangesWaiter(self.filename)
178     try:
179       self.assertFalse(waiter._filewaiter)
180       self.assert_(waiter.Wait(0.1))
181       self.assert_(waiter._filewaiter)
182
183       # File waiter is now used, but there have been no changes
184       self.assertFalse(waiter.Wait(0.1))
185       utils.WriteFile(self.filename, data="changed")
186       self.assert_(waiter.Wait(60))
187     finally:
188       waiter.Close()
189
190     self._EnsureNotifierClosed(waiter._filewaiter._notifier)
191
192
class TestWaitForJobChangesHelper(unittest.TestCase):
  """Tests for jqueue._WaitForJobChangesHelper with fake job loaders."""

  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _LoadWaitingJob(self):
    """Job loader returning a fresh fake job in "waitlock" status."""
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)

  def _LoadLostJob(self):
    """Job loader simulating a job which can't be loaded."""
    return None

  def testNoChanges(self):
    wfjc = jqueue._WaitForJobChangesHelper()

    # No change
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
                     constants.JOB_NOTCHANGED)

    # No previous information
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
                          ["status"], None, None, 1.0),
                     ([constants.JOB_STATUS_WAITLOCK], []))

  def testLostJob(self):
    """If the loader returns no job, the helper must return None."""
    wfjc = jqueue._WaitForJobChangesHelper()
    self.assertTrue(wfjc(self.filename, self._LoadLostJob,
                         ["status"], None, None, 1.0) is None)
225
226
class TestEncodeOpError(unittest.TestCase):
  """Tests for jqueue._EncodeOpError."""

  def test(self):
    """Encoded errors are tuples which errors.MaybeRaise turns into exceptions.

    Ganeti exceptions must be re-raised as their own class; anything else
    (foreign exceptions, plain strings) as errors.OpExecError.

    """
    # (value to encode, exception class expected from errors.MaybeRaise)
    cases = [
      (errors.LockError("Test 1"), errors.LockError),
      (errors.GenericError("Test 2"), errors.GenericError),
      (NotImplementedError("Foo"), errors.OpExecError),
      ("Hello World", errors.OpExecError),
      ]

    for (value, expected_cls) in cases:
      encerr = jqueue._EncodeOpError(value)
      self.assertTrue(isinstance(encerr, tuple))
      self.assertRaises(expected_cls, errors.MaybeRaise, encerr)
244
245
class TestQueuedOpCode(unittest.TestCase):
  """Tests for jqueue._QueuedOpCode, including a serialization round trip."""

  def testDefaults(self):
    """A fresh queued opcode has default priority, no timestamps or result."""
    def _Check(op):
      self.assertFalse(hasattr(op.input, "dry_run"))
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
      self.assertFalse(op.log)
      self.assertTrue(op.start_timestamp is None)
      self.assertTrue(op.exec_timestamp is None)
      self.assertTrue(op.end_timestamp is None)
      self.assertTrue(op.result is None)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
    _Check(op1)
    # Restoring from the serialized form must yield an equivalent object
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())

  def testPriority(self):
    """The priority of the input opcode is taken over and survives restore."""
    def _Check(op):
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
             "Default priority equals high priority; test can't work"
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
    op1 = jqueue._QueuedOpCode(inpop)
    _Check(op1)
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())
277
278
class TestQueuedJob(unittest.TestCase):
  """Tests for jqueue._QueuedJob: construction, priority and status."""

  def test(self):
    """Creating a job with an empty opcode list must fail."""
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
                      None, 1, [])

  def testDefaults(self):
    """Checks a fresh job and its restored copy for the expected defaults."""
    job_id = 4260
    ops = [
      opcodes.OpTagsGet(),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.log_serial, 0)
      self.assertTrue(job.received_timestamp)
      self.assertTrue(job.start_timestamp is None)
      self.assertTrue(job.end_timestamp is None)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
      self.assertTrue(repr(job).startswith("<"))
      self.assertEqual(len(job.ops), len(ops))
      self.assertTrue(compat.all(inp.__getstate__() == op.input.__getstate__()
                                 for (inp, op) in zip(ops, job.ops)))
      self.assertRaises(errors.OpExecError, job.GetInfo,
                        ["unknown-field"])
      self.assertEqual(job.GetInfo(["summary"]),
                       [[op.input.Summary() for op in job.ops]])

    job1 = jqueue._QueuedJob(None, job_id, ops)
    _Check(job1)
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
    _Check(job2)
    self.assertEqual(job1.Serialize(), job2.Serialize())

  def testPriority(self):
    """Checks CalcPriority while opcode priorities and statuses change."""
    job_id = 4283
    ops = [
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(repr(job).startswith("<"))

    job = jqueue._QueuedJob(None, job_id, ops)
    _Check(job)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase first (a priority increase is a numerically lower value)
    job.ops[0].priority -= 1
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)

    # Mark opcode as finished; its priority must no longer be considered
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase second
    job.ops[1].priority -= 10
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)

    # Test increasing first; being unfinished again, it counts once more
    job.ops[0].status = constants.OP_STATUS_RUNNING
    job.ops[0].priority -= 19
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)

  def testCalcStatus(self):
    """Checks CalcStatus for opcode status combinations of every job status.

    Each helper below receives the opcodes of a fresh ten-opcode job and
    modifies their statuses; the table maps the expected job status to the
    helpers which must produce it.

    """
    def _Queued(ops):
      # The default status is "queued"
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in ops))

    def _Waitlock1(ops):
      ops[0].status = constants.OP_STATUS_WAITLOCK

    def _Waitlock2(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      ops[2].status = constants.OP_STATUS_WAITLOCK

    def _Running(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_RUNNING
      for op in ops[2:]:
        op.status = constants.OP_STATUS_QUEUED

    def _Canceling1(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      for op in ops[2:]:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceling2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceled(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELED

    def _Error1(ops):
      # First four opcodes succeeded, all remaining ones failed
      for idx, op in enumerate(ops):
        if idx > 3:
          op.status = constants.OP_STATUS_ERROR
        else:
          op.status = constants.OP_STATUS_SUCCESS

    def _Error2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_ERROR

    def _Success(ops):
      for op in ops:
        op.status = constants.OP_STATUS_SUCCESS

    tests = {
      constants.JOB_STATUS_QUEUED: [_Queued],
      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
      constants.JOB_STATUS_RUNNING: [_Running],
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
      constants.JOB_STATUS_CANCELED: [_Canceled],
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
      constants.JOB_STATUS_SUCCESS: [_Success],
      }

    def _NewJob():
      job = jqueue._QueuedJob(None, 1,
                              [opcodes.OpTestDelay() for _ in range(10)])
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in job.ops))
      return job

    # Looking up every known status ensures none is left without a test
    for status in constants.JOB_STATUS_ALL:
      sttests = tests[status]
      assert sttests
      for fn in sttests:
        job = _NewJob()
        fn(job.ops)
        self.assertEqual(job.CalcStatus(), status)
425
426
427 class _FakeDependencyManager:
428   def __init__(self):
429     self._checks = []
430     self._notifications = []
431     self._waiting = set()
432
433   def AddCheckResult(self, job, dep_job_id, dep_status, result):
434     self._checks.append((job, dep_job_id, dep_status, result))
435
436   def CountPendingResults(self):
437     return len(self._checks)
438
439   def CountWaitingJobs(self):
440     return len(self._waiting)
441
442   def GetNextNotification(self):
443     return self._notifications.pop(0)
444
445   def JobWaiting(self, job):
446     return job in self._waiting
447
448   def CheckAndRegister(self, job, dep_job_id, dep_status):
449     (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
450
451     assert exp_job == job
452     assert exp_dep_job_id == dep_job_id
453     assert exp_dep_status == dep_status
454
455     (result_status, _) = result
456
457     if result_status == jqueue._JobDependencyManager.WAIT:
458       self._waiting.add(job)
459     elif result_status == jqueue._JobDependencyManager.CONTINUE:
460       self._waiting.remove(job)
461
462     return result
463
464   def NotifyWaiters(self, job_id):
465     self._notifications.append(job_id)
466
467
468 class _DisabledFakeDependencyManager:
469   def JobWaiting(self, _):
470     return False
471
472   def CheckAndRegister(self, *args):
473     assert False, "Should not be called"
474
475   def NotifyWaiters(self, _):
476     pass
477
478
479 class _FakeQueueForProc:
480   def __init__(self, depmgr=None):
481     self._acquired = False
482     self._updates = []
483     self._submitted = []
484
485     self._submit_count = itertools.count(1000)
486
487     if depmgr:
488       self.depmgr = depmgr
489     else:
490       self.depmgr = _DisabledFakeDependencyManager()
491
492   def IsAcquired(self):
493     return self._acquired
494
495   def GetNextUpdate(self):
496     return self._updates.pop(0)
497
498   def GetNextSubmittedJob(self):
499     return self._submitted.pop(0)
500
501   def acquire(self, shared=0):
502     assert shared == 1
503     self._acquired = True
504
505   def release(self):
506     assert self._acquired
507     self._acquired = False
508
509   def UpdateJobUnlocked(self, job, replicate=True):
510     assert self._acquired, "Lock not acquired while updating job"
511     self._updates.append((job, bool(replicate)))
512
513   def SubmitManyJobs(self, jobs):
514     assert not self._acquired, "Lock acquired while submitting jobs"
515     job_ids = [self._submit_count.next() for _ in jobs]
516     self._submitted.extend(zip(job_ids, jobs))
517     return job_ids
518
519
520 class _FakeExecOpCodeForProc:
521   def __init__(self, queue, before_start, after_start):
522     self._queue = queue
523     self._before_start = before_start
524     self._after_start = after_start
525
526   def __call__(self, op, cbs, timeout=None, priority=None):
527     assert isinstance(op, opcodes.OpTestDummy)
528     assert not self._queue.IsAcquired(), \
529            "Queue lock not released when executing opcode"
530
531     if self._before_start:
532       self._before_start(timeout, priority)
533
534     cbs.NotifyStart()
535
536     if self._after_start:
537       self._after_start(op, cbs)
538
539     # Check again after the callbacks
540     assert not self._queue.IsAcquired()
541
542     if op.fail:
543       raise errors.OpExecError("Error requested (%s)" % op.result)
544
545     if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
546       return cbs.SubmitManyJobs(op.submit_jobs)
547
548     return op.result
549
550
class _JobProcessorTestUtils:
  """Mix-in with helpers for job processor tests.

  Uses assertion methods and therefore must be combined with
  C{unittest.TestCase}.

  """
  def _CreateJob(self, queue, job_id, ops):
    """Creates a queued job and checks its initial state.

    @param queue: queue object passed to the job
    @param job_id: job ID
    @param ops: list of input opcodes
    @return: the new L{jqueue._QueuedJob}

    """
    job = jqueue._QueuedJob(queue, job_id, ops)
    self.assertFalse(job.start_timestamp)
    self.assertFalse(job.end_timestamp)
    self.assertEqual(len(ops), len(job.ops))
    self.assertTrue(compat.all(op.input == inp
                               for (op, inp) in zip(job.ops, ops)))
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
    return job
561
562
563 class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
564   def _GenericCheckJob(self, job):
565     assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
566                       for op in job.ops)
567
568     self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
569                      [[op.start_timestamp for op in job.ops],
570                       [op.exec_timestamp for op in job.ops],
571                       [op.end_timestamp for op in job.ops]])
572     self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
573                      [job.received_timestamp,
574                       job.start_timestamp,
575                       job.end_timestamp])
576     self.assert_(job.start_timestamp)
577     self.assert_(job.end_timestamp)
578     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
579
580   def testSuccess(self):
581     queue = _FakeQueueForProc()
582
583     for (job_id, opcount) in [(25351, 1), (6637, 3),
584                               (24644, 10), (32207, 100)]:
585       ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
586              for i in range(opcount)]
587
588       # Create job
589       job = self._CreateJob(queue, job_id, ops)
590
591       def _BeforeStart(timeout, priority):
592         self.assertEqual(queue.GetNextUpdate(), (job, True))
593         self.assertRaises(IndexError, queue.GetNextUpdate)
594         self.assertFalse(queue.IsAcquired())
595         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
596         self.assertFalse(job.cur_opctx)
597
598       def _AfterStart(op, cbs):
599         self.assertEqual(queue.GetNextUpdate(), (job, True))
600         self.assertRaises(IndexError, queue.GetNextUpdate)
601
602         self.assertFalse(queue.IsAcquired())
603         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
604         self.assertFalse(job.cur_opctx)
605
606         # Job is running, cancelling shouldn't be possible
607         (success, _) = job.Cancel()
608         self.assertFalse(success)
609
610       opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
611
612       for idx in range(len(ops)):
613         self.assertRaises(IndexError, queue.GetNextUpdate)
614         result = jqueue._JobProcessor(queue, opexec, job)()
615         self.assertEqual(queue.GetNextUpdate(), (job, True))
616         self.assertRaises(IndexError, queue.GetNextUpdate)
617         if idx == len(ops) - 1:
618           # Last opcode
619           self.assert_(result)
620         else:
621           self.assertFalse(result)
622
623           self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
624           self.assert_(job.start_timestamp)
625           self.assertFalse(job.end_timestamp)
626
627       self.assertRaises(IndexError, queue.GetNextUpdate)
628
629       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
630       self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
631       self.assertEqual(job.GetInfo(["opresult"]),
632                        [[op.input.result for op in job.ops]])
633       self.assertEqual(job.GetInfo(["opstatus"]),
634                        [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
635       self.assert_(compat.all(op.start_timestamp and op.end_timestamp
636                               for op in job.ops))
637
638       self._GenericCheckJob(job)
639
640       # Calling the processor on a finished job should be a no-op
641       self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
642       self.assertRaises(IndexError, queue.GetNextUpdate)
643
644   def testOpcodeError(self):
645     queue = _FakeQueueForProc()
646
647     testdata = [
648       (17077, 1, 0, 0),
649       (1782, 5, 2, 2),
650       (18179, 10, 9, 9),
651       (4744, 10, 3, 8),
652       (23816, 100, 39, 45),
653       ]
654
655     for (job_id, opcount, failfrom, failto) in testdata:
656       # Prepare opcodes
657       ops = [opcodes.OpTestDummy(result="Res%s" % i,
658                                  fail=(failfrom <= i and
659                                        i <= failto))
660              for i in range(opcount)]
661
662       # Create job
663       job = self._CreateJob(queue, job_id, ops)
664
665       opexec = _FakeExecOpCodeForProc(queue, None, None)
666
667       for idx in range(len(ops)):
668         self.assertRaises(IndexError, queue.GetNextUpdate)
669         result = jqueue._JobProcessor(queue, opexec, job)()
670         # queued to waitlock
671         self.assertEqual(queue.GetNextUpdate(), (job, True))
672         # waitlock to running
673         self.assertEqual(queue.GetNextUpdate(), (job, True))
674         # Opcode result
675         self.assertEqual(queue.GetNextUpdate(), (job, True))
676         self.assertRaises(IndexError, queue.GetNextUpdate)
677
678         if idx in (failfrom, len(ops) - 1):
679           # Last opcode
680           self.assert_(result)
681           break
682
683         self.assertFalse(result)
684
685         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
686
687       self.assertRaises(IndexError, queue.GetNextUpdate)
688
689       # Check job status
690       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
691       self.assertEqual(job.GetInfo(["id"]), [job_id])
692       self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
693
694       # Check opcode status
695       data = zip(job.ops,
696                  job.GetInfo(["opstatus"])[0],
697                  job.GetInfo(["opresult"])[0])
698
699       for idx, (op, opstatus, opresult) in enumerate(data):
700         if idx < failfrom:
701           assert not op.input.fail
702           self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
703           self.assertEqual(opresult, op.input.result)
704         elif idx <= failto:
705           assert op.input.fail
706           self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
707           self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
708         else:
709           assert not op.input.fail
710           self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
711           self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
712
713       self.assert_(compat.all(op.start_timestamp and op.end_timestamp
714                               for op in job.ops[:failfrom]))
715
716       self._GenericCheckJob(job)
717
718       # Calling the processor on a finished job should be a no-op
719       self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
720       self.assertRaises(IndexError, queue.GetNextUpdate)
721
722   def testCancelWhileInQueue(self):
723     queue = _FakeQueueForProc()
724
725     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
726            for i in range(5)]
727
728     # Create job
729     job_id = 17045
730     job = self._CreateJob(queue, job_id, ops)
731
732     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
733
734     # Mark as cancelled
735     (success, _) = job.Cancel()
736     self.assert_(success)
737
738     self.assertRaises(IndexError, queue.GetNextUpdate)
739
740     self.assertFalse(job.start_timestamp)
741     self.assertTrue(job.end_timestamp)
742     self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
743                             for op in job.ops))
744
745     # Serialize to check for differences
746     before_proc = job.Serialize()
747
748     # Simulate processor called in workerpool
749     opexec = _FakeExecOpCodeForProc(queue, None, None)
750     self.assert_(jqueue._JobProcessor(queue, opexec, job)())
751
752     # Check result
753     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
754     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
755     self.assertFalse(job.start_timestamp)
756     self.assertTrue(job.end_timestamp)
757     self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
758                                 for op in job.ops))
759     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
760                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
761                       ["Job canceled by request" for _ in job.ops]])
762
763     # Must not have changed or written
764     self.assertEqual(before_proc, job.Serialize())
765     self.assertRaises(IndexError, queue.GetNextUpdate)
766
767   def testCancelWhileWaitlockInQueue(self):
768     queue = _FakeQueueForProc()
769
770     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
771            for i in range(5)]
772
773     # Create job
774     job_id = 8645
775     job = self._CreateJob(queue, job_id, ops)
776
777     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
778
779     job.ops[0].status = constants.OP_STATUS_WAITLOCK
780
781     assert len(job.ops) == 5
782
783     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
784
785     # Mark as cancelling
786     (success, _) = job.Cancel()
787     self.assert_(success)
788
789     self.assertRaises(IndexError, queue.GetNextUpdate)
790
791     self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
792                             for op in job.ops))
793
794     opexec = _FakeExecOpCodeForProc(queue, None, None)
795     self.assert_(jqueue._JobProcessor(queue, opexec, job)())
796
797     # Check result
798     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
799     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
800     self.assertFalse(job.start_timestamp)
801     self.assert_(job.end_timestamp)
802     self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
803                                 for op in job.ops))
804     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
805                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
806                       ["Job canceled by request" for _ in job.ops]])
807
808   def testCancelWhileWaitlock(self):
809     queue = _FakeQueueForProc()
810
811     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
812            for i in range(5)]
813
814     # Create job
815     job_id = 11009
816     job = self._CreateJob(queue, job_id, ops)
817
818     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
819
820     def _BeforeStart(timeout, priority):
821       self.assertEqual(queue.GetNextUpdate(), (job, True))
822       self.assertRaises(IndexError, queue.GetNextUpdate)
823       self.assertFalse(queue.IsAcquired())
824       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
825
826       # Mark as cancelled
827       (success, _) = job.Cancel()
828       self.assert_(success)
829
830       self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
831                               for op in job.ops))
832       self.assertRaises(IndexError, queue.GetNextUpdate)
833
834     def _AfterStart(op, cbs):
835       self.assertEqual(queue.GetNextUpdate(), (job, True))
836       self.assertRaises(IndexError, queue.GetNextUpdate)
837       self.assertFalse(queue.IsAcquired())
838       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
839
840     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
841
842     self.assertRaises(IndexError, queue.GetNextUpdate)
843     self.assert_(jqueue._JobProcessor(queue, opexec, job)())
844     self.assertEqual(queue.GetNextUpdate(), (job, True))
845     self.assertRaises(IndexError, queue.GetNextUpdate)
846
847     # Check result
848     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
849     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
850     self.assert_(job.start_timestamp)
851     self.assert_(job.end_timestamp)
852     self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
853                                 for op in job.ops))
854     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
855                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
856                       ["Job canceled by request" for _ in job.ops]])
857
858   def testCancelWhileWaitlockWithTimeout(self):
859     queue = _FakeQueueForProc()
860
861     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
862            for i in range(5)]
863
864     # Create job
865     job_id = 24314
866     job = self._CreateJob(queue, job_id, ops)
867
868     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
869
870     def _BeforeStart(timeout, priority):
871       self.assertFalse(queue.IsAcquired())
872       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
873
874       # Mark as cancelled
875       (success, _) = job.Cancel()
876       self.assert_(success)
877
878       self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
879                               for op in job.ops))
880
881       # Fake an acquire attempt timing out
882       raise mcpu.LockAcquireTimeout()
883
884     def _AfterStart(op, cbs):
885       self.fail("Should not reach this")
886
887     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
888
889     self.assert_(jqueue._JobProcessor(queue, opexec, job)())
890
891     # Check result
892     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
893     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
894     self.assert_(job.start_timestamp)
895     self.assert_(job.end_timestamp)
896     self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
897                                 for op in job.ops))
898     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
899                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
900                       ["Job canceled by request" for _ in job.ops]])
901
902   def testCancelWhileRunning(self):
903     # Tests canceling a job with finished opcodes and more, unprocessed ones
904     queue = _FakeQueueForProc()
905
906     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
907            for i in range(3)]
908
909     # Create job
910     job_id = 28492
911     job = self._CreateJob(queue, job_id, ops)
912
913     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
914
915     opexec = _FakeExecOpCodeForProc(queue, None, None)
916
917     # Run one opcode
918     self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
919
920     # Job goes back to queued
921     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
922     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
923                      [[constants.OP_STATUS_SUCCESS,
924                        constants.OP_STATUS_QUEUED,
925                        constants.OP_STATUS_QUEUED],
926                       ["Res0", None, None]])
927
928     # Mark as cancelled
929     (success, _) = job.Cancel()
930     self.assert_(success)
931
932     # Try processing another opcode (this will actually cancel the job)
933     self.assert_(jqueue._JobProcessor(queue, opexec, job)())
934
935     # Check result
936     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
937     self.assertEqual(job.GetInfo(["id"]), [job_id])
938     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
939     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
940                      [[constants.OP_STATUS_SUCCESS,
941                        constants.OP_STATUS_CANCELED,
942                        constants.OP_STATUS_CANCELED],
943                       ["Res0", "Job canceled by request",
944                        "Job canceled by request"]])
945
946   def testPartiallyRun(self):
947     # Tests calling the processor on a job that's been partially run before the
948     # program was restarted
949     queue = _FakeQueueForProc()
950
951     opexec = _FakeExecOpCodeForProc(queue, None, None)
952
953     for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
954       ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
955              for i in range(10)]
956
957       # Create job
958       job = self._CreateJob(queue, job_id, ops)
959
960       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
961
962       for _ in range(successcount):
963         self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
964
965       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
966       self.assertEqual(job.GetInfo(["opstatus"]),
967                        [[constants.OP_STATUS_SUCCESS
968                          for _ in range(successcount)] +
969                         [constants.OP_STATUS_QUEUED
970                          for _ in range(len(ops) - successcount)]])
971
972       self.assert_(job.ops_iter)
973
974       # Serialize and restore (simulates program restart)
975       newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
976       self.assertFalse(newjob.ops_iter)
977       self._TestPartial(newjob, successcount)
978
979   def _TestPartial(self, job, successcount):
980     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
981     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
982
983     queue = _FakeQueueForProc()
984     opexec = _FakeExecOpCodeForProc(queue, None, None)
985
986     for remaining in reversed(range(len(job.ops) - successcount)):
987       result = jqueue._JobProcessor(queue, opexec, job)()
988       self.assertEqual(queue.GetNextUpdate(), (job, True))
989       self.assertEqual(queue.GetNextUpdate(), (job, True))
990       self.assertEqual(queue.GetNextUpdate(), (job, True))
991       self.assertRaises(IndexError, queue.GetNextUpdate)
992
993       if remaining == 0:
994         # Last opcode
995         self.assert_(result)
996         break
997
998       self.assertFalse(result)
999
1000       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1001
1002     self.assertRaises(IndexError, queue.GetNextUpdate)
1003     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1004     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1005     self.assertEqual(job.GetInfo(["opresult"]),
1006                      [[op.input.result for op in job.ops]])
1007     self.assertEqual(job.GetInfo(["opstatus"]),
1008                      [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
1009     self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1010                             for op in job.ops))
1011
1012     self._GenericCheckJob(job)
1013
1014     # Calling the processor on a finished job should be a no-op
1015     self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1016     self.assertRaises(IndexError, queue.GetNextUpdate)
1017
1018     # ... also after being restored
1019     job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
1020     # Calling the processor on a finished job should be a no-op
1021     self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
1022     self.assertRaises(IndexError, queue.GetNextUpdate)
1023
1024   def testProcessorOnRunningJob(self):
1025     ops = [opcodes.OpTestDummy(result="result", fail=False)]
1026
1027     queue = _FakeQueueForProc()
1028     opexec = _FakeExecOpCodeForProc(queue, None, None)
1029
1030     # Create job
1031     job = self._CreateJob(queue, 9571, ops)
1032
1033     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1034
1035     job.ops[0].status = constants.OP_STATUS_RUNNING
1036
1037     assert len(job.ops) == 1
1038
1039     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1040
1041     # Calling on running job must fail
1042     self.assertRaises(errors.ProgrammerError,
1043                       jqueue._JobProcessor(queue, opexec, job))
1044
1045   def testLogMessages(self):
1046     # Tests the "Feedback" callback function
1047     queue = _FakeQueueForProc()
1048
1049     messages = {
1050       1: [
1051         (None, "Hello"),
1052         (None, "World"),
1053         (constants.ELOG_MESSAGE, "there"),
1054         ],
1055       4: [
1056         (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
1057         (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1058         ],
1059       }
1060     ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1061                                messages=messages.get(i, []))
1062            for i in range(5)]
1063
1064     # Create job
1065     job = self._CreateJob(queue, 29386, ops)
1066
1067     def _BeforeStart(timeout, priority):
1068       self.assertEqual(queue.GetNextUpdate(), (job, True))
1069       self.assertRaises(IndexError, queue.GetNextUpdate)
1070       self.assertFalse(queue.IsAcquired())
1071       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1072
1073     def _AfterStart(op, cbs):
1074       self.assertEqual(queue.GetNextUpdate(), (job, True))
1075       self.assertRaises(IndexError, queue.GetNextUpdate)
1076       self.assertFalse(queue.IsAcquired())
1077       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1078
1079       self.assertRaises(AssertionError, cbs.Feedback,
1080                         "too", "many", "arguments")
1081
1082       for (log_type, msg) in op.messages:
1083         self.assertRaises(IndexError, queue.GetNextUpdate)
1084         if log_type:
1085           cbs.Feedback(log_type, msg)
1086         else:
1087           cbs.Feedback(msg)
1088         # Check for job update without replication
1089         self.assertEqual(queue.GetNextUpdate(), (job, False))
1090         self.assertRaises(IndexError, queue.GetNextUpdate)
1091
1092     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1093
1094     for remaining in reversed(range(len(job.ops))):
1095       self.assertRaises(IndexError, queue.GetNextUpdate)
1096       result = jqueue._JobProcessor(queue, opexec, job)()
1097       self.assertEqual(queue.GetNextUpdate(), (job, True))
1098       self.assertRaises(IndexError, queue.GetNextUpdate)
1099
1100       if remaining == 0:
1101         # Last opcode
1102         self.assert_(result)
1103         break
1104
1105       self.assertFalse(result)
1106
1107       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1108
1109     self.assertRaises(IndexError, queue.GetNextUpdate)
1110
1111     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1112     self.assertEqual(job.GetInfo(["opresult"]),
1113                      [[op.input.result for op in job.ops]])
1114
1115     logmsgcount = sum(len(m) for m in messages.values())
1116
1117     self._CheckLogMessages(job, logmsgcount)
1118
1119     # Serialize and restore (simulates program restart)
1120     newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
1121     self._CheckLogMessages(newjob, logmsgcount)
1122
1123     # Check each message
1124     prevserial = -1
1125     for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1126       for (serial, timestamp, log_type, msg) in oplog:
1127         (exptype, expmsg) = messages.get(idx).pop(0)
1128         if exptype:
1129           self.assertEqual(log_type, exptype)
1130         else:
1131           self.assertEqual(log_type, constants.ELOG_MESSAGE)
1132         self.assertEqual(expmsg, msg)
1133         self.assert_(serial > prevserial)
1134         prevserial = serial
1135
1136   def _CheckLogMessages(self, job, count):
1137     # Check serial
1138     self.assertEqual(job.log_serial, count)
1139
1140     # No filter
1141     self.assertEqual(job.GetLogEntries(None),
1142                      [entry for entries in job.GetInfo(["oplog"])[0] if entries
1143                       for entry in entries])
1144
1145     # Filter with serial
1146     assert count > 3
1147     self.assert_(job.GetLogEntries(3))
1148     self.assertEqual(job.GetLogEntries(3),
1149                      [entry for entries in job.GetInfo(["oplog"])[0] if entries
1150                       for entry in entries][3:])
1151
1152     # No log message after highest serial
1153     self.assertFalse(job.GetLogEntries(count))
1154     self.assertFalse(job.GetLogEntries(count + 3))
1155
1156   def testSubmitManyJobs(self):
1157     queue = _FakeQueueForProc()
1158
1159     job_id = 15656
1160     ops = [
1161       opcodes.OpTestDummy(result="Res0", fail=False,
1162                           submit_jobs=[]),
1163       opcodes.OpTestDummy(result="Res1", fail=False,
1164                           submit_jobs=[
1165                             [opcodes.OpTestDummy(result="r1j0", fail=False)],
1166                             ]),
1167       opcodes.OpTestDummy(result="Res2", fail=False,
1168                           submit_jobs=[
1169                             [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1170                              opcodes.OpTestDummy(result="r2j0o1", fail=False),
1171                              opcodes.OpTestDummy(result="r2j0o2", fail=False),
1172                              opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1173                             [opcodes.OpTestDummy(result="r2j1", fail=False)],
1174                             [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1175                              opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1176                             ]),
1177       ]
1178
1179     # Create job
1180     job = self._CreateJob(queue, job_id, ops)
1181
1182     def _BeforeStart(timeout, priority):
1183       self.assertEqual(queue.GetNextUpdate(), (job, True))
1184       self.assertRaises(IndexError, queue.GetNextUpdate)
1185       self.assertFalse(queue.IsAcquired())
1186       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1187       self.assertFalse(job.cur_opctx)
1188
1189     def _AfterStart(op, cbs):
1190       self.assertEqual(queue.GetNextUpdate(), (job, True))
1191       self.assertRaises(IndexError, queue.GetNextUpdate)
1192
1193       self.assertFalse(queue.IsAcquired())
1194       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1195       self.assertFalse(job.cur_opctx)
1196
1197       # Job is running, cancelling shouldn't be possible
1198       (success, _) = job.Cancel()
1199       self.assertFalse(success)
1200
1201     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1202
1203     for idx in range(len(ops)):
1204       self.assertRaises(IndexError, queue.GetNextUpdate)
1205       result = jqueue._JobProcessor(queue, opexec, job)()
1206       self.assertEqual(queue.GetNextUpdate(), (job, True))
1207       self.assertRaises(IndexError, queue.GetNextUpdate)
1208       if idx == len(ops) - 1:
1209         # Last opcode
1210         self.assert_(result)
1211       else:
1212         self.assertFalse(result)
1213
1214         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1215         self.assert_(job.start_timestamp)
1216         self.assertFalse(job.end_timestamp)
1217
1218     self.assertRaises(IndexError, queue.GetNextUpdate)
1219
1220     for idx, submitted_ops in enumerate(job_ops
1221                                         for op in ops
1222                                         for job_ops in op.submit_jobs):
1223       self.assertEqual(queue.GetNextSubmittedJob(),
1224                        (1000 + idx, submitted_ops))
1225     self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1226
1227     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1228     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1229     self.assertEqual(job.GetInfo(["opresult"]),
1230                      [[[], [1000], [1001, 1002, 1003]]])
1231     self.assertEqual(job.GetInfo(["opstatus"]),
1232                      [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1233
1234     self._GenericCheckJob(job)
1235
1236     # Calling the processor on a finished job should be a no-op
1237     self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1238     self.assertRaises(IndexError, queue.GetNextUpdate)
1239
  def testJobDependency(self):
    # Tests a job whose first opcode depends on two other jobs. A fake
    # dependency manager is fed check results for every processor run:
    #   attempts 0-1: WAIT for "prev_job_id2"
    #   attempt 2:    CONTINUE for "prev_job_id2", WAIT for "prev_job_id"
    #   attempts 3-4: WAIT for "prev_job_id"
    #   attempt 5:    CONTINUE for "prev_job_id"
    #   attempts > 5: no check results are queued
    # In the end the job must have finished successfully.
    depmgr = _FakeDependencyManager()
    queue = _FakeQueueForProc(depmgr=depmgr)

    self.assertEqual(queue.depmgr, depmgr)

    prev_job_id = 22113
    prev_job_id2 = 28102
    job_id = 29929
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False,
                          depends=[
                            [prev_job_id2, None],
                            [prev_job_id, None],
                            ]),
      opcodes.OpTestDummy(result="Res1", fail=False),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    # Hook for the fake opcode executor; "attempt" is read from the loop
    # further down at the time the hook is called
    def _BeforeStart(timeout, priority):
      if attempt == 0 or attempt > 5:
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
      self.assertFalse(job.cur_opctx)

    # Second hook for the fake opcode executor
    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # One processor run per iteration; the loop is left once the processor
    # returns a true value after the dependency phase (attempts >= 5)
    counter = itertools.count()
    while True:
      attempt = counter.next()

      # Nothing may be left over from the previous iteration
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertRaises(IndexError, depmgr.GetNextNotification)

      # Queue the dependency check results for this attempt
      if attempt < 2:
        depmgr.AddCheckResult(job, prev_job_id2, None,
                              (jqueue._JobDependencyManager.WAIT, "wait2"))
      elif attempt == 2:
        depmgr.AddCheckResult(job, prev_job_id2, None,
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
        # The processor will ask for the next dependency immediately
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt < 5:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt == 5:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
      # Verify the number of results queued above
      if attempt == 2:
        self.assertEqual(depmgr.CountPendingResults(), 2)
      elif attempt > 5:
        self.assertEqual(depmgr.CountPendingResults(), 0)
      else:
        self.assertEqual(depmgr.CountPendingResults(), 1)

      result = jqueue._JobProcessor(queue, opexec, job)()
      if attempt == 0 or attempt >= 5:
        # Job should only be updated if there was an actual change
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      # All queued check results must have been consumed by the processor
      self.assertFalse(depmgr.CountPendingResults())

      if attempt < 5:
        # Simulate waiting for other job
        self.assertTrue(result)
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
        self.assertRaises(IndexError, depmgr.GetNextNotification)
        self.assert_(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        continue

      if result:
        # Last opcode
        self.assertFalse(job.cur_opctx)
        # The dependency manager must have been notified about this job
        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
        break

      self.assertRaises(IndexError, depmgr.GetNextNotification)

      self.assertFalse(result)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assert_(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    self._GenericCheckJob(job)

    # No updates, notifications, check results or waiting jobs may be left
    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)
    self.assertFalse(depmgr.CountPendingResults())
    self.assertFalse(depmgr.CountWaitingJobs())

    # Calling the processor on a finished job should be a no-op
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
    self.assertRaises(IndexError, queue.GetNextUpdate)
1364
  def testJobDependencyCancel(self):
    # Tests a job whose second opcode depends on another job. The fake
    # dependency manager is fed the following check results:
    #   attempt 0:    none (the first opcode has no dependencies)
    #   attempts 1-3: WAIT for "prev_job_id"
    #   attempt 4:    CANCEL for "prev_job_id"
    # In the end the job must be canceled, with only the first opcode having
    # succeeded.
    depmgr = _FakeDependencyManager()
    queue = _FakeQueueForProc(depmgr=depmgr)

    self.assertEqual(queue.depmgr, depmgr)

    prev_job_id = 13623
    job_id = 30876
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False),
      opcodes.OpTestDummy(result="Res1", fail=False,
                          depends=[
                            [prev_job_id, None],
                            ]),
      opcodes.OpTestDummy(result="Res2", fail=False),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    # Hook for the fake opcode executor; "attempt" is read from the loop
    # further down at the time the hook is called
    def _BeforeStart(timeout, priority):
      if attempt == 0 or attempt > 5:
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
      self.assertFalse(job.cur_opctx)

    # Second hook for the fake opcode executor
    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # One processor run per iteration; the loop is left once the processor
    # returns a true value outside of the waiting phase (attempts 1-3)
    counter = itertools.count()
    while True:
      attempt = counter.next()

      # Nothing may be left over from the previous iteration
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertRaises(IndexError, depmgr.GetNextNotification)

      # Queue the dependency check result for this attempt
      if attempt == 0:
        # This will handle the first opcode
        pass
      elif attempt < 4:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt == 4:
        # Other job was cancelled
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.CANCEL, "cancel"))

      # Verify the number of results queued above
      if attempt == 0:
        self.assertEqual(depmgr.CountPendingResults(), 0)
      else:
        self.assertEqual(depmgr.CountPendingResults(), 1)

      result = jqueue._JobProcessor(queue, opexec, job)()
      if attempt <= 1 or attempt >= 4:
        # Job should only be updated if there was an actual change
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      # The queued check result must have been consumed by the processor
      self.assertFalse(depmgr.CountPendingResults())

      if attempt > 0 and attempt < 4:
        # Simulate waiting for other job
        self.assertTrue(result)
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
        self.assertRaises(IndexError, depmgr.GetNextNotification)
        self.assert_(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        continue

      if result:
        # Last opcode
        self.assertFalse(job.cur_opctx)
        # The dependency manager must have been notified about this job
        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
        break

      self.assertRaises(IndexError, depmgr.GetNextNotification)

      self.assertFalse(result)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assert_(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    # Check result: first opcode succeeded, the others were canceled
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_CANCELED,
                       constants.OP_STATUS_CANCELED],
                      ["Res0", "Job canceled by request",
                       "Job canceled by request"]])

    self._GenericCheckJob(job)

    # No updates, notifications or check results may be left
    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)
    self.assertFalse(depmgr.CountPendingResults())

    # Calling the processor on a finished job should be a no-op
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
    self.assertRaises(IndexError, queue.GetNextUpdate)
1481
1482   def testJobDependencyWrongstatus(self):
1483     depmgr = _FakeDependencyManager()
1484     queue = _FakeQueueForProc(depmgr=depmgr)
1485
1486     self.assertEqual(queue.depmgr, depmgr)
1487
1488     prev_job_id = 9741
1489     job_id = 11763
1490     ops = [
1491       opcodes.OpTestDummy(result="Res0", fail=False),
1492       opcodes.OpTestDummy(result="Res1", fail=False,
1493                           depends=[
1494                             [prev_job_id, None],
1495                             ]),
1496       opcodes.OpTestDummy(result="Res2", fail=False),
1497       ]
1498
1499     # Create job
1500     job = self._CreateJob(queue, job_id, ops)
1501
1502     def _BeforeStart(timeout, priority):
1503       if attempt == 0 or attempt > 5:
1504         # Job should only be updated when it wasn't waiting for another job
1505         self.assertEqual(queue.GetNextUpdate(), (job, True))
1506       self.assertRaises(IndexError, queue.GetNextUpdate)
1507       self.assertFalse(queue.IsAcquired())
1508       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1509       self.assertFalse(job.cur_opctx)
1510
1511     def _AfterStart(op, cbs):
1512       self.assertEqual(queue.GetNextUpdate(), (job, True))
1513       self.assertRaises(IndexError, queue.GetNextUpdate)
1514
1515       self.assertFalse(queue.IsAcquired())
1516       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1517       self.assertFalse(job.cur_opctx)
1518
1519       # Job is running, cancelling shouldn't be possible
1520       (success, _) = job.Cancel()
1521       self.assertFalse(success)
1522
1523     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1524
1525     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1526
1527     counter = itertools.count()
1528     while True:
1529       attempt = counter.next()
1530
1531       self.assertRaises(IndexError, queue.GetNextUpdate)
1532       self.assertRaises(IndexError, depmgr.GetNextNotification)
1533
1534       if attempt == 0:
1535         # This will handle the first opcode
1536         pass
1537       elif attempt < 4:
1538         depmgr.AddCheckResult(job, prev_job_id, None,
1539                               (jqueue._JobDependencyManager.WAIT, "wait"))
1540       elif attempt == 4:
1541         # Other job failed
1542         depmgr.AddCheckResult(job, prev_job_id, None,
1543                               (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
1544
1545       if attempt == 0:
1546         self.assertEqual(depmgr.CountPendingResults(), 0)
1547       else:
1548         self.assertEqual(depmgr.CountPendingResults(), 1)
1549
1550       result = jqueue._JobProcessor(queue, opexec, job)()
1551       if attempt <= 1 or attempt >= 4:
1552         # Job should only be updated if there was an actual change
1553         self.assertEqual(queue.GetNextUpdate(), (job, True))
1554       self.assertRaises(IndexError, queue.GetNextUpdate)
1555       self.assertFalse(depmgr.CountPendingResults())
1556
1557       if attempt > 0 and attempt < 4:
1558         # Simulate waiting for other job
1559         self.assertTrue(result)
1560         self.assertTrue(job.cur_opctx)
1561         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1562         self.assertRaises(IndexError, depmgr.GetNextNotification)
1563         self.assert_(job.start_timestamp)
1564         self.assertFalse(job.end_timestamp)
1565         continue
1566
1567       if result:
1568         # Last opcode
1569         self.assertFalse(job.cur_opctx)
1570         self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
1571         break
1572
1573       self.assertRaises(IndexError, depmgr.GetNextNotification)
1574
1575       self.assertFalse(result)
1576       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1577       self.assert_(job.start_timestamp)
1578       self.assertFalse(job.end_timestamp)
1579
1580     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
1581     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
1582     self.assertEqual(job.GetInfo(["opstatus"]),
1583                      [[constants.OP_STATUS_SUCCESS,
1584                        constants.OP_STATUS_ERROR,
1585                        constants.OP_STATUS_ERROR]]),
1586
1587     (opresult, ) = job.GetInfo(["opresult"])
1588     self.assertEqual(len(opresult), len(ops))
1589     self.assertEqual(opresult[0], "Res0")
1590     self.assertTrue(errors.GetEncodedError(opresult[1]))
1591     self.assertTrue(errors.GetEncodedError(opresult[2]))
1592
1593     self._GenericCheckJob(job)
1594
1595     self.assertRaises(IndexError, queue.GetNextUpdate)
1596     self.assertRaises(IndexError, depmgr.GetNextNotification)
1597     self.assertFalse(depmgr.CountPendingResults())
1598
1599     # Calling the processor on a finished job should be a no-op
1600     self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1601     self.assertRaises(IndexError, queue.GetNextUpdate)
1602
1603
1604 class _FakeTimeoutStrategy:
1605   def __init__(self, timeouts):
1606     self.timeouts = timeouts
1607     self.attempts = 0
1608     self.last_timeout = None
1609
1610   def NextAttempt(self):
1611     self.attempts += 1
1612     if self.timeouts:
1613       timeout = self.timeouts.pop(0)
1614     else:
1615       timeout = None
1616     self.last_timeout = timeout
1617     return timeout
1618
1619
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
  """Tests lock acquire timeouts and retries in L{jqueue._JobProcessor}.

  The opcode executor callbacks and the timeout strategy factory are all
  methods of this class and share state through instance attributes. The
  factory decides, per opcode, which timeouts are handed out and how many
  lock acquisitions L{_BeforeStart} must refuse before granting the lock.

  """
  def setUp(self):
    """Resets the state shared between the callbacks and the test.

    """
    self.queue = _FakeQueueForProc()
    self.job = None
    # Index of the opcode being processed and the iterator producing it
    self.curop = None
    self.opcounter = None
    # Most recent fake timeout strategy and the number of lock acquisitions
    # which must still time out for the current opcode
    self.timeout_strategy = None
    self.retries = 0
    # Opcode index, priority and status remembered from earlier calls
    self.prev_tsop = None
    self.prev_prio = None
    self.prev_status = None
    # Priority used and outcome of the most recent lock acquisition
    self.lock_acq_prio = None
    self.gave_lock = None
    self.done_lock_before_blocking = False

  def _BeforeStart(self, timeout, priority):
    """Callback run by the fake opcode executor before acquiring locks.

    Verifies the job's state and the timeout/priority received, then either
    returns (lock granted) or simulates a failed lock acquisition.

    @param timeout: Lock acquire timeout passed on by the processor
    @param priority: Priority passed on by the processor
    @raise mcpu.LockAcquireTimeout: While retries are left for the opcode

    """
    job = self.job

    # If status has changed, job must've been written
    if self.prev_status != self.job.ops[self.curop].status:
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

    ts = self.timeout_strategy

    self.assertTrue(timeout is None or isinstance(timeout, (int, float)))
    self.assertEqual(timeout, ts.last_timeout)
    self.assertEqual(priority, job.ops[self.curop].priority)

    # Assume the lock is granted; reset below if a timeout is simulated
    self.gave_lock = True
    self.lock_acq_prio = priority

    if (self.curop == 3 and
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
      # Give locks before running into blocking acquire
      assert self.retries == 7
      self.retries = 0
      self.done_lock_before_blocking = True
      return

    if self.retries > 0:
      # A retry can only happen while the acquire still has a timeout
      self.assertTrue(timeout is not None)
      self.retries -= 1
      self.gave_lock = False
      raise mcpu.LockAcquireTimeout()

    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
      assert not ts.timeouts
      # At highest priority the acquire must be blocking (no timeout)
      self.assertTrue(timeout is None)

  def _AfterStart(self, op, cbs):
    """Callback run by the fake opcode executor once the opcode has started.

    Both parameters are received from the executor and not used here.

    """
    job = self.job

    # Setting to "running" requires an update
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Job is running, cancelling shouldn't be possible
    (success, _) = job.Cancel()
    self.assertFalse(success)

  def _NextOpcode(self):
    """Callback passed to the processor as C{_nextop_fn}.

    Advances to the next opcode index and remembers that opcode's priority
    and status for later comparisons.

    """
    self.curop = self.opcounter.next()
    self.prev_prio = self.job.ops[self.curop].priority
    self.prev_status = self.job.ops[self.curop].status

  def _NewTimeoutStrategy(self):
    """Timeout strategy factory handed to the processor.

    Chooses timeouts and the number of simulated lock timeouts depending on
    the index of the current opcode.

    @rtype: L{_FakeTimeoutStrategy}
    @return: New strategy, also stored in C{self.timeout_strategy}

    """
    job = self.job

    # All retries of the previous strategy must have been used up
    self.assertEqual(self.retries, 0)

    if self.prev_tsop == self.curop:
      # Still on the same opcode, priority must've been increased
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)

    if self.curop == 1:
      # Normal retry
      timeouts = range(10, 31, 10)
      self.retries = len(timeouts) - 1

    elif self.curop == 2:
      # Let this run into a blocking acquire
      timeouts = range(11, 61, 12)
      self.retries = len(timeouts)

    elif self.curop == 3:
      # Wait for priority to increase, but give lock before blocking acquire
      timeouts = range(12, 100, 14)
      self.retries = len(timeouts)

      self.assertFalse(self.done_lock_before_blocking)

    elif self.curop == 4:
      self.assertTrue(self.done_lock_before_blocking)

      # Timeouts, but no need to retry
      timeouts = range(10, 31, 10)
      self.retries = 0

    elif self.curop == 5:
      # Normal retry
      timeouts = range(19, 100, 11)
      self.retries = len(timeouts)

    else:
      timeouts = []
      self.retries = 0

    assert len(job.ops) == 10
    assert self.retries <= len(timeouts)

    ts = _FakeTimeoutStrategy(timeouts)

    self.timeout_strategy = ts
    self.prev_tsop = self.curop
    self.prev_prio = job.ops[self.curop].priority

    return ts

  def testTimeout(self):
    """Processes a ten-opcode job while simulating lock acquire timeouts.

    The processor is invoked repeatedly until it reports the job as done;
    after every invocation the job's state and the updates written to the
    fake queue are verified.

    """
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(10)]

    # Create job
    job_id = 15801
    job = self._CreateJob(self.queue, job_id, ops)
    self.job = job

    self.opcounter = itertools.count(0)

    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
                                    self._AfterStart)
    tsf = self._NewTimeoutStrategy

    self.assertFalse(self.done_lock_before_blocking)

    while True:
      proc = jqueue._JobProcessor(self.queue, opexec, job,
                                  _timeout_strategy_factory=tsf)

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      if self.curop is not None:
        self.prev_status = self.job.ops[self.curop].status

      self.lock_acq_prio = None

      result = proc(_nextop_fn=self._NextOpcode)
      assert self.curop is not None

      if result or self.gave_lock:
        # Got lock and/or job is done, result must've been written
        self.assertFalse(job.cur_opctx)
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
        self.assertTrue(job.ops[self.curop].exec_timestamp)

      if result:
        self.assertFalse(job.cur_opctx)
        break

      self.assertFalse(result)

      if self.curop == 0:
        self.assertEqual(job.ops[self.curop].start_timestamp,
                         job.start_timestamp)

      if self.gave_lock:
        # Opcode finished, but job not yet done
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      else:
        # Did not get locks
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
                         self.timeout_strategy.NextAttempt)
        self.assertFalse(job.ops[self.curop].exec_timestamp)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

        # If priority has changed since acquiring locks, the job must've been
        # updated
        if self.lock_acq_prio != job.ops[self.curop].priority:
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    # Every opcode must have been visited exactly once
    self.assertEqual(self.curop, len(job.ops) - 1)
    self.assertEqual(self.job, job)
    self.assertEqual(self.opcounter.next(), len(job.ops))
    self.assertTrue(self.done_lock_before_blocking)

    self.assertRaises(IndexError, self.queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    # Calling the processor on a finished job should be a no-op
    self.assertTrue(jqueue._JobProcessor(self.queue, opexec, job)())
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1834
1835
class TestJobDependencyManager(unittest.TestCase):
  """Tests for L{jqueue._JobDependencyManager}.

  The manager is created with L{_GetStatus} and L{_Enqueue} as callbacks.
  Status replies are scripted through C{self._status}; whatever the manager
  enqueues ends up in C{self._queue}.

  """
  class _FakeJob:
    """Minimal job object; only carries an ID (as string)."""
    def __init__(self, job_id):
      self.id = str(job_id)

  def setUp(self):
    self._status = []
    self._queue = []
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)

  def _GetStatus(self, job_id):
    """Status callback; returns the next scripted reply.

    Fails the test if the manager asks for another job than expected.

    """
    (exp_job_id, result) = self._status.pop(0)
    self.assertEqual(exp_job_id, job_id)
    return result

  def _Enqueue(self, jobs):
    """Enqueue callback; records what was passed for later inspection."""
    self._queue.append(jobs)

  def _Register(self, job, dep_job_id, dep_status, reported):
    """Scripts one status reply and calls C{CheckAndRegister}.

    @param reported: Status the fake status callback reports for the
      dependency
    @return: Result code, i.e. the first element of the returned tuple

    """
    self._status.append((dep_job_id, reported))
    return self.jdm.CheckAndRegister(job, dep_job_id, dep_status)[0]

  def _CheckNothingPending(self):
    """Checks the scripted status was consumed and nothing was enqueued."""
    self.assertFalse(self._status)
    self.assertFalse(self._queue)

  def _ExpectWait(self, job, dep_job_id, dep_status, reported):
    """Registers a dependency and checks the job is now the only waiter."""
    outcome = self._Register(job, dep_job_id, dep_status, reported)
    self.assertEqual(outcome, self.jdm.WAIT)
    self._CheckNothingPending()
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      dep_job_id: set([job]),
      })

  def _ExpectDone(self, job, dep_job_id, dep_status, reported, expected):
    """Registers a dependency and checks the job is not left waiting.

    @param expected: Result code C{CheckAndRegister} must return

    """
    outcome = self._Register(job, dep_job_id, dep_status, reported)
    self.assertEqual(outcome, expected)
    self._CheckNothingPending()
    self.assertFalse(self.jdm.JobWaiting(job))

  def testNotFinalizedThenCancel(self):
    waiter = self._FakeJob(17697)
    dep_id = str(28625)

    self._ExpectWait(waiter, dep_id, [], constants.JOB_STATUS_RUNNING)
    self._ExpectDone(waiter, dep_id, [], constants.JOB_STATUS_CANCELED,
                     self.jdm.CANCEL)

  def testRequireCancel(self):
    waiter = self._FakeJob(5278)
    dep_id = str(9610)
    wanted = [constants.JOB_STATUS_CANCELED]

    self._ExpectWait(waiter, dep_id, wanted, constants.JOB_STATUS_WAITLOCK)
    self._ExpectDone(waiter, dep_id, wanted, constants.JOB_STATUS_CANCELED,
                     self.jdm.CONTINUE)

  def testRequireError(self):
    waiter = self._FakeJob(21459)
    dep_id = str(25519)
    wanted = [constants.JOB_STATUS_ERROR]

    self._ExpectWait(waiter, dep_id, wanted, constants.JOB_STATUS_WAITLOCK)
    self._ExpectDone(waiter, dep_id, wanted, constants.JOB_STATUS_ERROR,
                     self.jdm.CONTINUE)

  def testRequireMultiple(self):
    wanted = list(constants.JOBS_FINALIZED)

    # Any of the accepted final states must let the waiting job continue
    for final in wanted:
      waiter = self._FakeJob(21343)
      dep_id = str(14609)

      self._ExpectWait(waiter, dep_id, wanted, constants.JOB_STATUS_WAITLOCK)
      self._ExpectDone(waiter, dep_id, wanted, final, self.jdm.CONTINUE)

  def testNotify(self):
    waiter = self._FakeJob(8227)
    dep_id = str(4113)

    self._ExpectWait(waiter, dep_id, [], constants.JOB_STATUS_RUNNING)

    # Notification must hand the waiting job to the enqueue callback
    self.jdm.NotifyWaiters(dep_id)
    self.assertFalse(self._status)
    self.assertFalse(self.jdm._waiters)
    self.assertFalse(self.jdm.JobWaiting(waiter))
    self.assertEqual(self._queue, [set([waiter])])

  def testWrongStatus(self):
    waiter = self._FakeJob(10102)
    dep_id = str(1271)

    self._ExpectWait(waiter, dep_id, [constants.JOB_STATUS_SUCCESS],
                     constants.JOB_STATUS_QUEUED)
    self._ExpectDone(waiter, dep_id, [constants.JOB_STATUS_SUCCESS],
                     constants.JOB_STATUS_ERROR, self.jdm.WRONGSTATUS)

  def testCorrectStatus(self):
    waiter = self._FakeJob(24273)
    dep_id = str(23885)

    self._ExpectWait(waiter, dep_id, [constants.JOB_STATUS_SUCCESS],
                     constants.JOB_STATUS_QUEUED)
    self._ExpectDone(waiter, dep_id, [constants.JOB_STATUS_SUCCESS],
                     constants.JOB_STATUS_SUCCESS, self.jdm.CONTINUE)

  def testFinalizedRightAway(self):
    waiter = self._FakeJob(224)
    dep_id = str(3081)

    self._ExpectDone(waiter, dep_id, [constants.JOB_STATUS_SUCCESS],
                     constants.JOB_STATUS_SUCCESS, self.jdm.CONTINUE)
    # An entry with no waiters remains until the next notification
    self.assertEqual(self.jdm._waiters, {
      dep_id: set(),
      })

    # Force cleanup
    self.jdm.NotifyWaiters("0")
    self.assertFalse(self.jdm._waiters)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)

  def testSelfDependency(self):
    waiter = self._FakeJob(18937)

    outcome = self._Register(waiter, waiter.id, [],
                             constants.JOB_STATUS_SUCCESS)
    self.assertEqual(outcome, self.jdm.ERROR)

  def testJobDisappears(self):
    waiter = self._FakeJob(30540)
    dep_id = str(23769)

    def _LostStatus(_):
      raise errors.JobLost("#msg#")

    # Separate manager whose status callback always reports the job as lost
    manager = jqueue._JobDependencyManager(_LostStatus, None)
    outcome = manager.CheckAndRegister(waiter, dep_id, [])[0]
    self.assertEqual(outcome, self.jdm.ERROR)
    self.assertFalse(manager.JobWaiting(waiter))
2048
2049
# When executed as a script, hand control to testutils.GanetiTestProgram
if __name__ == "__main__":
  testutils.GanetiTestProgram()