Introduce a TMaybe combinator
[ganeti-local] / test / ganeti.jqueue_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.jqueue"""
23
24 import os
25 import sys
26 import unittest
27 import tempfile
28 import shutil
29 import errno
30 import itertools
31 import random
32 import operator
33
34 from ganeti import constants
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import jqueue
38 from ganeti import opcodes
39 from ganeti import compat
40 from ganeti import mcpu
41 from ganeti import query
42 from ganeti import workerpool
43
44 import testutils
45
46
47 class _FakeJob:
48   def __init__(self, job_id, status):
49     self.id = job_id
50     self.writable = False
51     self._status = status
52     self._log = []
53
54   def SetStatus(self, status):
55     self._status = status
56
57   def AddLogEntry(self, msg):
58     self._log.append((len(self._log), msg))
59
60   def CalcStatus(self):
61     return self._status
62
63   def GetInfo(self, fields):
64     result = []
65
66     for name in fields:
67       if name == "status":
68         result.append(self._status)
69       else:
70         raise Exception("Unknown field")
71
72     return result
73
74   def GetLogEntries(self, newer_than):
75     assert newer_than is None or newer_than >= 0
76
77     if newer_than is None:
78       return self._log
79
80     return self._log[newer_than:]
81
82
class TestJobChangesChecker(unittest.TestCase):
  """Tests for L{jqueue._JobChangesChecker}, driven by L{_FakeJob} objects.

  """
  def testStatus(self):
    # No previous information is given to the checker
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.SetStatus(constants.JOB_STATUS_SUCCESS)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))

    # job.id is used by checker
    self.assertEqual(job.id, 9094)

  def testStatusWithPrev(self):
    # The previous information matches the job's status, hence no change is
    # expected until the status is modified
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"],
                                        [constants.JOB_STATUS_QUEUED], None)
    self.assertTrue(checker(job) is None)

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

  def testFinalStatus(self):
    for status in constants.JOBS_FINALIZED:
      job = _FakeJob(2178711, status)
      checker = jqueue._JobChangesChecker(["status"], [status], None)
      # There won't be any changes in this status, hence it should signal
      # a change immediately
      self.assertEqual(checker(job), ([status], []))

  def testLog(self):
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.AddLogEntry("Hello World")
    (job_info, log_entries) = checker(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
    self.assertEqual(log_entries, [[0, "Hello World"]])

    # Second checker starts from the state reported by the first one
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
    self.assertTrue(checker2(job) is None)

    job.AddLogEntry("Foo Bar")
    job.SetStatus(constants.JOB_STATUS_ERROR)

    (job_info, log_entries) = checker2(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[1, "Foo Bar"]])

    # A checker without previous information gets the complete log
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
    (job_info, log_entries) = checker3(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
139
140
141 class TestJobChangesWaiter(unittest.TestCase):
142   def setUp(self):
143     self.tmpdir = tempfile.mkdtemp()
144     self.filename = utils.PathJoin(self.tmpdir, "job-1")
145     utils.WriteFile(self.filename, data="")
146
147   def tearDown(self):
148     shutil.rmtree(self.tmpdir)
149
150   def _EnsureNotifierClosed(self, notifier):
151     try:
152       os.fstat(notifier._fd)
153     except EnvironmentError, err:
154       self.assertEqual(err.errno, errno.EBADF)
155     else:
156       self.fail("File descriptor wasn't closed")
157
158   def testClose(self):
159     for wait in [False, True]:
160       waiter = jqueue._JobFileChangesWaiter(self.filename)
161       try:
162         if wait:
163           waiter.Wait(0.001)
164       finally:
165         waiter.Close()
166
167       # Ensure file descriptor was closed
168       self._EnsureNotifierClosed(waiter._notifier)
169
170   def testChangingFile(self):
171     waiter = jqueue._JobFileChangesWaiter(self.filename)
172     try:
173       self.assertFalse(waiter.Wait(0.1))
174       utils.WriteFile(self.filename, data="changed")
175       self.assert_(waiter.Wait(60))
176     finally:
177       waiter.Close()
178
179     self._EnsureNotifierClosed(waiter._notifier)
180
181   def testChangingFile2(self):
182     waiter = jqueue._JobChangesWaiter(self.filename)
183     try:
184       self.assertFalse(waiter._filewaiter)
185       self.assert_(waiter.Wait(0.1))
186       self.assert_(waiter._filewaiter)
187
188       # File waiter is now used, but there have been no changes
189       self.assertFalse(waiter.Wait(0.1))
190       utils.WriteFile(self.filename, data="changed")
191       self.assert_(waiter.Wait(60))
192     finally:
193       waiter.Close()
194
195     self._EnsureNotifierClosed(waiter._filewaiter._notifier)
196
197
class TestWaitForJobChangesHelper(unittest.TestCase):
  """Tests for L{jqueue._WaitForJobChangesHelper}.

  The helper is given the path of an empty job file and one of the loader
  callbacks defined below.

  """
  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _LoadWaitingJob(self):
    """Job loader returning a fake job in "waiting" status.

    """
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)

  def _LoadLostJob(self):
    """Job loader for a job which can't be loaded; always returns C{None}.

    """
    return None

  def testNoChanges(self):
    wfjc = jqueue._WaitForJobChangesHelper()

    # No change
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
                          [constants.JOB_STATUS_WAITING], None, 0.1),
                     constants.JOB_NOTCHANGED)

    # No previous information
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
                          ["status"], None, None, 1.0),
                     ([constants.JOB_STATUS_WAITING], []))

  def testLostJob(self):
    wfjc = jqueue._WaitForJobChangesHelper()
    self.assertTrue(wfjc(self.filename, self._LoadLostJob,
                         ["status"], None, None, 1.0) is None)
230
231
class TestEncodeOpError(unittest.TestCase):
  """Tests for L{jqueue._EncodeOpError}.

  """
  def test(self):
    # Pairs of (value to encode, exception class expected from
    # errors.MaybeRaise when given the encoded value)
    testdata = [
      (errors.LockError("Test 1"), errors.LockError),
      (errors.GenericError("Test 2"), errors.GenericError),
      (NotImplementedError("Foo"), errors.OpExecError),
      ("Hello World", errors.OpExecError),
      ]

    for (err, exp_class) in testdata:
      encerr = jqueue._EncodeOpError(err)
      self.assertTrue(isinstance(encerr, tuple))
      self.assertRaises(exp_class, errors.MaybeRaise, encerr)
249
250
class TestQueuedOpCode(unittest.TestCase):
  """Tests for L{jqueue._QueuedOpCode}, including a serialization round-trip.

  """
  def testDefaults(self):
    def _Check(op):
      self.assertFalse(hasattr(op.input, "dry_run"))
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
      self.assertFalse(op.log)
      self.assertTrue(op.start_timestamp is None)
      self.assertTrue(op.exec_timestamp is None)
      self.assertTrue(op.end_timestamp is None)
      self.assertTrue(op.result is None)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
    _Check(op1)

    # The restored object must pass the same checks and serialize identically
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())

  def testPriority(self):
    def _Check(op):
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
             "Default priority equals high priority; test can't work"
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
    op1 = jqueue._QueuedOpCode(inpop)
    _Check(op1)

    # The priority has to survive serialization
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())
282
283
class TestQueuedJob(unittest.TestCase):
  """Tests for L{jqueue._QueuedJob}.

  Covers construction and restoring, the writable/archived flags, priority
  calculation and changes, and status calculation.

  """
  def testNoOpCodes(self):
    # A job without opcodes is rejected
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
                      None, 1, [], False)

  def testDefaults(self):
    job_id = 4260
    ops = [
      opcodes.OpTagsGet(),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertTrue(job.writable)
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.log_serial, 0)
      self.assertTrue(job.received_timestamp)
      self.assertTrue(job.start_timestamp is None)
      self.assertTrue(job.end_timestamp is None)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
      self.assertTrue(repr(job).startswith("<"))
      self.assertEqual(len(job.ops), len(ops))
      self.assertTrue(compat.all(inp.__getstate__() == op.input.__getstate__()
                                 for (inp, op) in zip(ops, job.ops)))
      self.assertRaises(errors.OpPrereqError, job.GetInfo,
                        ["unknown-field"])
      self.assertEqual(job.GetInfo(["summary"]),
                       [[op.input.Summary() for op in job.ops]])
      self.assertFalse(job.archived)

    job1 = jqueue._QueuedJob(None, job_id, ops, True)
    _Check(job1)

    # A restored job must pass the same checks and serialize identically
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True, False)
    _Check(job2)
    self.assertEqual(job1.Serialize(), job2.Serialize())

  def testWritable(self):
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
    self.assertFalse(job.writable)

    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
    self.assertTrue(job.writable)

  def testArchived(self):
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
    self.assertFalse(job.archived)

    # The archived flag follows the last argument given to Restore
    newjob = jqueue._QueuedJob.Restore(None, job.Serialize(), True, True)
    self.assertTrue(newjob.archived)

    newjob2 = jqueue._QueuedJob.Restore(None, newjob.Serialize(), True, False)
    self.assertFalse(newjob2.archived)

  def testPriority(self):
    job_id = 4283
    ops = [
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(repr(job).startswith("<"))

    job = jqueue._QueuedJob(None, job_id, ops, True)
    _Check(job)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase first
    job.ops[0].priority -= 1
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)

    # Mark opcode as finished
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase second
    job.ops[1].priority -= 10
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)

    # Test increasing first
    job.ops[0].status = constants.OP_STATUS_RUNNING
    job.ops[0].priority -= 19
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)

  def _JobForPriority(self, job_id):
    """Creates a four-opcode job for the priority change tests.

    All opcodes are verified to have the default priority and no "priority"
    attribute on their input opcode.

    """
    ops = [
      opcodes.OpTagsGet(),
      opcodes.OpTestDelay(),
      opcodes.OpTagsGet(),
      opcodes.OpTestDelay(),
      ]

    job = jqueue._QueuedJob(None, job_id, ops, True)

    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertFalse(compat.any(hasattr(op.input, "priority")
                                for op in job.ops))

    return job

  def testChangePriorityAllQueued(self):
    job = self._JobForPriority(24984)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                               for op in job.ops))
    result = job.ChangePriority(-10)
    self.assertEqual(job.CalcPriority(), -10)
    self.assertTrue(compat.all(op.priority == -10 for op in job.ops))
    self.assertTrue(compat.all(op.input.priority == -10 for op in job.ops))
    self.assertEqual(result,
                     (True, ("Priorities of pending opcodes for job 24984 have"
                             " been changed to -10")))

  def testChangePriorityAllFinished(self):
    job = self._JobForPriority(16405)

    # Only the last opcode failed
    for (idx, op) in enumerate(job.ops):
      if idx > 2:
        op.status = constants.OP_STATUS_ERROR
      else:
        op.status = constants.OP_STATUS_SUCCESS

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    result = job.ChangePriority(-10)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertFalse(compat.any(hasattr(op.input, "priority")
                                for op in job.ops))
    self.assertEqual([op.status for op in job.ops], [
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_ERROR,
      ])
    self.assertEqual(result, (False, "Job 16405 is finished"))

  def testChangePriorityCancelling(self):
    job = self._JobForPriority(31572)

    # First two opcodes succeeded, the remaining ones are being cancelled
    for (idx, op) in enumerate(job.ops):
      if idx > 1:
        op.status = constants.OP_STATUS_CANCELING
      else:
        op.status = constants.OP_STATUS_SUCCESS

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELING)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    result = job.ChangePriority(5)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertFalse(compat.any(hasattr(op.input, "priority")
                                for op in job.ops))
    self.assertEqual([op.status for op in job.ops], [
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_CANCELING,
      constants.OP_STATUS_CANCELING,
      ])
    self.assertEqual(result, (False, "Job 31572 is cancelling"))

  def testChangePriorityFirstRunning(self):
    job = self._JobForPriority(1716215889)

    for (idx, op) in enumerate(job.ops):
      if idx == 0:
        op.status = constants.OP_STATUS_RUNNING
      else:
        op.status = constants.OP_STATUS_QUEUED

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    result = job.ChangePriority(7)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # The running opcode is expected to be left untouched
    self.assertEqual([op.priority for op in job.ops],
                     [constants.OP_PRIO_DEFAULT, 7, 7, 7])
    self.assertEqual([getattr(op.input, "priority", None) for op in job.ops],
                     [None, 7, 7, 7])
    self.assertEqual([op.status for op in job.ops], [
      constants.OP_STATUS_RUNNING,
      constants.OP_STATUS_QUEUED,
      constants.OP_STATUS_QUEUED,
      constants.OP_STATUS_QUEUED,
      ])
    self.assertEqual(result,
                     (True, ("Priorities of pending opcodes for job"
                             " 1716215889 have been changed to 7")))

  def testChangePriorityLastRunning(self):
    job = self._JobForPriority(1308)

    for (idx, op) in enumerate(job.ops):
      if idx == (len(job.ops) - 1):
        op.status = constants.OP_STATUS_RUNNING
      else:
        op.status = constants.OP_STATUS_SUCCESS

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    result = job.ChangePriority(-3)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertFalse(compat.any(hasattr(op.input, "priority")
                                for op in job.ops))
    self.assertEqual([op.status for op in job.ops], [
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_RUNNING,
      ])
    self.assertEqual(result, (False, "Job 1308 had no pending opcodes"))

  def testChangePrioritySecondOpcodeRunning(self):
    job = self._JobForPriority(27701)

    self.assertEqual(len(job.ops), 4)
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    job.ops[1].status = constants.OP_STATUS_RUNNING
    job.ops[2].status = constants.OP_STATUS_QUEUED
    job.ops[3].status = constants.OP_STATUS_QUEUED

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
    result = job.ChangePriority(-19)
    self.assertEqual(job.CalcPriority(), -19)

    # Only the two queued opcodes are expected to change
    self.assertEqual([op.priority for op in job.ops],
                     [constants.OP_PRIO_DEFAULT, constants.OP_PRIO_DEFAULT,
                      -19, -19])
    self.assertEqual([getattr(op.input, "priority", None) for op in job.ops],
                     [None, None, -19, -19])
    self.assertEqual([op.status for op in job.ops], [
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_RUNNING,
      constants.OP_STATUS_QUEUED,
      constants.OP_STATUS_QUEUED,
      ])
    self.assertEqual(result,
                     (True, ("Priorities of pending opcodes for job"
                             " 27701 have been changed to -19")))

  def testChangePriorityWithInconsistentJob(self):
    job = self._JobForPriority(30097)

    self.assertEqual(len(job.ops), 4)

    # This job is invalid (as it has two opcodes marked as running) and make
    # the call fail because an unprocessed opcode precedes a running one (which
    # should never happen in reality)
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    job.ops[1].status = constants.OP_STATUS_RUNNING
    job.ops[2].status = constants.OP_STATUS_QUEUED
    job.ops[3].status = constants.OP_STATUS_RUNNING

    self.assertRaises(AssertionError, job.ChangePriority, 19)

  def testCalcStatus(self):
    # Each helper below puts the opcodes of a fresh job into a state for which
    # a specific job status is expected

    def _Queued(ops):
      # The default status is "queued"
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in ops))

    def _Waitlock1(ops):
      ops[0].status = constants.OP_STATUS_WAITING

    def _Waitlock2(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      ops[2].status = constants.OP_STATUS_WAITING

    def _Running(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_RUNNING
      for op in ops[2:]:
        op.status = constants.OP_STATUS_QUEUED

    def _Canceling1(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      for op in ops[2:]:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceling2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceled(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELED

    def _Error1(ops):
      for idx, op in enumerate(ops):
        if idx > 3:
          op.status = constants.OP_STATUS_ERROR
        else:
          op.status = constants.OP_STATUS_SUCCESS

    def _Error2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_ERROR

    def _Success(ops):
      for op in ops:
        op.status = constants.OP_STATUS_SUCCESS

    tests = {
      constants.JOB_STATUS_QUEUED: [_Queued],
      constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
      constants.JOB_STATUS_RUNNING: [_Running],
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
      constants.JOB_STATUS_CANCELED: [_Canceled],
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
      constants.JOB_STATUS_SUCCESS: [_Success],
      }

    def _NewJob():
      job = jqueue._QueuedJob(None, 1,
                              [opcodes.OpTestDelay() for _ in range(10)],
                              True)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in job.ops))
      return job

    # Every known job status must have at least one test function; a missing
    # entry makes the dictionary lookup fail
    for status in constants.JOB_STATUS_ALL:
      sttests = tests[status]
      assert sttests
      for fn in sttests:
        job = _NewJob()
        fn(job.ops)
        self.assertEqual(job.CalcStatus(), status)
625
626
627 class _FakeDependencyManager:
628   def __init__(self):
629     self._checks = []
630     self._notifications = []
631     self._waiting = set()
632
633   def AddCheckResult(self, job, dep_job_id, dep_status, result):
634     self._checks.append((job, dep_job_id, dep_status, result))
635
636   def CountPendingResults(self):
637     return len(self._checks)
638
639   def CountWaitingJobs(self):
640     return len(self._waiting)
641
642   def GetNextNotification(self):
643     return self._notifications.pop(0)
644
645   def JobWaiting(self, job):
646     return job in self._waiting
647
648   def CheckAndRegister(self, job, dep_job_id, dep_status):
649     (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
650
651     assert exp_job == job
652     assert exp_dep_job_id == dep_job_id
653     assert exp_dep_status == dep_status
654
655     (result_status, _) = result
656
657     if result_status == jqueue._JobDependencyManager.WAIT:
658       self._waiting.add(job)
659     elif result_status == jqueue._JobDependencyManager.CONTINUE:
660       self._waiting.remove(job)
661
662     return result
663
664   def NotifyWaiters(self, job_id):
665     self._notifications.append(job_id)
666
667
668 class _DisabledFakeDependencyManager:
669   def JobWaiting(self, _):
670     return False
671
672   def CheckAndRegister(self, *args):
673     assert False, "Should not be called"
674
675   def NotifyWaiters(self, _):
676     pass
677
678
679 class _FakeQueueForProc:
680   def __init__(self, depmgr=None):
681     self._acquired = False
682     self._updates = []
683     self._submitted = []
684     self._accepting_jobs = True
685
686     self._submit_count = itertools.count(1000)
687
688     if depmgr:
689       self.depmgr = depmgr
690     else:
691       self.depmgr = _DisabledFakeDependencyManager()
692
693   def IsAcquired(self):
694     return self._acquired
695
696   def GetNextUpdate(self):
697     return self._updates.pop(0)
698
699   def GetNextSubmittedJob(self):
700     return self._submitted.pop(0)
701
702   def acquire(self, shared=0):
703     assert shared == 1
704     self._acquired = True
705
706   def release(self):
707     assert self._acquired
708     self._acquired = False
709
710   def UpdateJobUnlocked(self, job, replicate=True):
711     assert self._acquired, "Lock not acquired while updating job"
712     self._updates.append((job, bool(replicate)))
713
714   def SubmitManyJobs(self, jobs):
715     assert not self._acquired, "Lock acquired while submitting jobs"
716     job_ids = [self._submit_count.next() for _ in jobs]
717     self._submitted.extend(zip(job_ids, jobs))
718     return job_ids
719
720   def StopAcceptingJobs(self):
721     self._accepting_jobs = False
722
723   def AcceptingJobsUnlocked(self):
724     return self._accepting_jobs
725
726
class _FakeExecOpCodeForProc:
  """Fake opcode executor for the job processor tests.

  Only accepts L{opcodes.OpTestDummy} and allows tests to hook in before and
  after the "start" notification is sent.

  """
  def __init__(self, queue, before_start, after_start):
    """Initializes the fake executor.

    @param queue: Queue whose lock state is verified during execution
    @param before_start: Callable invoked with timeout and current priority
      before the start notification, or a false value
    @param after_start: Callable invoked with opcode and callbacks after the
      start notification, or a false value

    """
    self._after_start = after_start
    self._before_start = before_start
    self._queue = queue

  def __call__(self, op, cbs, timeout=None):
    """Pretends to execute an opcode.

    Raises L{errors.OpExecError} if the opcode requests a failure, returns
    the result of submitting jobs if the opcode carries any, and the opcode's
    preset result otherwise.

    """
    assert isinstance(op, opcodes.OpTestDummy)
    assert not self._queue.IsAcquired(), \
           "Queue lock not released when executing opcode"

    before_fn = self._before_start
    if before_fn:
      before_fn(timeout, cbs.CurrentPriority())

    cbs.NotifyStart()

    after_fn = self._after_start
    if after_fn:
      after_fn(op, cbs)

    # Check again after the callbacks
    assert not self._queue.IsAcquired()

    if op.fail:
      raise errors.OpExecError("Error requested (%s)" % op.result)

    wants_submit = hasattr(op, "submit_jobs") and op.submit_jobs is not None
    if not wants_submit:
      return op.result

    return cbs.SubmitManyJobs(op.submit_jobs)
756
757
class _JobProcessorTestUtils:
  """Mix-in with helpers for job processor test cases.

  Relies on the assertion methods of C{unittest.TestCase} being available on
  the class it is mixed into.

  """
  def _CreateJob(self, queue, job_id, ops):
    """Creates a writable job and verifies its initial state.

    @param queue: Queue object passed to the job
    @param job_id: Job ID
    @param ops: List of opcodes
    @return: The new L{jqueue._QueuedJob}

    """
    job = jqueue._QueuedJob(queue, job_id, ops, True)
    self.assertFalse(job.start_timestamp)
    self.assertFalse(job.end_timestamp)
    self.assertEqual(len(ops), len(job.ops))
    self.assertTrue(compat.all(op.input == inp
                               for (op, inp) in zip(job.ops, ops)))
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
    return job
768
769
770 class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
771   def _GenericCheckJob(self, job):
772     assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
773                       for op in job.ops)
774
775     self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
776                      [[op.start_timestamp for op in job.ops],
777                       [op.exec_timestamp for op in job.ops],
778                       [op.end_timestamp for op in job.ops]])
779     self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
780                      [job.received_timestamp,
781                       job.start_timestamp,
782                       job.end_timestamp])
783     self.assert_(job.start_timestamp)
784     self.assert_(job.end_timestamp)
785     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
786
787   def testSuccess(self):
788     queue = _FakeQueueForProc()
789
790     for (job_id, opcount) in [(25351, 1), (6637, 3),
791                               (24644, 10), (32207, 100)]:
792       ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
793              for i in range(opcount)]
794
795       # Create job
796       job = self._CreateJob(queue, job_id, ops)
797
798       def _BeforeStart(timeout, priority):
799         self.assertEqual(queue.GetNextUpdate(), (job, True))
800         self.assertRaises(IndexError, queue.GetNextUpdate)
801         self.assertFalse(queue.IsAcquired())
802         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
803         self.assertFalse(job.cur_opctx)
804
805       def _AfterStart(op, cbs):
806         self.assertEqual(queue.GetNextUpdate(), (job, True))
807         self.assertRaises(IndexError, queue.GetNextUpdate)
808
809         self.assertFalse(queue.IsAcquired())
810         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
811         self.assertFalse(job.cur_opctx)
812
813         # Job is running, cancelling shouldn't be possible
814         (success, _) = job.Cancel()
815         self.assertFalse(success)
816
817       opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
818
819       for idx in range(len(ops)):
820         self.assertRaises(IndexError, queue.GetNextUpdate)
821         result = jqueue._JobProcessor(queue, opexec, job)()
822         self.assertEqual(queue.GetNextUpdate(), (job, True))
823         self.assertRaises(IndexError, queue.GetNextUpdate)
824         if idx == len(ops) - 1:
825           # Last opcode
826           self.assertEqual(result, jqueue._JobProcessor.FINISHED)
827         else:
828           self.assertEqual(result, jqueue._JobProcessor.DEFER)
829
830           self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
831           self.assert_(job.start_timestamp)
832           self.assertFalse(job.end_timestamp)
833
834       self.assertRaises(IndexError, queue.GetNextUpdate)
835
836       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
837       self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
838       self.assertEqual(job.GetInfo(["opresult"]),
839                        [[op.input.result for op in job.ops]])
840       self.assertEqual(job.GetInfo(["opstatus"]),
841                        [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
842       self.assert_(compat.all(op.start_timestamp and op.end_timestamp
843                               for op in job.ops))
844
845       self._GenericCheckJob(job)
846
847       # Calling the processor on a finished job should be a no-op
848       self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
849                        jqueue._JobProcessor.FINISHED)
850       self.assertRaises(IndexError, queue.GetNextUpdate)
851
852   def testOpcodeError(self):
853     queue = _FakeQueueForProc()
854
855     testdata = [
856       (17077, 1, 0, 0),
857       (1782, 5, 2, 2),
858       (18179, 10, 9, 9),
859       (4744, 10, 3, 8),
860       (23816, 100, 39, 45),
861       ]
862
863     for (job_id, opcount, failfrom, failto) in testdata:
864       # Prepare opcodes
865       ops = [opcodes.OpTestDummy(result="Res%s" % i,
866                                  fail=(failfrom <= i and
867                                        i <= failto))
868              for i in range(opcount)]
869
870       # Create job
871       job = self._CreateJob(queue, str(job_id), ops)
872
873       opexec = _FakeExecOpCodeForProc(queue, None, None)
874
875       for idx in range(len(ops)):
876         self.assertRaises(IndexError, queue.GetNextUpdate)
877         result = jqueue._JobProcessor(queue, opexec, job)()
878         # queued to waitlock
879         self.assertEqual(queue.GetNextUpdate(), (job, True))
880         # waitlock to running
881         self.assertEqual(queue.GetNextUpdate(), (job, True))
882         # Opcode result
883         self.assertEqual(queue.GetNextUpdate(), (job, True))
884         self.assertRaises(IndexError, queue.GetNextUpdate)
885
886         if idx in (failfrom, len(ops) - 1):
887           # Last opcode
888           self.assertEqual(result, jqueue._JobProcessor.FINISHED)
889           break
890
891         self.assertEqual(result, jqueue._JobProcessor.DEFER)
892
893         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
894
895       self.assertRaises(IndexError, queue.GetNextUpdate)
896
897       # Check job status
898       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
899       self.assertEqual(job.GetInfo(["id"]), [job_id])
900       self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
901
902       # Check opcode status
903       data = zip(job.ops,
904                  job.GetInfo(["opstatus"])[0],
905                  job.GetInfo(["opresult"])[0])
906
907       for idx, (op, opstatus, opresult) in enumerate(data):
908         if idx < failfrom:
909           assert not op.input.fail
910           self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
911           self.assertEqual(opresult, op.input.result)
912         elif idx <= failto:
913           assert op.input.fail
914           self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
915           self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
916         else:
917           assert not op.input.fail
918           self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
919           self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
920
921       self.assert_(compat.all(op.start_timestamp and op.end_timestamp
922                               for op in job.ops[:failfrom]))
923
924       self._GenericCheckJob(job)
925
926       # Calling the processor on a finished job should be a no-op
927       self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
928                        jqueue._JobProcessor.FINISHED)
929       self.assertRaises(IndexError, queue.GetNextUpdate)
930
931   def testCancelWhileInQueue(self):
932     queue = _FakeQueueForProc()
933
934     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
935            for i in range(5)]
936
937     # Create job
938     job_id = 17045
939     job = self._CreateJob(queue, job_id, ops)
940
941     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
942
943     # Mark as cancelled
944     (success, _) = job.Cancel()
945     self.assert_(success)
946
947     self.assertRaises(IndexError, queue.GetNextUpdate)
948
949     self.assertFalse(job.start_timestamp)
950     self.assertTrue(job.end_timestamp)
951     self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
952                             for op in job.ops))
953
954     # Serialize to check for differences
955     before_proc = job.Serialize()
956
957     # Simulate processor called in workerpool
958     opexec = _FakeExecOpCodeForProc(queue, None, None)
959     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
960                      jqueue._JobProcessor.FINISHED)
961
962     # Check result
963     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
964     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
965     self.assertFalse(job.start_timestamp)
966     self.assertTrue(job.end_timestamp)
967     self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
968                                 for op in job.ops))
969     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
970                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
971                       ["Job canceled by request" for _ in job.ops]])
972
973     # Must not have changed or written
974     self.assertEqual(before_proc, job.Serialize())
975     self.assertRaises(IndexError, queue.GetNextUpdate)
976
977   def testCancelWhileWaitlockInQueue(self):
978     queue = _FakeQueueForProc()
979
980     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
981            for i in range(5)]
982
983     # Create job
984     job_id = 8645
985     job = self._CreateJob(queue, job_id, ops)
986
987     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
988
989     job.ops[0].status = constants.OP_STATUS_WAITING
990
991     assert len(job.ops) == 5
992
993     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
994
995     # Mark as cancelling
996     (success, _) = job.Cancel()
997     self.assert_(success)
998
999     self.assertRaises(IndexError, queue.GetNextUpdate)
1000
1001     self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1002                             for op in job.ops))
1003
1004     opexec = _FakeExecOpCodeForProc(queue, None, None)
1005     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1006                      jqueue._JobProcessor.FINISHED)
1007
1008     # Check result
1009     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1010     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1011     self.assertFalse(job.start_timestamp)
1012     self.assert_(job.end_timestamp)
1013     self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1014                                 for op in job.ops))
1015     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1016                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
1017                       ["Job canceled by request" for _ in job.ops]])
1018
1019   def testCancelWhileWaitlock(self):
1020     queue = _FakeQueueForProc()
1021
1022     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1023            for i in range(5)]
1024
1025     # Create job
1026     job_id = 11009
1027     job = self._CreateJob(queue, job_id, ops)
1028
1029     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1030
1031     def _BeforeStart(timeout, priority):
1032       self.assertEqual(queue.GetNextUpdate(), (job, True))
1033       self.assertRaises(IndexError, queue.GetNextUpdate)
1034       self.assertFalse(queue.IsAcquired())
1035       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1036
1037       # Mark as cancelled
1038       (success, _) = job.Cancel()
1039       self.assert_(success)
1040
1041       self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1042                               for op in job.ops))
1043       self.assertRaises(IndexError, queue.GetNextUpdate)
1044
1045     def _AfterStart(op, cbs):
1046       self.assertEqual(queue.GetNextUpdate(), (job, True))
1047       self.assertRaises(IndexError, queue.GetNextUpdate)
1048       self.assertFalse(queue.IsAcquired())
1049       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1050
1051     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1052
1053     self.assertRaises(IndexError, queue.GetNextUpdate)
1054     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1055                      jqueue._JobProcessor.FINISHED)
1056     self.assertEqual(queue.GetNextUpdate(), (job, True))
1057     self.assertRaises(IndexError, queue.GetNextUpdate)
1058
1059     # Check result
1060     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1061     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1062     self.assert_(job.start_timestamp)
1063     self.assert_(job.end_timestamp)
1064     self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
1065                                 for op in job.ops))
1066     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1067                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
1068                       ["Job canceled by request" for _ in job.ops]])
1069
1070   def _TestCancelWhileSomething(self, cb):
1071     queue = _FakeQueueForProc()
1072
1073     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1074            for i in range(5)]
1075
1076     # Create job
1077     job_id = 24314
1078     job = self._CreateJob(queue, job_id, ops)
1079
1080     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1081
1082     def _BeforeStart(timeout, priority):
1083       self.assertFalse(queue.IsAcquired())
1084       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1085
1086       # Mark as cancelled
1087       (success, _) = job.Cancel()
1088       self.assert_(success)
1089
1090       self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1091                               for op in job.ops))
1092
1093       cb(queue)
1094
1095     def _AfterStart(op, cbs):
1096       self.fail("Should not reach this")
1097
1098     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1099
1100     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1101                      jqueue._JobProcessor.FINISHED)
1102
1103     # Check result
1104     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1105     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1106     self.assert_(job.start_timestamp)
1107     self.assert_(job.end_timestamp)
1108     self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
1109                                 for op in job.ops))
1110     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1111                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
1112                       ["Job canceled by request" for _ in job.ops]])
1113
1114     return queue
1115
1116   def testCancelWhileWaitlockWithTimeout(self):
1117     def fn(_):
1118       # Fake an acquire attempt timing out
1119       raise mcpu.LockAcquireTimeout()
1120
1121     self._TestCancelWhileSomething(fn)
1122
1123   def testCancelDuringQueueShutdown(self):
1124     queue = self._TestCancelWhileSomething(lambda q: q.StopAcceptingJobs())
1125     self.assertFalse(queue.AcceptingJobsUnlocked())
1126
1127   def testCancelWhileRunning(self):
1128     # Tests canceling a job with finished opcodes and more, unprocessed ones
1129     queue = _FakeQueueForProc()
1130
1131     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1132            for i in range(3)]
1133
1134     # Create job
1135     job_id = 28492
1136     job = self._CreateJob(queue, job_id, ops)
1137
1138     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1139
1140     opexec = _FakeExecOpCodeForProc(queue, None, None)
1141
1142     # Run one opcode
1143     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1144                      jqueue._JobProcessor.DEFER)
1145
1146     # Job goes back to queued
1147     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1148     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1149                      [[constants.OP_STATUS_SUCCESS,
1150                        constants.OP_STATUS_QUEUED,
1151                        constants.OP_STATUS_QUEUED],
1152                       ["Res0", None, None]])
1153
1154     # Mark as cancelled
1155     (success, _) = job.Cancel()
1156     self.assert_(success)
1157
1158     # Try processing another opcode (this will actually cancel the job)
1159     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1160                      jqueue._JobProcessor.FINISHED)
1161
1162     # Check result
1163     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1164     self.assertEqual(job.GetInfo(["id"]), [job_id])
1165     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1166     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1167                      [[constants.OP_STATUS_SUCCESS,
1168                        constants.OP_STATUS_CANCELED,
1169                        constants.OP_STATUS_CANCELED],
1170                       ["Res0", "Job canceled by request",
1171                        "Job canceled by request"]])
1172
1173   def _TestQueueShutdown(self, queue, opexec, job, runcount):
1174     self.assertTrue(queue.AcceptingJobsUnlocked())
1175
1176     # Simulate shutdown
1177     queue.StopAcceptingJobs()
1178
1179     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1180                      jqueue._JobProcessor.DEFER)
1181
1182     # Check result
1183     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1184     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
1185     self.assertFalse(job.cur_opctx)
1186     self.assertTrue(job.start_timestamp)
1187     self.assertFalse(job.end_timestamp)
1188     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1189     self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1190                                for op in job.ops[:runcount]))
1191     self.assertFalse(job.ops[runcount].end_timestamp)
1192     self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1193                                 for op in job.ops[(runcount + 1):]))
1194     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1195                      [(([constants.OP_STATUS_SUCCESS] * runcount) +
1196                        ([constants.OP_STATUS_QUEUED] *
1197                         (len(job.ops) - runcount))),
1198                       (["Res%s" % i for i in range(runcount)] +
1199                        ([None] * (len(job.ops) - runcount)))])
1200
1201     # Must have been written and replicated
1202     self.assertEqual(queue.GetNextUpdate(), (job, True))
1203     self.assertRaises(IndexError, queue.GetNextUpdate)
1204
1205   def testQueueShutdownWhileRunning(self):
1206     # Tests shutting down the queue while a job is running
1207     queue = _FakeQueueForProc()
1208
1209     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1210            for i in range(3)]
1211
1212     # Create job
1213     job_id = 2718211587
1214     job = self._CreateJob(queue, job_id, ops)
1215
1216     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1217
1218     opexec = _FakeExecOpCodeForProc(queue, None, None)
1219
1220     self.assertRaises(IndexError, queue.GetNextUpdate)
1221
1222     # Run one opcode
1223     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1224                      jqueue._JobProcessor.DEFER)
1225
1226     # Job goes back to queued
1227     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1228     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1229                      [[constants.OP_STATUS_SUCCESS,
1230                        constants.OP_STATUS_QUEUED,
1231                        constants.OP_STATUS_QUEUED],
1232                       ["Res0", None, None]])
1233     self.assertFalse(job.cur_opctx)
1234
1235     # Writes for waiting, running and result
1236     for _ in range(3):
1237       self.assertEqual(queue.GetNextUpdate(), (job, True))
1238
1239     # Run second opcode
1240     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1241                      jqueue._JobProcessor.DEFER)
1242
1243     # Job goes back to queued
1244     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1245     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1246                      [[constants.OP_STATUS_SUCCESS,
1247                        constants.OP_STATUS_SUCCESS,
1248                        constants.OP_STATUS_QUEUED],
1249                       ["Res0", "Res1", None]])
1250     self.assertFalse(job.cur_opctx)
1251
1252     # Writes for waiting, running and result
1253     for _ in range(3):
1254       self.assertEqual(queue.GetNextUpdate(), (job, True))
1255
1256     self._TestQueueShutdown(queue, opexec, job, 2)
1257
1258   def testQueueShutdownWithLockTimeout(self):
1259     # Tests shutting down while a lock acquire times out
1260     queue = _FakeQueueForProc()
1261
1262     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1263            for i in range(3)]
1264
1265     # Create job
1266     job_id = 1304231178
1267     job = self._CreateJob(queue, job_id, ops)
1268
1269     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1270
1271     acquire_timeout = False
1272
1273     def _BeforeStart(timeout, priority):
1274       self.assertFalse(queue.IsAcquired())
1275       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1276       if acquire_timeout:
1277         raise mcpu.LockAcquireTimeout()
1278
1279     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, None)
1280
1281     self.assertRaises(IndexError, queue.GetNextUpdate)
1282
1283     # Run one opcode
1284     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1285                      jqueue._JobProcessor.DEFER)
1286
1287     # Job goes back to queued
1288     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1289     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1290                      [[constants.OP_STATUS_SUCCESS,
1291                        constants.OP_STATUS_QUEUED,
1292                        constants.OP_STATUS_QUEUED],
1293                       ["Res0", None, None]])
1294     self.assertFalse(job.cur_opctx)
1295
1296     # Writes for waiting, running and result
1297     for _ in range(3):
1298       self.assertEqual(queue.GetNextUpdate(), (job, True))
1299
1300     # The next opcode should have expiring lock acquires
1301     acquire_timeout = True
1302
1303     self._TestQueueShutdown(queue, opexec, job, 1)
1304
1305   def testQueueShutdownWhileInQueue(self):
1306     # This should never happen in reality (no new jobs are started by the
1307     # workerpool once a shutdown has been initiated), but it's better to test
1308     # the job processor for this scenario
1309     queue = _FakeQueueForProc()
1310
1311     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1312            for i in range(5)]
1313
1314     # Create job
1315     job_id = 2031
1316     job = self._CreateJob(queue, job_id, ops)
1317
1318     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1319     self.assertRaises(IndexError, queue.GetNextUpdate)
1320
1321     self.assertFalse(job.start_timestamp)
1322     self.assertFalse(job.end_timestamp)
1323     self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
1324                                for op in job.ops))
1325
1326     opexec = _FakeExecOpCodeForProc(queue, None, None)
1327     self._TestQueueShutdown(queue, opexec, job, 0)
1328
1329   def testQueueShutdownWhileWaitlockInQueue(self):
1330     queue = _FakeQueueForProc()
1331
1332     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1333            for i in range(5)]
1334
1335     # Create job
1336     job_id = 53125685
1337     job = self._CreateJob(queue, job_id, ops)
1338
1339     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1340
1341     job.ops[0].status = constants.OP_STATUS_WAITING
1342
1343     assert len(job.ops) == 5
1344
1345     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1346
1347     self.assertRaises(IndexError, queue.GetNextUpdate)
1348
1349     opexec = _FakeExecOpCodeForProc(queue, None, None)
1350     self._TestQueueShutdown(queue, opexec, job, 0)
1351
1352   def testPartiallyRun(self):
1353     # Tests calling the processor on a job that's been partially run before the
1354     # program was restarted
1355     queue = _FakeQueueForProc()
1356
1357     opexec = _FakeExecOpCodeForProc(queue, None, None)
1358
1359     for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
1360       ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1361              for i in range(10)]
1362
1363       # Create job
1364       job = self._CreateJob(queue, job_id, ops)
1365
1366       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1367
1368       for _ in range(successcount):
1369         self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1370                          jqueue._JobProcessor.DEFER)
1371
1372       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1373       self.assertEqual(job.GetInfo(["opstatus"]),
1374                        [[constants.OP_STATUS_SUCCESS
1375                          for _ in range(successcount)] +
1376                         [constants.OP_STATUS_QUEUED
1377                          for _ in range(len(ops) - successcount)]])
1378
1379       self.assert_(job.ops_iter)
1380
1381       # Serialize and restore (simulates program restart)
1382       newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1383       self.assertFalse(newjob.ops_iter)
1384       self._TestPartial(newjob, successcount)
1385
1386   def _TestPartial(self, job, successcount):
1387     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1388     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1389
1390     queue = _FakeQueueForProc()
1391     opexec = _FakeExecOpCodeForProc(queue, None, None)
1392
1393     for remaining in reversed(range(len(job.ops) - successcount)):
1394       result = jqueue._JobProcessor(queue, opexec, job)()
1395       self.assertEqual(queue.GetNextUpdate(), (job, True))
1396       self.assertEqual(queue.GetNextUpdate(), (job, True))
1397       self.assertEqual(queue.GetNextUpdate(), (job, True))
1398       self.assertRaises(IndexError, queue.GetNextUpdate)
1399
1400       if remaining == 0:
1401         # Last opcode
1402         self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1403         break
1404
1405       self.assertEqual(result, jqueue._JobProcessor.DEFER)
1406
1407       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1408
1409     self.assertRaises(IndexError, queue.GetNextUpdate)
1410     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1411     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1412     self.assertEqual(job.GetInfo(["opresult"]),
1413                      [[op.input.result for op in job.ops]])
1414     self.assertEqual(job.GetInfo(["opstatus"]),
1415                      [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
1416     self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1417                             for op in job.ops))
1418
1419     self._GenericCheckJob(job)
1420
1421     # Calling the processor on a finished job should be a no-op
1422     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1423                      jqueue._JobProcessor.FINISHED)
1424     self.assertRaises(IndexError, queue.GetNextUpdate)
1425
1426     # ... also after being restored
1427     job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1428     # Calling the processor on a finished job should be a no-op
1429     self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
1430                      jqueue._JobProcessor.FINISHED)
1431     self.assertRaises(IndexError, queue.GetNextUpdate)
1432
1433   def testProcessorOnRunningJob(self):
1434     ops = [opcodes.OpTestDummy(result="result", fail=False)]
1435
1436     queue = _FakeQueueForProc()
1437     opexec = _FakeExecOpCodeForProc(queue, None, None)
1438
1439     # Create job
1440     job = self._CreateJob(queue, 9571, ops)
1441
1442     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1443
1444     job.ops[0].status = constants.OP_STATUS_RUNNING
1445
1446     assert len(job.ops) == 1
1447
1448     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1449
1450     # Calling on running job must fail
1451     self.assertRaises(errors.ProgrammerError,
1452                       jqueue._JobProcessor(queue, opexec, job))
1453
1454   def testLogMessages(self):
1455     # Tests the "Feedback" callback function
1456     queue = _FakeQueueForProc()
1457
1458     messages = {
1459       1: [
1460         (None, "Hello"),
1461         (None, "World"),
1462         (constants.ELOG_MESSAGE, "there"),
1463         ],
1464       4: [
1465         (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
1466         (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1467         ],
1468       }
1469     ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1470                                messages=messages.get(i, []))
1471            for i in range(5)]
1472
1473     # Create job
1474     job = self._CreateJob(queue, 29386, ops)
1475
1476     def _BeforeStart(timeout, priority):
1477       self.assertEqual(queue.GetNextUpdate(), (job, True))
1478       self.assertRaises(IndexError, queue.GetNextUpdate)
1479       self.assertFalse(queue.IsAcquired())
1480       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1481
1482     def _AfterStart(op, cbs):
1483       self.assertEqual(queue.GetNextUpdate(), (job, True))
1484       self.assertRaises(IndexError, queue.GetNextUpdate)
1485       self.assertFalse(queue.IsAcquired())
1486       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1487
1488       self.assertRaises(AssertionError, cbs.Feedback,
1489                         "too", "many", "arguments")
1490
1491       for (log_type, msg) in op.messages:
1492         self.assertRaises(IndexError, queue.GetNextUpdate)
1493         if log_type:
1494           cbs.Feedback(log_type, msg)
1495         else:
1496           cbs.Feedback(msg)
1497         # Check for job update without replication
1498         self.assertEqual(queue.GetNextUpdate(), (job, False))
1499         self.assertRaises(IndexError, queue.GetNextUpdate)
1500
1501     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1502
1503     for remaining in reversed(range(len(job.ops))):
1504       self.assertRaises(IndexError, queue.GetNextUpdate)
1505       result = jqueue._JobProcessor(queue, opexec, job)()
1506       self.assertEqual(queue.GetNextUpdate(), (job, True))
1507       self.assertRaises(IndexError, queue.GetNextUpdate)
1508
1509       if remaining == 0:
1510         # Last opcode
1511         self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1512         break
1513
1514       self.assertEqual(result, jqueue._JobProcessor.DEFER)
1515
1516       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1517
1518     self.assertRaises(IndexError, queue.GetNextUpdate)
1519
1520     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1521     self.assertEqual(job.GetInfo(["opresult"]),
1522                      [[op.input.result for op in job.ops]])
1523
1524     logmsgcount = sum(len(m) for m in messages.values())
1525
1526     self._CheckLogMessages(job, logmsgcount)
1527
1528     # Serialize and restore (simulates program restart)
1529     newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1530     self._CheckLogMessages(newjob, logmsgcount)
1531
1532     # Check each message
1533     prevserial = -1
1534     for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1535       for (serial, timestamp, log_type, msg) in oplog:
1536         (exptype, expmsg) = messages.get(idx).pop(0)
1537         if exptype:
1538           self.assertEqual(log_type, exptype)
1539         else:
1540           self.assertEqual(log_type, constants.ELOG_MESSAGE)
1541         self.assertEqual(expmsg, msg)
1542         self.assert_(serial > prevserial)
1543         prevserial = serial
1544
1545   def _CheckLogMessages(self, job, count):
1546     # Check serial
1547     self.assertEqual(job.log_serial, count)
1548
1549     # No filter
1550     self.assertEqual(job.GetLogEntries(None),
1551                      [entry for entries in job.GetInfo(["oplog"])[0] if entries
1552                       for entry in entries])
1553
1554     # Filter with serial
1555     assert count > 3
1556     self.assert_(job.GetLogEntries(3))
1557     self.assertEqual(job.GetLogEntries(3),
1558                      [entry for entries in job.GetInfo(["oplog"])[0] if entries
1559                       for entry in entries][3:])
1560
1561     # No log message after highest serial
1562     self.assertFalse(job.GetLogEntries(count))
1563     self.assertFalse(job.GetLogEntries(count + 3))
1564
1565   def testSubmitManyJobs(self):
1566     queue = _FakeQueueForProc()
1567
1568     job_id = 15656
1569     ops = [
1570       opcodes.OpTestDummy(result="Res0", fail=False,
1571                           submit_jobs=[]),
1572       opcodes.OpTestDummy(result="Res1", fail=False,
1573                           submit_jobs=[
1574                             [opcodes.OpTestDummy(result="r1j0", fail=False)],
1575                             ]),
1576       opcodes.OpTestDummy(result="Res2", fail=False,
1577                           submit_jobs=[
1578                             [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1579                              opcodes.OpTestDummy(result="r2j0o1", fail=False),
1580                              opcodes.OpTestDummy(result="r2j0o2", fail=False),
1581                              opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1582                             [opcodes.OpTestDummy(result="r2j1", fail=False)],
1583                             [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1584                              opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1585                             ]),
1586       ]
1587
1588     # Create job
1589     job = self._CreateJob(queue, job_id, ops)
1590
1591     def _BeforeStart(timeout, priority):
1592       self.assertEqual(queue.GetNextUpdate(), (job, True))
1593       self.assertRaises(IndexError, queue.GetNextUpdate)
1594       self.assertFalse(queue.IsAcquired())
1595       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1596       self.assertFalse(job.cur_opctx)
1597
1598     def _AfterStart(op, cbs):
1599       self.assertEqual(queue.GetNextUpdate(), (job, True))
1600       self.assertRaises(IndexError, queue.GetNextUpdate)
1601
1602       self.assertFalse(queue.IsAcquired())
1603       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1604       self.assertFalse(job.cur_opctx)
1605
1606       # Job is running, cancelling shouldn't be possible
1607       (success, _) = job.Cancel()
1608       self.assertFalse(success)
1609
1610     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1611
1612     for idx in range(len(ops)):
1613       self.assertRaises(IndexError, queue.GetNextUpdate)
1614       result = jqueue._JobProcessor(queue, opexec, job)()
1615       self.assertEqual(queue.GetNextUpdate(), (job, True))
1616       self.assertRaises(IndexError, queue.GetNextUpdate)
1617       if idx == len(ops) - 1:
1618         # Last opcode
1619         self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1620       else:
1621         self.assertEqual(result, jqueue._JobProcessor.DEFER)
1622
1623         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1624         self.assert_(job.start_timestamp)
1625         self.assertFalse(job.end_timestamp)
1626
1627     self.assertRaises(IndexError, queue.GetNextUpdate)
1628
1629     for idx, submitted_ops in enumerate(job_ops
1630                                         for op in ops
1631                                         for job_ops in op.submit_jobs):
1632       self.assertEqual(queue.GetNextSubmittedJob(),
1633                        (1000 + idx, submitted_ops))
1634     self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1635
1636     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1637     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1638     self.assertEqual(job.GetInfo(["opresult"]),
1639                      [[[], [1000], [1001, 1002, 1003]]])
1640     self.assertEqual(job.GetInfo(["opstatus"]),
1641                      [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1642
1643     self._GenericCheckJob(job)
1644
1645     # Calling the processor on a finished job should be a no-op
1646     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1647                      jqueue._JobProcessor.FINISHED)
1648     self.assertRaises(IndexError, queue.GetNextUpdate)
1649
1650   def testJobDependency(self):
1651     depmgr = _FakeDependencyManager()
1652     queue = _FakeQueueForProc(depmgr=depmgr)
1653
1654     self.assertEqual(queue.depmgr, depmgr)
1655
1656     prev_job_id = 22113
1657     prev_job_id2 = 28102
1658     job_id = 29929
1659     ops = [
1660       opcodes.OpTestDummy(result="Res0", fail=False,
1661                           depends=[
1662                             [prev_job_id2, None],
1663                             [prev_job_id, None],
1664                             ]),
1665       opcodes.OpTestDummy(result="Res1", fail=False),
1666       ]
1667
1668     # Create job
1669     job = self._CreateJob(queue, job_id, ops)
1670
1671     def _BeforeStart(timeout, priority):
1672       if attempt == 0 or attempt > 5:
1673         # Job should only be updated when it wasn't waiting for another job
1674         self.assertEqual(queue.GetNextUpdate(), (job, True))
1675       self.assertRaises(IndexError, queue.GetNextUpdate)
1676       self.assertFalse(queue.IsAcquired())
1677       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1678       self.assertFalse(job.cur_opctx)
1679
1680     def _AfterStart(op, cbs):
1681       self.assertEqual(queue.GetNextUpdate(), (job, True))
1682       self.assertRaises(IndexError, queue.GetNextUpdate)
1683
1684       self.assertFalse(queue.IsAcquired())
1685       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1686       self.assertFalse(job.cur_opctx)
1687
1688       # Job is running, cancelling shouldn't be possible
1689       (success, _) = job.Cancel()
1690       self.assertFalse(success)
1691
1692     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1693
1694     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1695
1696     counter = itertools.count()
1697     while True:
1698       attempt = counter.next()
1699
1700       self.assertRaises(IndexError, queue.GetNextUpdate)
1701       self.assertRaises(IndexError, depmgr.GetNextNotification)
1702
1703       if attempt < 2:
1704         depmgr.AddCheckResult(job, prev_job_id2, None,
1705                               (jqueue._JobDependencyManager.WAIT, "wait2"))
1706       elif attempt == 2:
1707         depmgr.AddCheckResult(job, prev_job_id2, None,
1708                               (jqueue._JobDependencyManager.CONTINUE, "cont"))
1709         # The processor will ask for the next dependency immediately
1710         depmgr.AddCheckResult(job, prev_job_id, None,
1711                               (jqueue._JobDependencyManager.WAIT, "wait"))
1712       elif attempt < 5:
1713         depmgr.AddCheckResult(job, prev_job_id, None,
1714                               (jqueue._JobDependencyManager.WAIT, "wait"))
1715       elif attempt == 5:
1716         depmgr.AddCheckResult(job, prev_job_id, None,
1717                               (jqueue._JobDependencyManager.CONTINUE, "cont"))
1718       if attempt == 2:
1719         self.assertEqual(depmgr.CountPendingResults(), 2)
1720       elif attempt > 5:
1721         self.assertEqual(depmgr.CountPendingResults(), 0)
1722       else:
1723         self.assertEqual(depmgr.CountPendingResults(), 1)
1724
1725       result = jqueue._JobProcessor(queue, opexec, job)()
1726       if attempt == 0 or attempt >= 5:
1727         # Job should only be updated if there was an actual change
1728         self.assertEqual(queue.GetNextUpdate(), (job, True))
1729       self.assertRaises(IndexError, queue.GetNextUpdate)
1730       self.assertFalse(depmgr.CountPendingResults())
1731
1732       if attempt < 5:
1733         # Simulate waiting for other job
1734         self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1735         self.assertTrue(job.cur_opctx)
1736         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1737         self.assertRaises(IndexError, depmgr.GetNextNotification)
1738         self.assert_(job.start_timestamp)
1739         self.assertFalse(job.end_timestamp)
1740         continue
1741
1742       if result == jqueue._JobProcessor.FINISHED:
1743         # Last opcode
1744         self.assertFalse(job.cur_opctx)
1745         break
1746
1747       self.assertRaises(IndexError, depmgr.GetNextNotification)
1748
1749       self.assertEqual(result, jqueue._JobProcessor.DEFER)
1750       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1751       self.assert_(job.start_timestamp)
1752       self.assertFalse(job.end_timestamp)
1753
1754     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1755     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1756     self.assertEqual(job.GetInfo(["opresult"]),
1757                      [[op.input.result for op in job.ops]])
1758     self.assertEqual(job.GetInfo(["opstatus"]),
1759                      [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1760     self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1761                                for op in job.ops))
1762
1763     self._GenericCheckJob(job)
1764
1765     self.assertRaises(IndexError, queue.GetNextUpdate)
1766     self.assertRaises(IndexError, depmgr.GetNextNotification)
1767     self.assertFalse(depmgr.CountPendingResults())
1768     self.assertFalse(depmgr.CountWaitingJobs())
1769
1770     # Calling the processor on a finished job should be a no-op
1771     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1772                      jqueue._JobProcessor.FINISHED)
1773     self.assertRaises(IndexError, queue.GetNextUpdate)
1774
1775   def testJobDependencyCancel(self):
1776     depmgr = _FakeDependencyManager()
1777     queue = _FakeQueueForProc(depmgr=depmgr)
1778
1779     self.assertEqual(queue.depmgr, depmgr)
1780
1781     prev_job_id = 13623
1782     job_id = 30876
1783     ops = [
1784       opcodes.OpTestDummy(result="Res0", fail=False),
1785       opcodes.OpTestDummy(result="Res1", fail=False,
1786                           depends=[
1787                             [prev_job_id, None],
1788                             ]),
1789       opcodes.OpTestDummy(result="Res2", fail=False),
1790       ]
1791
1792     # Create job
1793     job = self._CreateJob(queue, job_id, ops)
1794
1795     def _BeforeStart(timeout, priority):
1796       if attempt == 0 or attempt > 5:
1797         # Job should only be updated when it wasn't waiting for another job
1798         self.assertEqual(queue.GetNextUpdate(), (job, True))
1799       self.assertRaises(IndexError, queue.GetNextUpdate)
1800       self.assertFalse(queue.IsAcquired())
1801       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1802       self.assertFalse(job.cur_opctx)
1803
1804     def _AfterStart(op, cbs):
1805       self.assertEqual(queue.GetNextUpdate(), (job, True))
1806       self.assertRaises(IndexError, queue.GetNextUpdate)
1807
1808       self.assertFalse(queue.IsAcquired())
1809       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1810       self.assertFalse(job.cur_opctx)
1811
1812       # Job is running, cancelling shouldn't be possible
1813       (success, _) = job.Cancel()
1814       self.assertFalse(success)
1815
1816     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1817
1818     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1819
1820     counter = itertools.count()
1821     while True:
1822       attempt = counter.next()
1823
1824       self.assertRaises(IndexError, queue.GetNextUpdate)
1825       self.assertRaises(IndexError, depmgr.GetNextNotification)
1826
1827       if attempt == 0:
1828         # This will handle the first opcode
1829         pass
1830       elif attempt < 4:
1831         depmgr.AddCheckResult(job, prev_job_id, None,
1832                               (jqueue._JobDependencyManager.WAIT, "wait"))
1833       elif attempt == 4:
1834         # Other job was cancelled
1835         depmgr.AddCheckResult(job, prev_job_id, None,
1836                               (jqueue._JobDependencyManager.CANCEL, "cancel"))
1837
1838       if attempt == 0:
1839         self.assertEqual(depmgr.CountPendingResults(), 0)
1840       else:
1841         self.assertEqual(depmgr.CountPendingResults(), 1)
1842
1843       result = jqueue._JobProcessor(queue, opexec, job)()
1844       if attempt <= 1 or attempt >= 4:
1845         # Job should only be updated if there was an actual change
1846         self.assertEqual(queue.GetNextUpdate(), (job, True))
1847       self.assertRaises(IndexError, queue.GetNextUpdate)
1848       self.assertFalse(depmgr.CountPendingResults())
1849
1850       if attempt > 0 and attempt < 4:
1851         # Simulate waiting for other job
1852         self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1853         self.assertTrue(job.cur_opctx)
1854         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1855         self.assertRaises(IndexError, depmgr.GetNextNotification)
1856         self.assert_(job.start_timestamp)
1857         self.assertFalse(job.end_timestamp)
1858         continue
1859
1860       if result == jqueue._JobProcessor.FINISHED:
1861         # Last opcode
1862         self.assertFalse(job.cur_opctx)
1863         break
1864
1865       self.assertRaises(IndexError, depmgr.GetNextNotification)
1866
1867       self.assertEqual(result, jqueue._JobProcessor.DEFER)
1868       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1869       self.assert_(job.start_timestamp)
1870       self.assertFalse(job.end_timestamp)
1871
1872     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1873     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1874     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1875                      [[constants.OP_STATUS_SUCCESS,
1876                        constants.OP_STATUS_CANCELED,
1877                        constants.OP_STATUS_CANCELED],
1878                       ["Res0", "Job canceled by request",
1879                        "Job canceled by request"]])
1880
1881     self._GenericCheckJob(job)
1882
1883     self.assertRaises(IndexError, queue.GetNextUpdate)
1884     self.assertRaises(IndexError, depmgr.GetNextNotification)
1885     self.assertFalse(depmgr.CountPendingResults())
1886
1887     # Calling the processor on a finished job should be a no-op
1888     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1889                      jqueue._JobProcessor.FINISHED)
1890     self.assertRaises(IndexError, queue.GetNextUpdate)
1891
1892   def testJobDependencyWrongstatus(self):
1893     depmgr = _FakeDependencyManager()
1894     queue = _FakeQueueForProc(depmgr=depmgr)
1895
1896     self.assertEqual(queue.depmgr, depmgr)
1897
1898     prev_job_id = 9741
1899     job_id = 11763
1900     ops = [
1901       opcodes.OpTestDummy(result="Res0", fail=False),
1902       opcodes.OpTestDummy(result="Res1", fail=False,
1903                           depends=[
1904                             [prev_job_id, None],
1905                             ]),
1906       opcodes.OpTestDummy(result="Res2", fail=False),
1907       ]
1908
1909     # Create job
1910     job = self._CreateJob(queue, job_id, ops)
1911
1912     def _BeforeStart(timeout, priority):
1913       if attempt == 0 or attempt > 5:
1914         # Job should only be updated when it wasn't waiting for another job
1915         self.assertEqual(queue.GetNextUpdate(), (job, True))
1916       self.assertRaises(IndexError, queue.GetNextUpdate)
1917       self.assertFalse(queue.IsAcquired())
1918       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1919       self.assertFalse(job.cur_opctx)
1920
1921     def _AfterStart(op, cbs):
1922       self.assertEqual(queue.GetNextUpdate(), (job, True))
1923       self.assertRaises(IndexError, queue.GetNextUpdate)
1924
1925       self.assertFalse(queue.IsAcquired())
1926       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1927       self.assertFalse(job.cur_opctx)
1928
1929       # Job is running, cancelling shouldn't be possible
1930       (success, _) = job.Cancel()
1931       self.assertFalse(success)
1932
1933     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1934
1935     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1936
1937     counter = itertools.count()
1938     while True:
1939       attempt = counter.next()
1940
1941       self.assertRaises(IndexError, queue.GetNextUpdate)
1942       self.assertRaises(IndexError, depmgr.GetNextNotification)
1943
1944       if attempt == 0:
1945         # This will handle the first opcode
1946         pass
1947       elif attempt < 4:
1948         depmgr.AddCheckResult(job, prev_job_id, None,
1949                               (jqueue._JobDependencyManager.WAIT, "wait"))
1950       elif attempt == 4:
1951         # Other job failed
1952         depmgr.AddCheckResult(job, prev_job_id, None,
1953                               (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
1954
1955       if attempt == 0:
1956         self.assertEqual(depmgr.CountPendingResults(), 0)
1957       else:
1958         self.assertEqual(depmgr.CountPendingResults(), 1)
1959
1960       result = jqueue._JobProcessor(queue, opexec, job)()
1961       if attempt <= 1 or attempt >= 4:
1962         # Job should only be updated if there was an actual change
1963         self.assertEqual(queue.GetNextUpdate(), (job, True))
1964       self.assertRaises(IndexError, queue.GetNextUpdate)
1965       self.assertFalse(depmgr.CountPendingResults())
1966
1967       if attempt > 0 and attempt < 4:
1968         # Simulate waiting for other job
1969         self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1970         self.assertTrue(job.cur_opctx)
1971         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1972         self.assertRaises(IndexError, depmgr.GetNextNotification)
1973         self.assert_(job.start_timestamp)
1974         self.assertFalse(job.end_timestamp)
1975         continue
1976
1977       if result == jqueue._JobProcessor.FINISHED:
1978         # Last opcode
1979         self.assertFalse(job.cur_opctx)
1980         break
1981
1982       self.assertRaises(IndexError, depmgr.GetNextNotification)
1983
1984       self.assertEqual(result, jqueue._JobProcessor.DEFER)
1985       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1986       self.assert_(job.start_timestamp)
1987       self.assertFalse(job.end_timestamp)
1988
1989     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
1990     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
1991     self.assertEqual(job.GetInfo(["opstatus"]),
1992                      [[constants.OP_STATUS_SUCCESS,
1993                        constants.OP_STATUS_ERROR,
1994                        constants.OP_STATUS_ERROR]]),
1995
1996     (opresult, ) = job.GetInfo(["opresult"])
1997     self.assertEqual(len(opresult), len(ops))
1998     self.assertEqual(opresult[0], "Res0")
1999     self.assertTrue(errors.GetEncodedError(opresult[1]))
2000     self.assertTrue(errors.GetEncodedError(opresult[2]))
2001
2002     self._GenericCheckJob(job)
2003
2004     self.assertRaises(IndexError, queue.GetNextUpdate)
2005     self.assertRaises(IndexError, depmgr.GetNextNotification)
2006     self.assertFalse(depmgr.CountPendingResults())
2007
2008     # Calling the processor on a finished job should be a no-op
2009     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
2010                      jqueue._JobProcessor.FINISHED)
2011     self.assertRaises(IndexError, queue.GetNextUpdate)
2012
2013
class TestEvaluateJobProcessorResult(unittest.TestCase):
  """Tests L{jqueue._EvaluateJobProcessorResult} for each kind of result.

  """
  def _AssertNoNotification(self, depmgr):
    """Checks that the fake dependency manager holds no notification.

    """
    self.assertRaises(IndexError, depmgr.GetNextNotification)

  def testFinished(self):
    """A finished job is expected to be announced exactly once.

    """
    finished_job = _IdOnlyFakeJob(30953)
    manager = _FakeDependencyManager()

    jqueue._EvaluateJobProcessorResult(manager, finished_job,
                                       jqueue._JobProcessor.FINISHED)

    self.assertEqual(manager.GetNextNotification(), finished_job.id)
    self._AssertNoNotification(manager)

  def testDefer(self):
    """A deferred job is expected to raise L{workerpool.DeferTask}.

    """
    deferred_job = _IdOnlyFakeJob(11326, priority=5463)
    manager = _FakeDependencyManager()

    raised = None
    try:
      jqueue._EvaluateJobProcessorResult(manager, deferred_job,
                                         jqueue._JobProcessor.DEFER)
    except workerpool.DeferTask:
      raised = sys.exc_info()[1]

    if raised is None:
      self.fail("Didn't raise exception")

    # The exception must carry the job's priority
    self.assertEqual(raised.priority, 5463)
    self._AssertNoNotification(manager)

  def testWaitdep(self):
    """A job waiting for a dependency must not trigger a notification.

    """
    waiting_job = _IdOnlyFakeJob(21317)
    manager = _FakeDependencyManager()

    jqueue._EvaluateJobProcessorResult(manager, waiting_job,
                                       jqueue._JobProcessor.WAITDEP)

    self._AssertNoNotification(manager)

  def testOther(self):
    """An unknown result is expected to raise L{errors.ProgrammerError}.

    """
    some_job = _IdOnlyFakeJob(5813)
    manager = _FakeDependencyManager()

    self.assertRaises(errors.ProgrammerError,
                      jqueue._EvaluateJobProcessorResult,
                      manager, some_job, "Other result")
    self._AssertNoNotification(manager)
2049
2050
2051 class _FakeTimeoutStrategy:
2052   def __init__(self, timeouts):
2053     self.timeouts = timeouts
2054     self.attempts = 0
2055     self.last_timeout = None
2056
2057   def NextAttempt(self):
2058     self.attempts += 1
2059     if self.timeouts:
2060       timeout = self.timeouts.pop(0)
2061     else:
2062       timeout = None
2063     self.last_timeout = timeout
2064     return timeout
2065
2066
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
  """Tests the job processor's handling of lock acquisition timeouts.

  Lock acquisition is simulated by L{_BeforeStart}, which raises
  L{mcpu.LockAcquireTimeout} as long as C{self.retries} is positive. The
  timeouts offered to the processor come from L{_NewTimeoutStrategy}.

  """
  def setUp(self):
    self.queue = _FakeQueueForProc()
    self.job = None
    # Index of the opcode being processed and the counter producing it
    self.curop = None
    self.opcounter = None
    self.timeout_strategy = None
    # Number of simulated lock timeouts still to be raised
    self.retries = 0
    # Values remembered from earlier calls for comparison
    self.prev_tsop = None
    self.prev_prio = None
    self.prev_status = None
    # Priority seen at the last lock acquisition, and whether it succeeded
    self.lock_acq_prio = None
    self.gave_lock = None
    self.done_lock_before_blocking = False

  def _BeforeStart(self, timeout, priority):
    """Simulates acquiring locks before an opcode is executed.

    Raises L{mcpu.LockAcquireTimeout} while retries are left, except for the
    special case on opcode 3 where the lock is given early.

    """
    job = self.job

    # If status has changed, job must've been written
    if self.prev_status != self.job.ops[self.curop].status:
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

    ts = self.timeout_strategy

    self.assertTrue(timeout is None or isinstance(timeout, (int, float)))
    self.assertEqual(timeout, ts.last_timeout)
    self.assertEqual(priority, job.ops[self.curop].priority)

    # Assume the lock is given; reset below if a timeout is simulated
    self.gave_lock = True
    self.lock_acq_prio = priority

    if (self.curop == 3 and
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
      # Give locks before running into blocking acquire
      assert self.retries == 7
      self.retries = 0
      self.done_lock_before_blocking = True
      return

    if self.retries > 0:
      self.assertTrue(timeout is not None)
      self.retries -= 1
      self.gave_lock = False
      raise mcpu.LockAcquireTimeout()

    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
      assert not ts.timeouts
      self.assertTrue(timeout is None)

  def _AfterStart(self, op, cbs):
    """Checks the job's state once an opcode has started running.

    """
    job = self.job

    # Setting to "running" requires an update
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Job is running, cancelling shouldn't be possible
    (success, _) = job.Cancel()
    self.assertFalse(success)

  def _NextOpcode(self):
    """Advances to the next opcode and remembers its priority and status.

    Passed to the processor as C{_nextop_fn}.

    """
    self.curop = self.opcounter.next()
    self.prev_prio = self.job.ops[self.curop].priority
    self.prev_status = self.job.ops[self.curop].status

  def _NewTimeoutStrategy(self):
    """Creates the fake timeout strategy for the current opcode.

    Passed to the processor as C{_timeout_strategy_factory}. The timeouts and
    the number of simulated lock timeouts depend on the opcode index.

    """
    job = self.job

    # All retries of the previous strategy must have been used up
    self.assertEqual(self.retries, 0)

    if self.prev_tsop == self.curop:
      # Still on the same opcode, priority must've been increased
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)

    if self.curop == 1:
      # Normal retry
      timeouts = range(10, 31, 10)
      self.retries = len(timeouts) - 1

    elif self.curop == 2:
      # Let this run into a blocking acquire
      timeouts = range(11, 61, 12)
      self.retries = len(timeouts)

    elif self.curop == 3:
      # Wait for priority to increase, but give lock before blocking acquire
      timeouts = range(12, 100, 14)
      self.retries = len(timeouts)

      self.assertFalse(self.done_lock_before_blocking)

    elif self.curop == 4:
      self.assertTrue(self.done_lock_before_blocking)

      # Timeouts, but no need to retry
      timeouts = range(10, 31, 10)
      self.retries = 0

    elif self.curop == 5:
      # Normal retry
      timeouts = range(19, 100, 11)
      self.retries = len(timeouts)

    else:
      timeouts = []
      self.retries = 0

    assert len(job.ops) == 10
    assert self.retries <= len(timeouts)

    ts = _FakeTimeoutStrategy(timeouts)

    self.timeout_strategy = ts
    self.prev_tsop = self.curop
    self.prev_prio = job.ops[self.curop].priority

    return ts

  def testTimeout(self):
    """Processes a ten-opcode job while simulating lock timeouts.

    """
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(10)]

    # Create job
    job_id = 15801
    job = self._CreateJob(self.queue, job_id, ops)
    self.job = job

    self.opcounter = itertools.count(0)

    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
                                    self._AfterStart)
    tsf = self._NewTimeoutStrategy

    self.assertFalse(self.done_lock_before_blocking)

    # One iteration per processor run, until the job is finished
    while True:
      proc = jqueue._JobProcessor(self.queue, opexec, job,
                                  _timeout_strategy_factory=tsf)

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      if self.curop is not None:
        self.prev_status = self.job.ops[self.curop].status

      self.lock_acq_prio = None

      result = proc(_nextop_fn=self._NextOpcode)
      assert self.curop is not None

      if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
        # Got lock and/or job is done, result must've been written
        self.assertFalse(job.cur_opctx)
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
        self.assertTrue(job.ops[self.curop].exec_timestamp)

      if result == jqueue._JobProcessor.FINISHED:
        self.assertFalse(job.cur_opctx)
        break

      self.assertEqual(result, jqueue._JobProcessor.DEFER)

      if self.curop == 0:
        self.assertEqual(job.ops[self.curop].start_timestamp,
                         job.start_timestamp)

      if self.gave_lock:
        # Opcode finished, but job not yet done
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      else:
        # Did not get locks
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
                         self.timeout_strategy.NextAttempt)
        self.assertFalse(job.ops[self.curop].exec_timestamp)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

        # If priority has changed since acquiring locks, the job must've been
        # updated
        if self.lock_acq_prio != job.ops[self.curop].priority:
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    # Every opcode must have been visited exactly once
    self.assertEqual(self.curop, len(job.ops) - 1)
    self.assertEqual(self.job, job)
    self.assertEqual(self.opcounter.next(), len(job.ops))
    self.assertTrue(self.done_lock_before_blocking)

    self.assertRaises(IndexError, self.queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2282
2283
class TestJobProcessorChangePriority(unittest.TestCase, _JobProcessorTestUtils):
  """Tests changing a job's priority in between processing its opcodes.

  """
  def setUp(self):
    self.queue = _FakeQueueForProc()
    # Priorities passed to _BeforeStart, in call order
    self.opexecprio = []

  def _BeforeStart(self, timeout, priority):
    """Records the priority an opcode is started with.

    """
    self.assertFalse(self.queue.IsAcquired())
    self.opexecprio.append(priority)

  def testChangePriorityWhileRunning(self):
    """Changes the priority of a job with finished and pending opcodes.

    """
    job_id = 3499
    job = self._CreateJob(self.queue, job_id,
                          [opcodes.OpTestDummy(result="Res%s" % idx,
                                               fail=False)
                           for idx in range(3)])

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart, None)

    def _Process():
      # Runs the processor once and returns its result
      return jqueue._JobProcessor(self.queue, opexec, job)()

    def _CheckStartPriority(expected):
      # Exactly one opcode must have been started, with the given priority
      self.assertEqual(self.opexecprio.pop(0), expected)
      self.assertRaises(IndexError, self.opexecprio.pop, 0)

    def _CheckOpcodes(statuses, results):
      self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                       [statuses, results])

    # First opcode; afterwards the job goes back to queued
    self.assertEqual(_Process(), jqueue._JobProcessor.DEFER)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    _CheckOpcodes([constants.OP_STATUS_SUCCESS,
                   constants.OP_STATUS_QUEUED,
                   constants.OP_STATUS_QUEUED],
                  ["Res0", None, None])

    _CheckStartPriority(constants.OP_PRIO_DEFAULT)

    # First priority change
    change_result = job.ChangePriority(-10)
    self.assertEqual(change_result,
                     (True,
                      ("Priorities of pending opcodes for job 3499 have"
                       " been changed to -10")))
    self.assertEqual(job.CalcPriority(), -10)

    # Second opcode
    self.assertEqual(_Process(), jqueue._JobProcessor.DEFER)

    _CheckStartPriority(-10)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.CalcPriority(), -10)
    self.assertEqual(job.GetInfo(["id"]), [job_id])
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
    _CheckOpcodes([constants.OP_STATUS_SUCCESS,
                   constants.OP_STATUS_SUCCESS,
                   constants.OP_STATUS_QUEUED],
                  ["Res0", "Res1", None])

    # Second priority change
    change_result = job.ChangePriority(5)
    self.assertEqual(change_result,
                     (True,
                      ("Priorities of pending opcodes for job 3499 have"
                       " been changed to 5")))
    self.assertEqual(job.CalcPriority(), 5)

    # Third and last opcode
    self.assertEqual(_Process(), jqueue._JobProcessor.FINISHED)

    _CheckStartPriority(5)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertEqual(job.GetInfo(["id"]), [job_id])
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    _CheckOpcodes([constants.OP_STATUS_SUCCESS,
                   constants.OP_STATUS_SUCCESS,
                   constants.OP_STATUS_SUCCESS],
                  ["Res0", "Res1", "Res2"])

    # Each opcode keeps the priority which was in effect when it was run
    self.assertEqual([op.priority for op in job.ops],
                     [constants.OP_PRIO_DEFAULT, -10, 5])
2374
2375
2376 class _IdOnlyFakeJob:
2377   def __init__(self, job_id, priority=NotImplemented):
2378     self.id = str(job_id)
2379     self._priority = priority
2380
2381   def CalcPriority(self):
2382     return self._priority
2383
2384
class TestJobDependencyManager(unittest.TestCase):
  """Tests for C{jqueue._JobDependencyManager}.

  The manager under test is built with two callbacks owned by the test
  case: L{_GetStatus} replays the statuses queued in C{self._status} and
  L{_Enqueue} records re-added jobs in C{self._queue}.

  Test methods deliberately carry comments instead of docstrings so the
  names shown by the test runner stay unchanged.

  """
  def setUp(self):
    self._status = []
    self._queue = []
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)

  def _GetStatus(self, job_id):
    """Status callback: returns the next queued status.

    The entry at the head of C{self._status} must have been queued for
    C{job_id}.

    """
    (exp_job_id, result) = self._status.pop(0)
    self.assertEqual(exp_job_id, job_id)
    return result

  def _Enqueue(self, jobs):
    """Enqueue callback: records the jobs handed back by the manager.

    """
    self.assertFalse(self.jdm._lock.is_owned(),
                     msg=("Must not own manager lock while re-adding jobs"
                          " (potential deadlock)"))
    self._queue.append(jobs)

  def _Register(self, job, dep_job_id, dep_status, cur_status, expected):
    """Queues one status for the dependency and checks the verdict.

    @param job: job declaring the dependency
    @param dep_job_id: ID of the job depended upon
    @param dep_status: statuses passed on to C{CheckAndRegister}
    @param cur_status: status L{_GetStatus} will report for C{dep_job_id}
    @param expected: result code C{CheckAndRegister} must return

    """
    self._status.append((dep_job_id, cur_status))
    (result, _) = self.jdm.CheckAndRegister(job, dep_job_id, dep_status)
    self.assertEqual(result, expected)
    # The queued status must have been consumed and nothing re-added
    self.assertFalse(self._status)
    self.assertFalse(self._queue)

  def _AssertSoleWaiter(self, job, dep_job_id):
    """Checks that C{job} is the only registered waiter.

    """
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      dep_job_id: set([job]),
      })

  def _AssertPending(self, job, dep_job_id):
    """Checks that lock info lists C{job} as pending on C{dep_job_id}.

    """
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
      ("job/%s" % dep_job_id, None, None, [("job", [job.id])])
      ])

  def _AssertNoLockInfo(self):
    """Checks that no pending entries are reported at all.

    """
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testNotFinalizedThenCancel(self):
    job = _IdOnlyFakeJob(17697)
    job_id = str(28625)

    self._Register(job, job_id, [], constants.JOB_STATUS_RUNNING,
                   self.jdm.WAIT)
    self._AssertSoleWaiter(job, job_id)
    self._AssertPending(job, job_id)

    self._Register(job, job_id, [], constants.JOB_STATUS_CANCELED,
                   self.jdm.CANCEL)
    self.assertFalse(self.jdm.JobWaiting(job))
    self._AssertNoLockInfo()

  def testNotFinalizedThenQueued(self):
    # This can happen on a queue shutdown
    job = _IdOnlyFakeJob(1320)
    job_id = str(22971)

    # Three checks while running, followed by two after falling back to queued
    statuses = ([constants.JOB_STATUS_RUNNING] * 3 +
                [constants.JOB_STATUS_QUEUED] * 2)

    for cur_status in statuses:
      self._Register(job, job_id, [], cur_status, self.jdm.WAIT)
      self._AssertSoleWaiter(job, job_id)
      self._AssertPending(job, job_id)

  def testRequireCancel(self):
    job = _IdOnlyFakeJob(5278)
    job_id = str(9610)
    dep_status = [constants.JOB_STATUS_CANCELED]

    self._Register(job, job_id, dep_status, constants.JOB_STATUS_WAITING,
                   self.jdm.WAIT)
    self._AssertSoleWaiter(job, job_id)
    self._AssertPending(job, job_id)

    self._Register(job, job_id, dep_status, constants.JOB_STATUS_CANCELED,
                   self.jdm.CONTINUE)
    self.assertFalse(self.jdm.JobWaiting(job))
    self._AssertNoLockInfo()

  def testRequireError(self):
    job = _IdOnlyFakeJob(21459)
    job_id = str(25519)
    dep_status = [constants.JOB_STATUS_ERROR]

    self._Register(job, job_id, dep_status, constants.JOB_STATUS_WAITING,
                   self.jdm.WAIT)
    self._AssertSoleWaiter(job, job_id)

    self._Register(job, job_id, dep_status, constants.JOB_STATUS_ERROR,
                   self.jdm.CONTINUE)
    self.assertFalse(self.jdm.JobWaiting(job))
    self._AssertNoLockInfo()

  def testRequireMultiple(self):
    dep_status = list(constants.JOBS_FINALIZED)

    # Every finalized status is acceptable, so each one must let the job go on
    for end_status in dep_status:
      job = _IdOnlyFakeJob(21343)
      job_id = str(14609)

      self._Register(job, job_id, dep_status, constants.JOB_STATUS_WAITING,
                     self.jdm.WAIT)
      self._AssertSoleWaiter(job, job_id)
      self._AssertPending(job, job_id)

      self._Register(job, job_id, dep_status, end_status, self.jdm.CONTINUE)
      self.assertFalse(self.jdm.JobWaiting(job))
      self._AssertNoLockInfo()

  def testNotify(self):
    job = _IdOnlyFakeJob(8227)
    job_id = str(4113)

    self._Register(job, job_id, [], constants.JOB_STATUS_RUNNING,
                   self.jdm.WAIT)
    self._AssertSoleWaiter(job, job_id)

    # Notification must hand the waiting job back through the enqueue callback
    self.jdm.NotifyWaiters(job_id)
    self.assertFalse(self._status)
    self.assertFalse(self.jdm._waiters)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertEqual(self._queue, [set([job])])

  def testWrongStatus(self):
    job = _IdOnlyFakeJob(10102)
    job_id = str(1271)
    dep_status = [constants.JOB_STATUS_SUCCESS]

    self._Register(job, job_id, dep_status, constants.JOB_STATUS_QUEUED,
                   self.jdm.WAIT)
    self._AssertSoleWaiter(job, job_id)

    self._Register(job, job_id, dep_status, constants.JOB_STATUS_ERROR,
                   self.jdm.WRONGSTATUS)
    self.assertFalse(self.jdm.JobWaiting(job))

  def testCorrectStatus(self):
    job = _IdOnlyFakeJob(24273)
    job_id = str(23885)
    dep_status = [constants.JOB_STATUS_SUCCESS]

    self._Register(job, job_id, dep_status, constants.JOB_STATUS_QUEUED,
                   self.jdm.WAIT)
    self._AssertSoleWaiter(job, job_id)

    self._Register(job, job_id, dep_status, constants.JOB_STATUS_SUCCESS,
                   self.jdm.CONTINUE)
    self.assertFalse(self.jdm.JobWaiting(job))

  def testFinalizedRightAway(self):
    job = _IdOnlyFakeJob(224)
    job_id = str(3081)

    self._Register(job, job_id, [constants.JOB_STATUS_SUCCESS],
                   constants.JOB_STATUS_SUCCESS, self.jdm.CONTINUE)
    self.assertFalse(self.jdm.JobWaiting(job))
    # An empty waiter set is left behind until the next notification
    self.assertEqual(self.jdm._waiters, {
      job_id: set(),
      })

    # Force cleanup
    self.jdm.NotifyWaiters("0")
    self.assertFalse(self.jdm._waiters)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)

  def testMultipleWaiting(self):
    # Use a deterministic random generator
    rnd = random.Random(21402)

    # A real list is required as IDs are popped off its end below
    job_ids = [str(i) for i in rnd.sample(range(1, 10000), 150)]

    # Ten dependencies, each with between one and twenty waiting jobs; the
    # dependency ID is popped before the IDs of its waiters
    waiters = {}
    for _ in range(10):
      dep_job_id = job_ids.pop()
      count = rnd.randint(1, 20)
      waiters[dep_job_id] = set([_IdOnlyFakeJob(job_ids.pop())
                                 for _idx in range(count)])

    # Ensure there are no duplicate job IDs
    assert not utils.FindDuplicates(list(waiters.keys()) +
                                    [job.id
                                     for jobs in waiters.values()
                                     for job in jobs])

    # Register all jobs as waiters
    for (job_id, jobs) in waiters.items():
      for job in jobs:
        self._Register(job, job_id, [constants.JOB_STATUS_SUCCESS],
                       constants.JOB_STATUS_QUEUED, self.jdm.WAIT)
        self.assertTrue(self.jdm.JobWaiting(job))

    self.assertEqual(self.jdm._waiters, waiters)

    def _MakeSet(entry):
      # Pending job IDs are compared as sets as their order is not fixed
      (name, mode, owner_names, pending) = entry
      return (name, mode, owner_names,
              [(pendmode, set(pend)) for (pendmode, pend) in pending])

    def _CheckLockInfo():
      # Lock info must list exactly the dependencies still in "waiters"
      info = self.jdm.GetLockInfo([query.LQ_PENDING])
      self.assertEqual(sorted(map(_MakeSet, info)), sorted([
        ("job/%s" % job_id, None, None,
         [("job", set([job.id for job in jobs]))])
        for job_id, jobs in waiters.items()
        if jobs
        ]))

    _CheckLockInfo()

    # Notify in random order; sampling from a list copy of the keys allows
    # "waiters" to be modified inside the loop
    for job_id in rnd.sample(list(waiters), len(waiters)):
      # Remove from pending waiter list
      jobs = waiters.pop(job_id)
      for job in jobs:
        self._Register(job, job_id, [constants.JOB_STATUS_SUCCESS],
                       constants.JOB_STATUS_SUCCESS, self.jdm.CONTINUE)
        self.assertFalse(self.jdm.JobWaiting(job))

      _CheckLockInfo()

    self._AssertNoLockInfo()

    assert not waiters

  def testSelfDependency(self):
    job = _IdOnlyFakeJob(18937)

    # A job depending on itself must be rejected
    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
    self.assertEqual(result, self.jdm.ERROR)

  def testJobDisappears(self):
    job = _IdOnlyFakeJob(30540)
    job_id = str(23769)

    def _FakeStatus(_):
      raise errors.JobLost("#msg#")

    # Separate manager whose status callback always reports a lost job
    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, jdm.ERROR)
    self.assertFalse(jdm.JobWaiting(job))
    self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
2698
2699
# Only when executed as a script: hand control to testutils.GanetiTestProgram
# (presumably the project's unittest runner -- confirm in testutils)
if __name__ == "__main__":
  testutils.GanetiTestProgram()