Switch blockdev_getmirrorstatus_multi to per-node bodies
[ganeti-local] / test / ganeti.jqueue_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.jqueue"""
23
24 import os
25 import sys
26 import unittest
27 import tempfile
28 import shutil
29 import errno
30 import itertools
31 import random
32
33 from ganeti import constants
34 from ganeti import utils
35 from ganeti import errors
36 from ganeti import jqueue
37 from ganeti import opcodes
38 from ganeti import compat
39 from ganeti import mcpu
40 from ganeti import query
41
42 import testutils
43
44
class _FakeJob:
  """Minimal job stand-in used by the change-checker and waiter tests.

  Only the attributes and methods touched by those tests are provided: the
  job ID, the C{writable} flag, a settable status and a list of log entries.

  """
  def __init__(self, job_id, status):
    self.id = job_id
    self.writable = False
    self._status = status
    # (serial, message) tuples; the serial is the entry's position in the list
    self._log = []

  def SetStatus(self, status):
    """Replace the status reported by this job."""
    self._status = status

  def AddLogEntry(self, msg):
    """Append C{msg}, using the current number of entries as its serial."""
    serial = len(self._log)
    self._log.append((serial, msg))

  def CalcStatus(self):
    """Return the status given at construction or via L{SetStatus}."""
    return self._status

  def GetInfo(self, fields):
    """Return one value per requested field name.

    Only C{"status"} is known; any other name raises C{Exception}.

    """
    def _Lookup(field):
      if field != "status":
        raise Exception("Unknown field")
      return self._status

    return [_Lookup(field) for field in fields]

  def GetLogEntries(self, newer_than):
    """Return log entries starting at index C{newer_than}.

    With C{newer_than} set to C{None} the internal list itself is returned.

    """
    assert newer_than is None or newer_than >= 0

    if newer_than is not None:
      return self._log[newer_than:]

    return self._log
79
80
class TestJobChangesChecker(unittest.TestCase):
  """Tests for L{jqueue._JobChangesChecker}.

  As exercised here, calling the checker with a job returns a tuple of
  (job info, log entries) when something differs from the previous values
  the checker was constructed with, and C{None} when nothing changed.

  """
  def testStatus(self):
    """Without previous values, every call reports the current status."""
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.SetStatus(constants.JOB_STATUS_SUCCESS)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))

    # job.id is used by checker
    self.assertEqual(job.id, 9094)

  def testStatusWithPrev(self):
    """An unchanged status yields None and a changed one is reported."""
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"],
                                        [constants.JOB_STATUS_QUEUED], None)
    self.assertTrue(checker(job) is None)

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

  def testFinalStatus(self):
    """Finalized jobs are always reported, even if nothing changed."""
    for status in constants.JOBS_FINALIZED:
      job = _FakeJob(2178711, status)
      checker = jqueue._JobChangesChecker(["status"], [status], None)
      # There won't be any changes in this status, hence it should signal
      # a change immediately
      self.assertEqual(checker(job), ([status], []))

  def testLog(self):
    """New log entries are reported relative to the previous log serial."""
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.AddLogEntry("Hello World")
    (job_info, log_entries) = checker(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
    self.assertEqual(log_entries, [[0, "Hello World"]])

    # With the info and log serial just seen, nothing has changed
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
    self.assertTrue(checker2(job) is None)

    job.AddLogEntry("Foo Bar")
    job.SetStatus(constants.JOB_STATUS_ERROR)

    (job_info, log_entries) = checker2(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[1, "Foo Bar"]])

    # A checker without previous values reports the whole log
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
    (job_info, log_entries) = checker3(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
137
138
class TestJobChangesWaiter(unittest.TestCase):
  """Tests for L{jqueue._JobFileChangesWaiter} and L{jqueue._JobChangesWaiter}.

  Each test works on an empty job file inside a private temporary directory,
  which is removed again in L{tearDown}.

  """
  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _EnsureNotifierClosed(self, notifier):
    """Assert that the notifier's file descriptor has been closed.

    This relies on C{os.fstat} failing with C{EBADF} for a closed descriptor.
    NOTE(review): if the descriptor number were reused by another open file
    in the meantime, this check would report a false failure.

    """
    try:
      os.fstat(notifier._fd)
    except EnvironmentError:
      # sys.exc_info() works on every Python version, whereas
      # "except E, err" is a syntax error on Python 3 and
      # "except E as err" needs Python 2.6 or later
      err = sys.exc_info()[1]
      self.assertEqual(err.errno, errno.EBADF)
    else:
      self.fail("File descriptor wasn't closed")

  def testClose(self):
    """Closing a file waiter closes its descriptor, with or without waiting."""
    for wait in [False, True]:
      waiter = jqueue._JobFileChangesWaiter(self.filename)
      try:
        if wait:
          waiter.Wait(0.001)
      finally:
        waiter.Close()

      # Ensure file descriptor was closed
      self._EnsureNotifierClosed(waiter._notifier)

  def testChangingFile(self):
    """Wait reports no change at first, then the modification of the file."""
    waiter = jqueue._JobFileChangesWaiter(self.filename)
    try:
      self.assertFalse(waiter.Wait(0.1))
      utils.WriteFile(self.filename, data="changed")
      self.assertTrue(waiter.Wait(60))
    finally:
      waiter.Close()

    self._EnsureNotifierClosed(waiter._notifier)

  def testChangingFile2(self):
    """The generic waiter only creates its file waiter on the first wait."""
    waiter = jqueue._JobChangesWaiter(self.filename)
    try:
      self.assertFalse(waiter._filewaiter)
      # The first wait returns true and sets up the file waiter
      self.assertTrue(waiter.Wait(0.1))
      self.assertTrue(waiter._filewaiter)

      # File waiter is now used, but there have been no changes
      self.assertFalse(waiter.Wait(0.1))
      utils.WriteFile(self.filename, data="changed")
      self.assertTrue(waiter.Wait(60))
    finally:
      waiter.Close()

    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
194
195
class TestWaitForJobChangesHelper(unittest.TestCase):
  """Tests for L{jqueue._WaitForJobChangesHelper}.

  The helper is given a job file name, a loader function, the requested
  fields, the previous job info and log serial, and a final numeric
  argument (presumably a timeout in seconds; confirm against jqueue).

  """
  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _LoadWaitingJob(self):
    """Loader returning a fake job in the "waiting" status."""
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)

  def _LoadLostJob(self):
    """Loader simulating a job that can no longer be found."""
    return None

  def testNoChanges(self):
    """JOB_NOTCHANGED without changes; current info without previous info."""
    wfjc = jqueue._WaitForJobChangesHelper()

    # No change
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
                          [constants.JOB_STATUS_WAITING], None, 0.1),
                     constants.JOB_NOTCHANGED)

    # No previous information
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
                          ["status"], None, None, 1.0),
                     ([constants.JOB_STATUS_WAITING], []))

  def testLostJob(self):
    """The helper returns None when the loader cannot find the job."""
    wfjc = jqueue._WaitForJobChangesHelper()
    self.assertTrue(wfjc(self.filename, self._LoadLostJob,
                         ["status"], None, None, 1.0) is None)
228
229
class TestEncodeOpError(unittest.TestCase):
  """Tests for L{jqueue._EncodeOpError}."""

  def test(self):
    """Encoded errors are tuples that L{errors.MaybeRaise} can re-raise.

    The Ganeti error classes tested here keep their class. A non-Ganeti
    exception or a plain string is re-raised as L{errors.OpExecError}.

    """
    testdata = [
      (errors.LockError("Test 1"), errors.LockError),
      (errors.GenericError("Test 2"), errors.GenericError),
      (NotImplementedError("Foo"), errors.OpExecError),
      ("Hello World", errors.OpExecError),
      ]

    for (err, expected_cls) in testdata:
      encerr = jqueue._EncodeOpError(err)
      self.assertTrue(isinstance(encerr, tuple))
      self.assertRaises(expected_cls, errors.MaybeRaise, encerr)
247
248
class TestQueuedOpCode(unittest.TestCase):
  """Tests for L{jqueue._QueuedOpCode}, including serialization round trips."""

  def testDefaults(self):
    """A new opcode and its restored copy carry the same default values."""
    def _Check(op):
      self.assertFalse(hasattr(op.input, "dry_run"))
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
      self.assertFalse(op.log)
      self.assertTrue(op.start_timestamp is None)
      self.assertTrue(op.exec_timestamp is None)
      self.assertTrue(op.end_timestamp is None)
      self.assertTrue(op.result is None)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
    _Check(op1)
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())

  def testPriority(self):
    """The input opcode's priority is kept, also after a round trip."""
    def _Check(op):
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
             "Default priority equals high priority; test can't work"
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
    op1 = jqueue._QueuedOpCode(inpop)
    _Check(op1)
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())
280
281
class TestQueuedJob(unittest.TestCase):
  """Tests for L{jqueue._QueuedJob}."""

  def test(self):
    """A job without any opcodes is rejected with L{errors.GenericError}."""
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
                      None, 1, [], False)

  def testDefaults(self):
    """A new job and its serialized/restored copy share the defaults."""
    job_id = 4260
    ops = [
      opcodes.OpTagsGet(),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertTrue(job.writable)
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.log_serial, 0)
      self.assertTrue(job.received_timestamp)
      self.assertTrue(job.start_timestamp is None)
      self.assertTrue(job.end_timestamp is None)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
      self.assertTrue(repr(job).startswith("<"))
      self.assertEqual(len(job.ops), len(ops))
      self.assertTrue(compat.all(inp.__getstate__() == op.input.__getstate__()
                                 for (inp, op) in zip(ops, job.ops)))
      self.assertRaises(errors.OpExecError, job.GetInfo,
                        ["unknown-field"])
      self.assertEqual(job.GetInfo(["summary"]),
                       [[op.input.Summary() for op in job.ops]])

    job1 = jqueue._QueuedJob(None, job_id, ops, True)
    _Check(job1)
    # Restoring with writable=True must preserve every checked property
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True)
    _Check(job2)
    self.assertEqual(job1.Serialize(), job2.Serialize())

  def testWritable(self):
    """The writable flag given to the constructor is exposed unchanged."""
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
    self.assertFalse(job.writable)

    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
    self.assertTrue(job.writable)

  def testPriority(self):
    """CalcPriority follows the priorities of the not-yet-finished opcodes."""
    job_id = 4283
    ops = [
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(repr(job).startswith("<"))

    job = jqueue._QueuedJob(None, job_id, ops, True)
    _Check(job)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase first (a lower value is treated as a higher priority here)
    job.ops[0].priority -= 1
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)

    # Mark opcode as finished; its priority no longer counts
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase second
    job.ops[1].priority -= 10
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)

    # Test increasing first
    job.ops[0].status = constants.OP_STATUS_RUNNING
    job.ops[0].priority -= 19
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)

  def testCalcStatus(self):
    """The job status is derived from the combination of opcode statuses."""
    def _Queued(ops):
      # The default status is "queued"
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in ops))

    def _Waitlock1(ops):
      ops[0].status = constants.OP_STATUS_WAITING

    def _Waitlock2(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      ops[2].status = constants.OP_STATUS_WAITING

    def _Running(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_RUNNING
      for op in ops[2:]:
        op.status = constants.OP_STATUS_QUEUED

    def _Canceling1(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      for op in ops[2:]:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceling2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceled(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELED

    def _Error1(ops):
      for idx, op in enumerate(ops):
        if idx > 3:
          op.status = constants.OP_STATUS_ERROR
        else:
          op.status = constants.OP_STATUS_SUCCESS

    def _Error2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_ERROR

    def _Success(ops):
      for op in ops:
        op.status = constants.OP_STATUS_SUCCESS

    # Expected job status -> functions putting the opcodes into a state
    # that should produce it
    tests = {
      constants.JOB_STATUS_QUEUED: [_Queued],
      constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
      constants.JOB_STATUS_RUNNING: [_Running],
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
      constants.JOB_STATUS_CANCELED: [_Canceled],
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
      constants.JOB_STATUS_SUCCESS: [_Success],
      }

    def _NewJob():
      job = jqueue._QueuedJob(None, 1,
                              [opcodes.OpTestDelay() for _ in range(10)],
                              True)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in job.ops))
      return job

    # Every job status must be covered by at least one scenario
    for status in constants.JOB_STATUS_ALL:
      sttests = tests[status]
      assert sttests
      for fn in sttests:
        job = _NewJob()
        fn(job.ops)
        self.assertEqual(job.CalcStatus(), status)
437
438
class _FakeDependencyManager:
  """Scripted replacement for L{jqueue._JobDependencyManager}.

  Results for L{CheckAndRegister} must be queued in advance with
  L{AddCheckResult}. Each call consumes the oldest queued result after
  asserting that it was made with the expected arguments. Calls to
  L{NotifyWaiters} are recorded and can be read back with
  L{GetNextNotification}.

  """
  def __init__(self):
    # Queued (job, dep_job_id, dep_status, result) tuples, oldest first
    self._checks = []
    # Job IDs passed to NotifyWaiters, in call order
    self._notifications = []
    # Jobs currently marked as waiting on a dependency
    self._waiting = set()

  def AddCheckResult(self, job, dep_job_id, dep_status, result):
    """Queue the expected arguments and result of a CheckAndRegister call."""
    expectation = (job, dep_job_id, dep_status, result)
    self._checks.append(expectation)

  def CountPendingResults(self):
    """Return how many queued results have not been consumed yet."""
    return len(self._checks)

  def CountWaitingJobs(self):
    """Return how many jobs are currently marked as waiting."""
    return len(self._waiting)

  def GetNextNotification(self):
    """Remove and return the oldest job ID given to L{NotifyWaiters}."""
    return self._notifications.pop(0)

  def JobWaiting(self, job):
    """Return whether C{job} is currently marked as waiting."""
    return job in self._waiting

  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Verify the arguments and return the oldest queued result.

    A C{WAIT} result marks the job as waiting. A C{CONTINUE} result removes
    it from the waiting set, raising C{KeyError} if it was not waiting.

    """
    (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)

    assert exp_job == job
    assert exp_dep_job_id == dep_job_id
    assert exp_dep_status == dep_status

    (result_status, _) = result
    depmgr_cls = jqueue._JobDependencyManager

    if result_status == depmgr_cls.WAIT:
      self._waiting.add(job)
    elif result_status == depmgr_cls.CONTINUE:
      self._waiting.remove(job)

    return result

  def NotifyWaiters(self, job_id):
    """Record that waiters of C{job_id} were notified."""
    self._notifications.append(job_id)
478
479
class _DisabledFakeDependencyManager:
  """Dependency manager for tests in which no job has dependencies.

  No job is ever reported as waiting, notifications are ignored, and any
  attempt to check a dependency fails an assertion.

  """
  def NotifyWaiters(self, _):
    """Accept and ignore the notification."""
    pass

  def JobWaiting(self, _):
    """Report that no job is waiting."""
    return False

  def CheckAndRegister(self, *args):
    """Fail loudly: dependency checks are not expected in these tests."""
    assert False, "Should not be called"
489
490
class _FakeQueueForProc:
  """Fake job queue for L{jqueue._JobProcessor} tests.

  It tracks whether its lock is held and records every job update and job
  submission for later inspection. It asserts that updates only happen with
  the lock held and submissions only happen without it.

  """
  def __init__(self, depmgr=None):
    self._acquired = False
    # (job, replicate) tuples recorded by UpdateJobUnlocked, oldest first
    self._updates = []
    # (job_id, job) tuples recorded by SubmitManyJobs, oldest first
    self._submitted = []

    # IDs handed out to submitted jobs start at 1000
    self._submit_count = itertools.count(1000)

    # Without an explicit manager, any dependency check fails an assertion
    self.depmgr = depmgr or _DisabledFakeDependencyManager()

  def IsAcquired(self):
    """Return whether the queue lock is currently held."""
    return self._acquired

  def GetNextUpdate(self):
    """Pop the oldest recorded update; raises C{IndexError} if none is left."""
    return self._updates.pop(0)

  def GetNextSubmittedJob(self):
    """Pop the oldest submitted (job_id, job); C{IndexError} if none is left."""
    return self._submitted.pop(0)

  def acquire(self, shared=0):
    """Take the lock; only shared acquisition (C{shared=1}) is expected."""
    assert shared == 1
    self._acquired = True

  def release(self):
    """Release the lock, which must currently be held."""
    assert self._acquired
    self._acquired = False

  def UpdateJobUnlocked(self, job, replicate=True):
    """Record a job update; the lock must be held."""
    assert self._acquired, "Lock not acquired while updating job"
    entry = (job, bool(replicate))
    self._updates.append(entry)

  def SubmitManyJobs(self, jobs):
    """Record new jobs and return their IDs; the lock must not be held."""
    assert not self._acquired, "Lock acquired while submitting jobs"
    new_ids = [self._submit_count.next() for _ in jobs]
    for pair in zip(new_ids, jobs):
      self._submitted.append(pair)
    return new_ids
530
531
class _FakeExecOpCodeForProc:
  """Callable used as the opcode executor in L{jqueue._JobProcessor} tests.

  Optional hooks are run before and after the opcode is reported as started.
  The outcome is controlled by the dummy opcode's own attributes.

  """
  def __init__(self, queue, before_start, after_start):
    self._queue = queue
    self._before_start = before_start
    self._after_start = after_start

  def __call__(self, op, cbs, timeout=None, priority=None):
    """Execute a L{opcodes.OpTestDummy}.

    Calls C{before_start(timeout, priority)}, then C{cbs.NotifyStart()},
    then C{after_start(op, cbs)}, and asserts that the queue lock is never
    held meanwhile.

    @raise errors.OpExecError: if C{op.fail} is set
    @return: the result of C{cbs.SubmitManyJobs(op.submit_jobs)} when the
      opcode has a non-None C{submit_jobs} attribute, C{op.result} otherwise

    """
    assert isinstance(op, opcodes.OpTestDummy)
    assert not self._queue.IsAcquired(), \
           "Queue lock not released when executing opcode"

    before_fn = self._before_start
    if before_fn:
      before_fn(timeout, priority)

    cbs.NotifyStart()

    after_fn = self._after_start
    if after_fn:
      after_fn(op, cbs)

    # The callbacks must not have left the queue lock held
    assert not self._queue.IsAcquired()

    if op.fail:
      raise errors.OpExecError("Error requested (%s)" % op.result)

    submit_jobs = getattr(op, "submit_jobs", None)
    if submit_jobs is not None:
      return cbs.SubmitManyJobs(submit_jobs)

    return op.result
561
562
class _JobProcessorTestUtils:
  """Mixin for L{unittest.TestCase} subclasses testing the job processor."""

  def _CreateJob(self, queue, job_id, ops):
    """Create a writable job for C{ops} and check its initial state.

    @param queue: queue object passed to L{jqueue._QueuedJob}
    @param job_id: ID of the new job
    @param ops: list of input opcodes
    @return: the new L{jqueue._QueuedJob}

    """
    job = jqueue._QueuedJob(queue, job_id, ops, True)
    self.assertFalse(job.start_timestamp)
    self.assertFalse(job.end_timestamp)
    self.assertEqual(len(ops), len(job.ops))
    self.assertTrue(compat.all(op.input == inp
                               for (op, inp) in zip(job.ops, ops)))
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
    return job
573
574
575 class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
576   def _GenericCheckJob(self, job):
577     assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
578                       for op in job.ops)
579
580     self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
581                      [[op.start_timestamp for op in job.ops],
582                       [op.exec_timestamp for op in job.ops],
583                       [op.end_timestamp for op in job.ops]])
584     self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
585                      [job.received_timestamp,
586                       job.start_timestamp,
587                       job.end_timestamp])
588     self.assert_(job.start_timestamp)
589     self.assert_(job.end_timestamp)
590     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
591
592   def testSuccess(self):
593     queue = _FakeQueueForProc()
594
595     for (job_id, opcount) in [(25351, 1), (6637, 3),
596                               (24644, 10), (32207, 100)]:
597       ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
598              for i in range(opcount)]
599
600       # Create job
601       job = self._CreateJob(queue, job_id, ops)
602
603       def _BeforeStart(timeout, priority):
604         self.assertEqual(queue.GetNextUpdate(), (job, True))
605         self.assertRaises(IndexError, queue.GetNextUpdate)
606         self.assertFalse(queue.IsAcquired())
607         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
608         self.assertFalse(job.cur_opctx)
609
610       def _AfterStart(op, cbs):
611         self.assertEqual(queue.GetNextUpdate(), (job, True))
612         self.assertRaises(IndexError, queue.GetNextUpdate)
613
614         self.assertFalse(queue.IsAcquired())
615         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
616         self.assertFalse(job.cur_opctx)
617
618         # Job is running, cancelling shouldn't be possible
619         (success, _) = job.Cancel()
620         self.assertFalse(success)
621
622       opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
623
624       for idx in range(len(ops)):
625         self.assertRaises(IndexError, queue.GetNextUpdate)
626         result = jqueue._JobProcessor(queue, opexec, job)()
627         self.assertEqual(queue.GetNextUpdate(), (job, True))
628         self.assertRaises(IndexError, queue.GetNextUpdate)
629         if idx == len(ops) - 1:
630           # Last opcode
631           self.assertEqual(result, jqueue._JobProcessor.FINISHED)
632         else:
633           self.assertEqual(result, jqueue._JobProcessor.DEFER)
634
635           self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
636           self.assert_(job.start_timestamp)
637           self.assertFalse(job.end_timestamp)
638
639       self.assertRaises(IndexError, queue.GetNextUpdate)
640
641       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
642       self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
643       self.assertEqual(job.GetInfo(["opresult"]),
644                        [[op.input.result for op in job.ops]])
645       self.assertEqual(job.GetInfo(["opstatus"]),
646                        [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
647       self.assert_(compat.all(op.start_timestamp and op.end_timestamp
648                               for op in job.ops))
649
650       self._GenericCheckJob(job)
651
652       # Calling the processor on a finished job should be a no-op
653       self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
654                        jqueue._JobProcessor.FINISHED)
655       self.assertRaises(IndexError, queue.GetNextUpdate)
656
657   def testOpcodeError(self):
658     queue = _FakeQueueForProc()
659
660     testdata = [
661       (17077, 1, 0, 0),
662       (1782, 5, 2, 2),
663       (18179, 10, 9, 9),
664       (4744, 10, 3, 8),
665       (23816, 100, 39, 45),
666       ]
667
668     for (job_id, opcount, failfrom, failto) in testdata:
669       # Prepare opcodes
670       ops = [opcodes.OpTestDummy(result="Res%s" % i,
671                                  fail=(failfrom <= i and
672                                        i <= failto))
673              for i in range(opcount)]
674
675       # Create job
676       job = self._CreateJob(queue, job_id, ops)
677
678       opexec = _FakeExecOpCodeForProc(queue, None, None)
679
680       for idx in range(len(ops)):
681         self.assertRaises(IndexError, queue.GetNextUpdate)
682         result = jqueue._JobProcessor(queue, opexec, job)()
683         # queued to waitlock
684         self.assertEqual(queue.GetNextUpdate(), (job, True))
685         # waitlock to running
686         self.assertEqual(queue.GetNextUpdate(), (job, True))
687         # Opcode result
688         self.assertEqual(queue.GetNextUpdate(), (job, True))
689         self.assertRaises(IndexError, queue.GetNextUpdate)
690
691         if idx in (failfrom, len(ops) - 1):
692           # Last opcode
693           self.assertEqual(result, jqueue._JobProcessor.FINISHED)
694           break
695
696         self.assertEqual(result, jqueue._JobProcessor.DEFER)
697
698         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
699
700       self.assertRaises(IndexError, queue.GetNextUpdate)
701
702       # Check job status
703       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
704       self.assertEqual(job.GetInfo(["id"]), [job_id])
705       self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
706
707       # Check opcode status
708       data = zip(job.ops,
709                  job.GetInfo(["opstatus"])[0],
710                  job.GetInfo(["opresult"])[0])
711
712       for idx, (op, opstatus, opresult) in enumerate(data):
713         if idx < failfrom:
714           assert not op.input.fail
715           self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
716           self.assertEqual(opresult, op.input.result)
717         elif idx <= failto:
718           assert op.input.fail
719           self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
720           self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
721         else:
722           assert not op.input.fail
723           self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
724           self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
725
726       self.assert_(compat.all(op.start_timestamp and op.end_timestamp
727                               for op in job.ops[:failfrom]))
728
729       self._GenericCheckJob(job)
730
731       # Calling the processor on a finished job should be a no-op
732       self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
733                        jqueue._JobProcessor.FINISHED)
734       self.assertRaises(IndexError, queue.GetNextUpdate)
735
736   def testCancelWhileInQueue(self):
737     queue = _FakeQueueForProc()
738
739     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
740            for i in range(5)]
741
742     # Create job
743     job_id = 17045
744     job = self._CreateJob(queue, job_id, ops)
745
746     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
747
748     # Mark as cancelled
749     (success, _) = job.Cancel()
750     self.assert_(success)
751
752     self.assertRaises(IndexError, queue.GetNextUpdate)
753
754     self.assertFalse(job.start_timestamp)
755     self.assertTrue(job.end_timestamp)
756     self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
757                             for op in job.ops))
758
759     # Serialize to check for differences
760     before_proc = job.Serialize()
761
762     # Simulate processor called in workerpool
763     opexec = _FakeExecOpCodeForProc(queue, None, None)
764     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
765                      jqueue._JobProcessor.FINISHED)
766
767     # Check result
768     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
769     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
770     self.assertFalse(job.start_timestamp)
771     self.assertTrue(job.end_timestamp)
772     self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
773                                 for op in job.ops))
774     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
775                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
776                       ["Job canceled by request" for _ in job.ops]])
777
778     # Must not have changed or written
779     self.assertEqual(before_proc, job.Serialize())
780     self.assertRaises(IndexError, queue.GetNextUpdate)
781
782   def testCancelWhileWaitlockInQueue(self):
783     queue = _FakeQueueForProc()
784
785     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
786            for i in range(5)]
787
788     # Create job
789     job_id = 8645
790     job = self._CreateJob(queue, job_id, ops)
791
792     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
793
794     job.ops[0].status = constants.OP_STATUS_WAITING
795
796     assert len(job.ops) == 5
797
798     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
799
800     # Mark as cancelling
801     (success, _) = job.Cancel()
802     self.assert_(success)
803
804     self.assertRaises(IndexError, queue.GetNextUpdate)
805
806     self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
807                             for op in job.ops))
808
809     opexec = _FakeExecOpCodeForProc(queue, None, None)
810     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
811                      jqueue._JobProcessor.FINISHED)
812
813     # Check result
814     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
815     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
816     self.assertFalse(job.start_timestamp)
817     self.assert_(job.end_timestamp)
818     self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
819                                 for op in job.ops))
820     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
821                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
822                       ["Job canceled by request" for _ in job.ops]])
823
  def testCancelWhileWaitlock(self):
    # Tests canceling a job from within the executor's "before start"
    # callback, i.e. while the first opcode is waiting (job status WAITING);
    # the final checks expect every opcode to end up canceled
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 11009
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Given to _FakeExecOpCodeForProc as the callback invoked before an
    # opcode starts
    def _BeforeStart(timeout, priority):
      # Exactly one (job, True) update must be pending at this point
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

      # Mark as cancelled
      (success, _) = job.Cancel()
      self.assert_(success)

      # Canceling only flags the opcodes as CANCELING and must not trigger a
      # job update by itself
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
                              for op in job.ops))
      self.assertRaises(IndexError, queue.GetNextUpdate)

    # Given to _FakeExecOpCodeForProc as the callback invoked once an opcode
    # has started; only checked if the executor gets that far
    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    # A single processor run must finish the (canceled) job
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertEqual(queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, queue.GetNextUpdate)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assert_(job.start_timestamp)
    self.assert_(job.end_timestamp)
    # Not every opcode may have both timestamps, since none actually ran
    # to completion
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])
874
  def testCancelWhileWaitlockWithTimeout(self):
    # Tests canceling a job while it waits for locks, followed by the lock
    # acquisition timing out (mcpu.LockAcquireTimeout); the job must end up
    # canceled without any opcode being started
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 24314
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Given to _FakeExecOpCodeForProc as the callback invoked before an
    # opcode starts
    def _BeforeStart(timeout, priority):
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

      # Mark as cancelled
      (success, _) = job.Cancel()
      self.assert_(success)

      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
                              for op in job.ops))

      # Fake an acquire attempt timing out
      raise mcpu.LockAcquireTimeout()

    # The opcode must never start in this scenario
    def _AfterStart(op, cbs):
      self.fail("Should not reach this")

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    # The timeout after the cancel must make the processor finish the job
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assert_(job.start_timestamp)
    self.assert_(job.end_timestamp)
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])
919
  def testCancelWhileRunning(self):
    # Tests canceling a job with finished opcodes and more, unprocessed ones
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(3)]

    # Create job
    job_id = 28492
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # No callbacks: opcodes just run
    opexec = _FakeExecOpCodeForProc(queue, None, None)

    # Run one opcode
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.DEFER)

    # Job goes back to queued
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_QUEUED,
                       constants.OP_STATUS_QUEUED],
                      ["Res0", None, None]])

    # Mark as cancelled
    (success, _) = job.Cancel()
    self.assert_(success)

    # Try processing another opcode (this will actually cancel the job)
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)

    # Check result; the already successful first opcode keeps its result,
    # only the unprocessed ones are canceled
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["id"]), [job_id])
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_CANCELED,
                       constants.OP_STATUS_CANCELED],
                      ["Res0", "Job canceled by request",
                       "Job canceled by request"]])
965
  def testPartiallyRun(self):
    # Tests calling the processor on a job that's been partially run before the
    # program was restarted
    queue = _FakeQueueForProc()

    opexec = _FakeExecOpCodeForProc(queue, None, None)

    # Pairs of (job ID, number of opcodes to run before the "restart")
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
             for i in range(10)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

      # Each processor call runs one opcode and defers the rest
      for _ in range(successcount):
        self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                         jqueue._JobProcessor.DEFER)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.GetInfo(["opstatus"]),
                       [[constants.OP_STATUS_SUCCESS
                         for _ in range(successcount)] +
                        [constants.OP_STATUS_QUEUED
                         for _ in range(len(ops) - successcount)]])

      self.assert_(job.ops_iter)

      # Serialize and restore (simulates program restart); the in-memory
      # opcode iterator must not survive the round trip
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
      self.assertFalse(newjob.ops_iter)
      self._TestPartial(newjob, successcount)
999
  def _TestPartial(self, job, successcount):
    """Runs the remaining opcodes of a partially processed job.

    @param job: job whose first C{successcount} opcodes have already
      succeeded and which is queued again
    @param successcount: number of opcodes already processed

    """
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)

    # A fresh fake queue, so only the updates produced below are observed
    queue = _FakeQueueForProc()
    opexec = _FakeExecOpCodeForProc(queue, None, None)

    # "remaining" counts down to 0 for the last outstanding opcode
    for remaining in reversed(range(len(job.ops) - successcount)):
      result = jqueue._JobProcessor(queue, opexec, job)()
      # Each processor run is expected to produce exactly three
      # (job, True) updates
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      if remaining == 0:
        # Last opcode
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
        break

      self.assertEqual(result, jqueue._JobProcessor.DEFER)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
                            for op in job.ops))

    self._GenericCheckJob(job)

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)

    # ... also after being restored
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
1046
1047   def testProcessorOnRunningJob(self):
1048     ops = [opcodes.OpTestDummy(result="result", fail=False)]
1049
1050     queue = _FakeQueueForProc()
1051     opexec = _FakeExecOpCodeForProc(queue, None, None)
1052
1053     # Create job
1054     job = self._CreateJob(queue, 9571, ops)
1055
1056     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1057
1058     job.ops[0].status = constants.OP_STATUS_RUNNING
1059
1060     assert len(job.ops) == 1
1061
1062     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1063
1064     # Calling on running job must fail
1065     self.assertRaises(errors.ProgrammerError,
1066                       jqueue._JobProcessor(queue, opexec, job))
1067
  def testLogMessages(self):
    # Tests the "Feedback" callback function
    queue = _FakeQueueForProc()

    # Opcode index -> list of (log type or None, message) to emit via
    # Feedback; a None type means the single-argument form is used
    messages = {
      1: [
        (None, "Hello"),
        (None, "World"),
        (constants.ELOG_MESSAGE, "there"),
        ],
      4: [
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
        ],
      }
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
                               messages=messages.get(i, []))
           for i in range(5)]

    # Create job
    job = self._CreateJob(queue, 29386, ops)

    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

      # Feedback is used with one or two arguments below; three must be
      # rejected
      self.assertRaises(AssertionError, cbs.Feedback,
                        "too", "many", "arguments")

      for (log_type, msg) in op.messages:
        self.assertRaises(IndexError, queue.GetNextUpdate)
        if log_type:
          cbs.Feedback(log_type, msg)
        else:
          cbs.Feedback(msg)
        # Check for job update without replication
        self.assertEqual(queue.GetNextUpdate(), (job, False))
        self.assertRaises(IndexError, queue.GetNextUpdate)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    # One processor run per opcode; "remaining" reaches 0 on the last one
    for remaining in reversed(range(len(job.ops))):
      self.assertRaises(IndexError, queue.GetNextUpdate)
      result = jqueue._JobProcessor(queue, opexec, job)()
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      if remaining == 0:
        # Last opcode
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
        break

      self.assertEqual(result, jqueue._JobProcessor.DEFER)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])

    # Computed before the loop below consumes the lists in "messages"
    logmsgcount = sum(len(m) for m in messages.values())

    self._CheckLogMessages(job, logmsgcount)

    # Serialize and restore (simulates program restart)
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
    self._CheckLogMessages(newjob, logmsgcount)

    # Check each message. This pops the expected entries out of "messages"
    # (destructive); opcodes without messages should have empty logs, so
    # messages.get(idx) is presumably never None inside the inner loop.
    prevserial = -1
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
      for (serial, timestamp, log_type, msg) in oplog:
        (exptype, expmsg) = messages.get(idx).pop(0)
        if exptype:
          self.assertEqual(log_type, exptype)
        else:
          # The single-argument form of Feedback logs as ELOG_MESSAGE
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
        self.assertEqual(expmsg, msg)
        # Serials increase strictly across all opcodes of the job
        self.assert_(serial > prevserial)
        prevserial = serial
1158
1159   def _CheckLogMessages(self, job, count):
1160     # Check serial
1161     self.assertEqual(job.log_serial, count)
1162
1163     # No filter
1164     self.assertEqual(job.GetLogEntries(None),
1165                      [entry for entries in job.GetInfo(["oplog"])[0] if entries
1166                       for entry in entries])
1167
1168     # Filter with serial
1169     assert count > 3
1170     self.assert_(job.GetLogEntries(3))
1171     self.assertEqual(job.GetLogEntries(3),
1172                      [entry for entries in job.GetInfo(["oplog"])[0] if entries
1173                       for entry in entries][3:])
1174
1175     # No log message after highest serial
1176     self.assertFalse(job.GetLogEntries(count))
1177     self.assertFalse(job.GetLogEntries(count + 3))
1178
  def testSubmitManyJobs(self):
    # Tests opcodes that submit further jobs: each opcode's result must be
    # the list of job IDs it submitted
    queue = _FakeQueueForProc()

    job_id = 15656
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False,
                          submit_jobs=[]),
      opcodes.OpTestDummy(result="Res1", fail=False,
                          submit_jobs=[
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
                            ]),
      opcodes.OpTestDummy(result="Res2", fail=False,
                          submit_jobs=[
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
                            ]),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    # One processor run per opcode; all but the last one defer
    for idx in range(len(ops)):
      self.assertRaises(IndexError, queue.GetNextUpdate)
      result = jqueue._JobProcessor(queue, opexec, job)()
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      if idx == len(ops) - 1:
        # Last opcode
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
      else:
        self.assertEqual(result, jqueue._JobProcessor.DEFER)

        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
        self.assert_(job.start_timestamp)
        self.assertFalse(job.end_timestamp)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    # The submitted jobs must come out in submission order, with IDs
    # handed out sequentially from 1000 by the fake queue
    for idx, submitted_ops in enumerate(job_ops
                                        for op in ops
                                        for job_ops in op.submit_jobs):
      self.assertEqual(queue.GetNextSubmittedJob(),
                       (1000 + idx, submitted_ops))
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    # Each opcode's result is the list of IDs of the jobs it submitted
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[[], [1000], [1001, 1002, 1003]]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])

    self._GenericCheckJob(job)

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
1263
  def testJobDependency(self):
    # Tests a job whose first opcode depends on two other jobs, driven
    # through a scripted sequence of dependency-manager answers
    depmgr = _FakeDependencyManager()
    queue = _FakeQueueForProc(depmgr=depmgr)

    self.assertEqual(queue.depmgr, depmgr)

    prev_job_id = 22113
    prev_job_id2 = 28102
    job_id = 29929
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False,
                          depends=[
                            [prev_job_id2, None],
                            [prev_job_id, None],
                            ]),
      opcodes.OpTestDummy(result="Res1", fail=False),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    # "attempt" is assigned by the loop below before the processor can
    # invoke these callbacks
    def _BeforeStart(timeout, priority):
      if attempt == 0 or attempt > 5:
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    counter = itertools.count()
    while True:
      attempt = counter.next()

      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertRaises(IndexError, depmgr.GetNextNotification)

      # Scripted dependency check results:
      #   attempts 0-1: still waiting for prev_job_id2
      #   attempt 2:    prev_job_id2 done, now waiting for prev_job_id
      #   attempts 3-4: still waiting for prev_job_id
      #   attempt 5:    prev_job_id done, first opcode runs
      #   attempt 6+:   no checks (the second opcode has no dependencies)
      if attempt < 2:
        depmgr.AddCheckResult(job, prev_job_id2, None,
                              (jqueue._JobDependencyManager.WAIT, "wait2"))
      elif attempt == 2:
        depmgr.AddCheckResult(job, prev_job_id2, None,
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
        # The processor will ask for the next dependency immediately
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt < 5:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt == 5:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
      if attempt == 2:
        self.assertEqual(depmgr.CountPendingResults(), 2)
      elif attempt > 5:
        self.assertEqual(depmgr.CountPendingResults(), 0)
      else:
        self.assertEqual(depmgr.CountPendingResults(), 1)

      result = jqueue._JobProcessor(queue, opexec, job)()
      if attempt == 0 or attempt >= 5:
        # Job should only be updated if there was an actual change
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      # Every scripted result must have been consumed by the processor
      self.assertFalse(depmgr.CountPendingResults())

      if attempt < 5:
        # Simulate waiting for other job
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
        self.assertRaises(IndexError, depmgr.GetNextNotification)
        self.assert_(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        continue

      if result == jqueue._JobProcessor.FINISHED:
        # Last opcode
        self.assertFalse(job.cur_opctx)
        break

      self.assertRaises(IndexError, depmgr.GetNextNotification)

      self.assertEqual(result, jqueue._JobProcessor.DEFER)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assert_(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    self._GenericCheckJob(job)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)
    self.assertFalse(depmgr.CountPendingResults())
    self.assertFalse(depmgr.CountWaitingJobs())

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
1388
  def testJobDependencyCancel(self):
    # Tests a job whose second opcode depends on another job which ends up
    # canceled; the remaining opcodes must be canceled as well
    depmgr = _FakeDependencyManager()
    queue = _FakeQueueForProc(depmgr=depmgr)

    self.assertEqual(queue.depmgr, depmgr)

    prev_job_id = 13623
    job_id = 30876
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False),
      opcodes.OpTestDummy(result="Res1", fail=False,
                          depends=[
                            [prev_job_id, None],
                            ]),
      opcodes.OpTestDummy(result="Res2", fail=False),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    # "attempt" is assigned by the loop below. With the schedule used here
    # (attempts 0-4) only attempt 0 reaches the executor, so the
    # "attempt > 5" branch appears to be carried over from testJobDependency
    # and is not exercised.
    def _BeforeStart(timeout, priority):
      if attempt == 0 or attempt > 5:
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    counter = itertools.count()
    while True:
      attempt = counter.next()

      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertRaises(IndexError, depmgr.GetNextNotification)

      # Scripted schedule:
      #   attempt 0:    first opcode (no dependencies) runs
      #   attempts 1-3: second opcode waits for prev_job_id
      #   attempt 4:    prev_job_id was canceled
      if attempt == 0:
        # This will handle the first opcode
        pass
      elif attempt < 4:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt == 4:
        # Other job was cancelled
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.CANCEL, "cancel"))

      if attempt == 0:
        self.assertEqual(depmgr.CountPendingResults(), 0)
      else:
        self.assertEqual(depmgr.CountPendingResults(), 1)

      result = jqueue._JobProcessor(queue, opexec, job)()
      if attempt <= 1 or attempt >= 4:
        # Job should only be updated if there was an actual change
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(depmgr.CountPendingResults())

      if attempt > 0 and attempt < 4:
        # Simulate waiting for other job
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
        self.assertRaises(IndexError, depmgr.GetNextNotification)
        self.assert_(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        continue

      if result == jqueue._JobProcessor.FINISHED:
        # Last opcode
        self.assertFalse(job.cur_opctx)
        break

      self.assertRaises(IndexError, depmgr.GetNextNotification)

      self.assertEqual(result, jqueue._JobProcessor.DEFER)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assert_(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    # The first opcode keeps its result; the rest are canceled
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_CANCELED,
                       constants.OP_STATUS_CANCELED],
                      ["Res0", "Job canceled by request",
                       "Job canceled by request"]])

    self._GenericCheckJob(job)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)
    self.assertFalse(depmgr.CountPendingResults())

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
1505
1506   def testJobDependencyWrongstatus(self):
1507     depmgr = _FakeDependencyManager()
1508     queue = _FakeQueueForProc(depmgr=depmgr)
1509
1510     self.assertEqual(queue.depmgr, depmgr)
1511
1512     prev_job_id = 9741
1513     job_id = 11763
1514     ops = [
1515       opcodes.OpTestDummy(result="Res0", fail=False),
1516       opcodes.OpTestDummy(result="Res1", fail=False,
1517                           depends=[
1518                             [prev_job_id, None],
1519                             ]),
1520       opcodes.OpTestDummy(result="Res2", fail=False),
1521       ]
1522
1523     # Create job
1524     job = self._CreateJob(queue, job_id, ops)
1525
1526     def _BeforeStart(timeout, priority):
1527       if attempt == 0 or attempt > 5:
1528         # Job should only be updated when it wasn't waiting for another job
1529         self.assertEqual(queue.GetNextUpdate(), (job, True))
1530       self.assertRaises(IndexError, queue.GetNextUpdate)
1531       self.assertFalse(queue.IsAcquired())
1532       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1533       self.assertFalse(job.cur_opctx)
1534
1535     def _AfterStart(op, cbs):
1536       self.assertEqual(queue.GetNextUpdate(), (job, True))
1537       self.assertRaises(IndexError, queue.GetNextUpdate)
1538
1539       self.assertFalse(queue.IsAcquired())
1540       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1541       self.assertFalse(job.cur_opctx)
1542
1543       # Job is running, cancelling shouldn't be possible
1544       (success, _) = job.Cancel()
1545       self.assertFalse(success)
1546
1547     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1548
1549     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1550
1551     counter = itertools.count()
1552     while True:
1553       attempt = counter.next()
1554
1555       self.assertRaises(IndexError, queue.GetNextUpdate)
1556       self.assertRaises(IndexError, depmgr.GetNextNotification)
1557
1558       if attempt == 0:
1559         # This will handle the first opcode
1560         pass
1561       elif attempt < 4:
1562         depmgr.AddCheckResult(job, prev_job_id, None,
1563                               (jqueue._JobDependencyManager.WAIT, "wait"))
1564       elif attempt == 4:
1565         # Other job failed
1566         depmgr.AddCheckResult(job, prev_job_id, None,
1567                               (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
1568
1569       if attempt == 0:
1570         self.assertEqual(depmgr.CountPendingResults(), 0)
1571       else:
1572         self.assertEqual(depmgr.CountPendingResults(), 1)
1573
1574       result = jqueue._JobProcessor(queue, opexec, job)()
1575       if attempt <= 1 or attempt >= 4:
1576         # Job should only be updated if there was an actual change
1577         self.assertEqual(queue.GetNextUpdate(), (job, True))
1578       self.assertRaises(IndexError, queue.GetNextUpdate)
1579       self.assertFalse(depmgr.CountPendingResults())
1580
1581       if attempt > 0 and attempt < 4:
1582         # Simulate waiting for other job
1583         self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1584         self.assertTrue(job.cur_opctx)
1585         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1586         self.assertRaises(IndexError, depmgr.GetNextNotification)
1587         self.assert_(job.start_timestamp)
1588         self.assertFalse(job.end_timestamp)
1589         continue
1590
1591       if result == jqueue._JobProcessor.FINISHED:
1592         # Last opcode
1593         self.assertFalse(job.cur_opctx)
1594         break
1595
1596       self.assertRaises(IndexError, depmgr.GetNextNotification)
1597
1598       self.assertEqual(result, jqueue._JobProcessor.DEFER)
1599       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1600       self.assert_(job.start_timestamp)
1601       self.assertFalse(job.end_timestamp)
1602
1603     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
1604     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
1605     self.assertEqual(job.GetInfo(["opstatus"]),
1606                      [[constants.OP_STATUS_SUCCESS,
1607                        constants.OP_STATUS_ERROR,
1608                        constants.OP_STATUS_ERROR]]),
1609
1610     (opresult, ) = job.GetInfo(["opresult"])
1611     self.assertEqual(len(opresult), len(ops))
1612     self.assertEqual(opresult[0], "Res0")
1613     self.assertTrue(errors.GetEncodedError(opresult[1]))
1614     self.assertTrue(errors.GetEncodedError(opresult[2]))
1615
1616     self._GenericCheckJob(job)
1617
1618     self.assertRaises(IndexError, queue.GetNextUpdate)
1619     self.assertRaises(IndexError, depmgr.GetNextNotification)
1620     self.assertFalse(depmgr.CountPendingResults())
1621
1622     # Calling the processor on a finished job should be a no-op
1623     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1624                      jqueue._JobProcessor.FINISHED)
1625     self.assertRaises(IndexError, queue.GetNextUpdate)
1626
1627
1628 class _FakeTimeoutStrategy:
1629   def __init__(self, timeouts):
1630     self.timeouts = timeouts
1631     self.attempts = 0
1632     self.last_timeout = None
1633
1634   def NextAttempt(self):
1635     self.attempts += 1
1636     if self.timeouts:
1637       timeout = self.timeouts.pop(0)
1638     else:
1639       timeout = None
1640     self.last_timeout = timeout
1641     return timeout
1642
1643
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
  """Tests lock acquisition timeouts and priority changes in the job processor.

  A ten-opcode job is run through L{jqueue._JobProcessor} using a fake opcode
  executor (hooks C{_BeforeStart}/C{_AfterStart}) and a fake timeout strategy
  factory (C{_NewTimeoutStrategy}). Each opcode index exercises a different
  lock acquisition scenario, selected in C{_NewTimeoutStrategy}.

  """
  def setUp(self):
    self.queue = _FakeQueueForProc()
    # Job under test, assigned by testTimeout
    self.job = None
    # Index of the opcode currently being processed
    self.curop = None
    # Iterator of opcode indices, advanced by _NextOpcode
    self.opcounter = None
    # Most recently created _FakeTimeoutStrategy
    self.timeout_strategy = None
    # Number of upcoming lock acquisition attempts _BeforeStart will fail
    self.retries = 0
    # Opcode index for which the last timeout strategy was created
    self.prev_tsop = None
    # Opcode priority last recorded by _NextOpcode or _NewTimeoutStrategy
    self.prev_prio = None
    # Opcode status recorded before calling the processor
    self.prev_status = None
    # Priority passed to the most recent _BeforeStart call
    self.lock_acq_prio = None
    # Whether the most recent _BeforeStart call let the opcode get its locks
    self.gave_lock = None
    # Set once opcode 3 got its locks before running into a blocking acquire
    self.done_lock_before_blocking = False

  def _BeforeStart(self, timeout, priority):
    """Fake hook run by the fake executor before an opcode is started.

    Checks the job's state, then decides whether this lock acquisition
    attempt succeeds. While C{self.retries} is positive the attempt fails
    with L{mcpu.LockAcquireTimeout}; opcode 3 is instead let through as soon
    as its priority reaches C{OP_PRIO_HIGHEST + 3}.

    @param timeout: timeout of this attempt; must equal the value last
      returned by the current L{_FakeTimeoutStrategy}
    @param priority: priority of this attempt; must equal the opcode's
      current priority
    @raise mcpu.LockAcquireTimeout: to simulate a timed-out acquisition

    """
    job = self.job

    # If status has changed, job must've been written
    if self.prev_status != self.job.ops[self.curop].status:
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

    ts = self.timeout_strategy

    self.assert_(timeout is None or isinstance(timeout, (int, float)))
    self.assertEqual(timeout, ts.last_timeout)
    self.assertEqual(priority, job.ops[self.curop].priority)

    # Assume success; reset below if this attempt is made to time out
    self.gave_lock = True
    self.lock_acq_prio = priority

    if (self.curop == 3 and
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
      # Give locks before running into blocking acquire
      assert self.retries == 7
      self.retries = 0
      self.done_lock_before_blocking = True
      return

    if self.retries > 0:
      # Simulate a timed-out acquisition; only attempts with a timeout may fail
      self.assert_(timeout is not None)
      self.retries -= 1
      self.gave_lock = False
      raise mcpu.LockAcquireTimeout()

    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
      # At the highest priority the strategy must be exhausted and the
      # acquisition made without a timeout
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
      assert not ts.timeouts
      self.assert_(timeout is None)

  def _AfterStart(self, op, cbs):
    """Fake post-start hook run by the fake executor.

    Checks that the job was written with status "running" and can no longer
    be cancelled. C{op} and C{cbs} are not used here.

    """
    job = self.job

    # Setting to "running" requires an update
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Job is running, cancelling shouldn't be possible
    (success, _) = job.Cancel()
    self.assertFalse(success)

  def _NextOpcode(self):
    """Advances to the next opcode index (passed to the processor as
    C{_nextop_fn}) and records that opcode's priority and status.

    """
    self.curop = self.opcounter.next()
    self.prev_prio = self.job.ops[self.curop].priority
    self.prev_status = self.job.ops[self.curop].status

  def _NewTimeoutStrategy(self):
    """Timeout strategy factory handed to the job processor.

    Picks the timeouts and the number of failing attempts (C{self.retries})
    according to the current opcode index.

    @rtype: L{_FakeTimeoutStrategy}

    """
    job = self.job

    # All simulated failures of the previous strategy must have been used up
    self.assertEqual(self.retries, 0)

    if self.prev_tsop == self.curop:
      # Still on the same opcode, priority must've been increased (i.e. the
      # numeric priority value decreased by one)
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)

    if self.curop == 1:
      # Normal retry: all but the last timed attempt fail
      timeouts = range(10, 31, 10)
      self.retries = len(timeouts) - 1

    elif self.curop == 2:
      # Let this run into a blocking acquire: every timed attempt fails
      timeouts = range(11, 61, 12)
      self.retries = len(timeouts)

    elif self.curop == 3:
      # Wait for priority to increase, but give lock before blocking acquire
      # (_BeforeStart grants the locks at priority OP_PRIO_HIGHEST + 3)
      timeouts = range(12, 100, 14)
      self.retries = len(timeouts)

      self.assertFalse(self.done_lock_before_blocking)

    elif self.curop == 4:
      self.assert_(self.done_lock_before_blocking)

      # Timeouts, but no need to retry
      timeouts = range(10, 31, 10)
      self.retries = 0

    elif self.curop == 5:
      # Normal retry
      # NOTE(review): retries == len(timeouts), so every timed attempt fails,
      # the same as for opcode 2 -- confirm "normal retry" is the intent
      timeouts = range(19, 100, 11)
      self.retries = len(timeouts)

    else:
      # Remaining opcodes: no timeouts and no simulated failures
      timeouts = []
      self.retries = 0

    assert len(job.ops) == 10
    assert self.retries <= len(timeouts)

    ts = _FakeTimeoutStrategy(timeouts)

    self.timeout_strategy = ts
    self.prev_tsop = self.curop
    self.prev_prio = job.ops[self.curop].priority

    return ts

  def testTimeout(self):
    """Runs a ten-opcode job through the processor until it finishes.

    After every processor call the job and opcode state is checked against
    whether the opcode got its locks (C{self.gave_lock}).

    """
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(10)]

    # Create job
    job_id = 15801
    job = self._CreateJob(self.queue, job_id, ops)
    self.job = job

    # Opcode indices handed out by _NextOpcode
    self.opcounter = itertools.count(0)

    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
                                    self._AfterStart)
    tsf = self._NewTimeoutStrategy

    self.assertFalse(self.done_lock_before_blocking)

    # Call the processor repeatedly until it reports the job as finished
    while True:
      proc = jqueue._JobProcessor(self.queue, opexec, job,
                                  _timeout_strategy_factory=tsf)

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      if self.curop is not None:
        self.prev_status = self.job.ops[self.curop].status

      # Reset so a value from a previous round can't satisfy the checks below
      self.lock_acq_prio = None

      result = proc(_nextop_fn=self._NextOpcode)
      assert self.curop is not None

      if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
        # Got lock and/or job is done, result must've been written
        self.assertFalse(job.cur_opctx)
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
        self.assert_(job.ops[self.curop].exec_timestamp)

      if result == jqueue._JobProcessor.FINISHED:
        self.assertFalse(job.cur_opctx)
        break

      self.assertEqual(result, jqueue._JobProcessor.DEFER)

      if self.curop == 0:
        self.assertEqual(job.ops[self.curop].start_timestamp,
                         job.start_timestamp)

      if self.gave_lock:
        # Opcode finished, but job not yet done
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      else:
        # Did not get locks
        self.assert_(job.cur_opctx)
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
                         self.timeout_strategy.NextAttempt)
        self.assertFalse(job.ops[self.curop].exec_timestamp)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

        # If priority has changed since acquiring locks, the job must've been
        # updated
        if self.lock_acq_prio != job.ops[self.curop].priority:
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      self.assert_(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    # Every opcode index must have been handed out exactly once
    self.assertEqual(self.curop, len(job.ops) - 1)
    self.assertEqual(self.job, job)
    self.assertEqual(self.opcounter.next(), len(job.ops))
    self.assert_(self.done_lock_before_blocking)

    self.assertRaises(IndexError, self.queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
                            for op in job.ops))

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1859
1860
class TestJobDependencyManager(unittest.TestCase):
  """Tests for L{jqueue._JobDependencyManager}.

  Statuses of dependency jobs are supplied through L{_GetStatus}, which pops
  expected C{(job_id, status)} pairs from C{self._status}; whatever the
  manager passes to its enqueue callback is recorded in C{self._queue}.

  """
  class _FakeJob:
    """Minimal job stand-in; only provides a string C{id} attribute."""
    def __init__(self, job_id):
      self.id = str(job_id)

  def setUp(self):
    # Expected (job_id, status) pairs, consumed in order by _GetStatus
    self._status = []
    # Arguments of every _Enqueue call, in call order
    self._queue = []
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)

  def _GetStatus(self, job_id):
    """Fake status callback returning the next queued status.

    @raise AssertionError: if C{job_id} is not the expected job ID

    """
    (exp_job_id, result) = self._status.pop(0)
    self.assertEqual(exp_job_id, job_id)
    return result

  def _Enqueue(self, jobs):
    """Fake enqueue callback; records C{jobs} for later inspection."""
    self._queue.append(jobs)

  def _CheckWaiting(self, job, job_id):
    """Checks that C{job} is the only waiter registered for C{job_id}.

    Also checks that all queued statuses were consumed and nothing was
    enqueued.

    """
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
      ("job/%s" % job_id, None, None, [("job", [job.id])]),
      ])

  def _CheckNotWaiting(self, job):
    """Checks that C{job} no longer waits and no dependency is pending.

    Also checks that all queued statuses were consumed and nothing was
    enqueued.

    """
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testNotFinalizedThenCancel(self):
    job = self._FakeJob(17697)
    job_id = str(28625)

    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, self.jdm.WAIT)
    self._CheckWaiting(job, job_id)

    # Cancelled dependency without "canceled" being an accepted status
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, self.jdm.CANCEL)
    self._CheckNotWaiting(job)

  def testRequireCancel(self):
    job = self._FakeJob(5278)
    job_id = str(9610)
    dep_status = [constants.JOB_STATUS_CANCELED]

    self._status.append((job_id, constants.JOB_STATUS_WAITING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.WAIT)
    self._CheckWaiting(job, job_id)

    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.CONTINUE)
    self._CheckNotWaiting(job)

  def testRequireError(self):
    job = self._FakeJob(21459)
    job_id = str(25519)
    dep_status = [constants.JOB_STATUS_ERROR]

    self._status.append((job_id, constants.JOB_STATUS_WAITING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.WAIT)
    self._CheckWaiting(job, job_id)

    self._status.append((job_id, constants.JOB_STATUS_ERROR))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.CONTINUE)
    self._CheckNotWaiting(job)

  def testRequireMultiple(self):
    dep_status = list(constants.JOBS_FINALIZED)

    for end_status in dep_status:
      job = self._FakeJob(21343)
      job_id = str(14609)

      self._status.append((job_id, constants.JOB_STATUS_WAITING))
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
      self.assertEqual(result, self.jdm.WAIT)
      self._CheckWaiting(job, job_id)

      # Any finalized status listed in dep_status lets the job continue
      self._status.append((job_id, end_status))
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
      self.assertEqual(result, self.jdm.CONTINUE)
      self._CheckNotWaiting(job)

  def testNotify(self):
    job = self._FakeJob(8227)
    job_id = str(4113)

    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, self.jdm.WAIT)
    self._CheckWaiting(job, job_id)

    # Notification hands all waiters to the enqueue callback and forgets them
    self.jdm.NotifyWaiters(job_id)
    self.assertFalse(self._status)
    self.assertFalse(self.jdm._waiters)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
    self.assertEqual(self._queue, [set([job])])

  def testWrongStatus(self):
    job = self._FakeJob(10102)
    job_id = str(1271)

    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.WAIT)
    self._CheckWaiting(job, job_id)

    # Dependency finished with a status that is not accepted
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.WRONGSTATUS)
    self._CheckNotWaiting(job)

  def testCorrectStatus(self):
    job = self._FakeJob(24273)
    job_id = str(23885)

    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.WAIT)
    self._CheckWaiting(job, job_id)

    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.CONTINUE)
    self._CheckNotWaiting(job)

  def testFinalizedRightAway(self):
    job = self._FakeJob(224)
    job_id = str(3081)

    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.CONTINUE)
    self._CheckNotWaiting(job)
    # An empty waiter set is left behind for the dependency
    self.assertEqual(self.jdm._waiters, {
      job_id: set(),
      })

    # Force cleanup; notifying any job ID drops empty waiter sets
    self.jdm.NotifyWaiters("0")
    self.assertFalse(self.jdm._waiters)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)

  def testMultipleWaiting(self):
    # Use a deterministic random generator
    rnd = random.Random(21402)

    dep_count = 10
    max_waiters = 20

    # Draw enough unique IDs for the worst case (every dependency getting the
    # maximum number of waiters) so the pops below can never run dry
    job_ids = [str(i) for i in
               rnd.sample(range(1, 10000), dep_count * (max_waiters + 1))]

    # Map each dependency job ID to a set of waiting jobs; the generator form
    # pops the key before the value on every Python version
    waiters = dict((job_ids.pop(),
                    set(map(self._FakeJob,
                            [job_ids.pop()
                             for _ in range(rnd.randint(1, max_waiters))])))
                   for _ in range(dep_count))

    # Ensure there are no duplicate job IDs
    assert not utils.FindDuplicates(list(waiters.keys()) +
                                    [job.id
                                     for jobs in waiters.values()
                                     for job in jobs])

    # Register all jobs as waiters
    for (job_id, jobs) in waiters.items():
      for job in jobs:
        self._status.append((job_id, constants.JOB_STATUS_QUEUED))
        (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                                [constants.JOB_STATUS_SUCCESS])
        self.assertEqual(result, self.jdm.WAIT)
        self.assertFalse(self._status)
        self.assertFalse(self._queue)
        self.assertTrue(self.jdm.JobWaiting(job))

    self.assertEqual(self.jdm._waiters, waiters)

    def _MakeSet(entry):
      # Compare pending job lists as sets so their order doesn't matter
      (name, mode, owner_names, pending) = entry
      return (name, mode, owner_names,
              [(pendmode, set(pend)) for (pendmode, pend) in pending])

    def _CheckLockInfo():
      info = self.jdm.GetLockInfo([query.LQ_PENDING])
      self.assertEqual(sorted(map(_MakeSet, info)), sorted([
        ("job/%s" % job_id, None, None,
         [("job", set([job.id for job in jobs]))])
        for job_id, jobs in waiters.items()
        if jobs
        ]))

    _CheckLockInfo()

    # Notify in random order
    for job_id in rnd.sample(list(waiters), len(waiters)):
      # Remove from pending waiter list
      jobs = waiters.pop(job_id)
      for job in jobs:
        self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
        (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                                [constants.JOB_STATUS_SUCCESS])
        self.assertEqual(result, self.jdm.CONTINUE)
        self.assertFalse(self._status)
        self.assertFalse(self._queue)
        self.assertFalse(self.jdm.JobWaiting(job))

      _CheckLockInfo()

    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

    assert not waiters

  def testSelfDependency(self):
    job = self._FakeJob(18937)

    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
    self.assertEqual(result, self.jdm.ERROR)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testJobDisappears(self):
    job = self._FakeJob(30540)
    job_id = str(23769)

    def _FakeStatus(_):
      raise errors.JobLost("#msg#")

    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
    # Compare against the manager actually under test
    self.assertEqual(result, jdm.ERROR)
    self.assertFalse(jdm.JobWaiting(job))
    self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
2153
2154
if __name__ == "__main__":
  # Script entry point; testutils.GanetiTestProgram presumably wraps the
  # unittest runner for the test cases in this module
  testutils.GanetiTestProgram()