Add unit tests for RADOSBLockDevice
[ganeti-local] / test / py / ganeti.jqueue_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.jqueue"""
23
24 import os
25 import sys
26 import unittest
27 import tempfile
28 import shutil
29 import errno
30 import itertools
31 import random
32 import operator
33
34 try:
35   # pylint: disable=E0611
36   from pyinotify import pyinotify
37 except ImportError:
38   import pyinotify
39
40 from ganeti import constants
41 from ganeti import utils
42 from ganeti import errors
43 from ganeti import jqueue
44 from ganeti import opcodes
45 from ganeti import compat
46 from ganeti import mcpu
47 from ganeti import query
48 from ganeti import workerpool
49
50 import testutils
51
52
53 class _FakeJob:
54   def __init__(self, job_id, status):
55     self.id = job_id
56     self.writable = False
57     self._status = status
58     self._log = []
59
60   def SetStatus(self, status):
61     self._status = status
62
63   def AddLogEntry(self, msg):
64     self._log.append((len(self._log), msg))
65
66   def CalcStatus(self):
67     return self._status
68
69   def GetInfo(self, fields):
70     result = []
71
72     for name in fields:
73       if name == "status":
74         result.append(self._status)
75       else:
76         raise Exception("Unknown field")
77
78     return result
79
80   def GetLogEntries(self, newer_than):
81     assert newer_than is None or newer_than >= 0
82
83     if newer_than is None:
84       return self._log
85
86     return self._log[newer_than:]
87
88
89 class TestJobChangesChecker(unittest.TestCase):
90   def testStatus(self):
91     job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
92     checker = jqueue._JobChangesChecker(["status"], None, None)
93     self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
94
95     job.SetStatus(constants.JOB_STATUS_RUNNING)
96     self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
97
98     job.SetStatus(constants.JOB_STATUS_SUCCESS)
99     self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
100
101     # job.id is used by checker
102     self.assertEqual(job.id, 9094)
103
104   def testStatusWithPrev(self):
105     job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
106     checker = jqueue._JobChangesChecker(["status"],
107                                         [constants.JOB_STATUS_QUEUED], None)
108     self.assert_(checker(job) is None)
109
110     job.SetStatus(constants.JOB_STATUS_RUNNING)
111     self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
112
113   def testFinalStatus(self):
114     for status in constants.JOBS_FINALIZED:
115       job = _FakeJob(2178711, status)
116       checker = jqueue._JobChangesChecker(["status"], [status], None)
117       # There won't be any changes in this status, hence it should signal
118       # a change immediately
119       self.assertEqual(checker(job), ([status], []))
120
121   def testLog(self):
122     job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
123     checker = jqueue._JobChangesChecker(["status"], None, None)
124     self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
125
126     job.AddLogEntry("Hello World")
127     (job_info, log_entries) = checker(job)
128     self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
129     self.assertEqual(log_entries, [[0, "Hello World"]])
130
131     checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
132     self.assert_(checker2(job) is None)
133
134     job.AddLogEntry("Foo Bar")
135     job.SetStatus(constants.JOB_STATUS_ERROR)
136
137     (job_info, log_entries) = checker2(job)
138     self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
139     self.assertEqual(log_entries, [[1, "Foo Bar"]])
140
141     checker3 = jqueue._JobChangesChecker(["status"], None, None)
142     (job_info, log_entries) = checker3(job)
143     self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
144     self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
145
146
147 class TestJobChangesWaiter(unittest.TestCase):
148   def setUp(self):
149     self.tmpdir = tempfile.mkdtemp()
150     self.filename = utils.PathJoin(self.tmpdir, "job-1")
151     utils.WriteFile(self.filename, data="")
152
153   def tearDown(self):
154     shutil.rmtree(self.tmpdir)
155
156   def _EnsureNotifierClosed(self, notifier):
157     try:
158       os.fstat(notifier._fd)
159     except EnvironmentError, err:
160       self.assertEqual(err.errno, errno.EBADF)
161     else:
162       self.fail("File descriptor wasn't closed")
163
164   def testClose(self):
165     for wait in [False, True]:
166       waiter = jqueue._JobFileChangesWaiter(self.filename)
167       try:
168         if wait:
169           waiter.Wait(0.001)
170       finally:
171         waiter.Close()
172
173       # Ensure file descriptor was closed
174       self._EnsureNotifierClosed(waiter._notifier)
175
176   def testChangingFile(self):
177     waiter = jqueue._JobFileChangesWaiter(self.filename)
178     try:
179       self.assertFalse(waiter.Wait(0.1))
180       utils.WriteFile(self.filename, data="changed")
181       self.assert_(waiter.Wait(60))
182     finally:
183       waiter.Close()
184
185     self._EnsureNotifierClosed(waiter._notifier)
186
187   def testChangingFile2(self):
188     waiter = jqueue._JobChangesWaiter(self.filename)
189     try:
190       self.assertFalse(waiter._filewaiter)
191       self.assert_(waiter.Wait(0.1))
192       self.assert_(waiter._filewaiter)
193
194       # File waiter is now used, but there have been no changes
195       self.assertFalse(waiter.Wait(0.1))
196       utils.WriteFile(self.filename, data="changed")
197       self.assert_(waiter.Wait(60))
198     finally:
199       waiter.Close()
200
201     self._EnsureNotifierClosed(waiter._filewaiter._notifier)
202
203
204 class _FailingWatchManager(pyinotify.WatchManager):
205   """Subclass of L{pyinotify.WatchManager} which always fails to register.
206
207   """
208   def add_watch(self, filename, mask):
209     assert mask == (pyinotify.EventsCodes.ALL_FLAGS["IN_MODIFY"] |
210                     pyinotify.EventsCodes.ALL_FLAGS["IN_IGNORED"])
211
212     return {
213       filename: -1,
214       }
215
216
217 class TestWaitForJobChangesHelper(unittest.TestCase):
218   def setUp(self):
219     self.tmpdir = tempfile.mkdtemp()
220     self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
221     utils.WriteFile(self.filename, data="")
222
223   def tearDown(self):
224     shutil.rmtree(self.tmpdir)
225
226   def _LoadWaitingJob(self):
227     return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)
228
229   def _LoadLostJob(self):
230     return None
231
232   def testNoChanges(self):
233     wfjc = jqueue._WaitForJobChangesHelper()
234
235     # No change
236     self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
237                           [constants.JOB_STATUS_WAITING], None, 0.1),
238                      constants.JOB_NOTCHANGED)
239
240     # No previous information
241     self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
242                           ["status"], None, None, 1.0),
243                      ([constants.JOB_STATUS_WAITING], []))
244
245   def testLostJob(self):
246     wfjc = jqueue._WaitForJobChangesHelper()
247     self.assert_(wfjc(self.filename, self._LoadLostJob,
248                       ["status"], None, None, 1.0) is None)
249
250   def testNonExistentFile(self):
251     wfjc = jqueue._WaitForJobChangesHelper()
252
253     filename = utils.PathJoin(self.tmpdir, "does-not-exist")
254     self.assertFalse(os.path.exists(filename))
255
256     result = wfjc(filename, self._LoadLostJob, ["status"], None, None, 1.0,
257                   _waiter_cls=compat.partial(jqueue._JobChangesWaiter,
258                                              _waiter_cls=NotImplemented))
259     self.assertTrue(result is None)
260
261   def testInotifyError(self):
262     jobfile_waiter_cls = \
263       compat.partial(jqueue._JobFileChangesWaiter,
264                      _inotify_wm_cls=_FailingWatchManager)
265
266     jobchange_waiter_cls = \
267       compat.partial(jqueue._JobChangesWaiter, _waiter_cls=jobfile_waiter_cls)
268
269     wfjc = jqueue._WaitForJobChangesHelper()
270
271     # Test if failing to watch a job file (e.g. due to
272     # fs.inotify.max_user_watches being too low) raises errors.InotifyError
273     self.assertRaises(errors.InotifyError, wfjc,
274                       self.filename, self._LoadWaitingJob,
275                       ["status"], [constants.JOB_STATUS_WAITING], None, 1.0,
276                       _waiter_cls=jobchange_waiter_cls)
277
278
279 class TestEncodeOpError(unittest.TestCase):
280   def test(self):
281     encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
282     self.assert_(isinstance(encerr, tuple))
283     self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
284
285     encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
286     self.assert_(isinstance(encerr, tuple))
287     self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
288
289     encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
290     self.assert_(isinstance(encerr, tuple))
291     self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
292
293     encerr = jqueue._EncodeOpError("Hello World")
294     self.assert_(isinstance(encerr, tuple))
295     self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
296
297
298 class TestQueuedOpCode(unittest.TestCase):
299   def testDefaults(self):
300     def _Check(op):
301       self.assertFalse(hasattr(op.input, "dry_run"))
302       self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
303       self.assertFalse(op.log)
304       self.assert_(op.start_timestamp is None)
305       self.assert_(op.exec_timestamp is None)
306       self.assert_(op.end_timestamp is None)
307       self.assert_(op.result is None)
308       self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
309
310     op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
311     _Check(op1)
312     op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
313     _Check(op2)
314     self.assertEqual(op1.Serialize(), op2.Serialize())
315
316   def testPriority(self):
317     def _Check(op):
318       assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
319              "Default priority equals high priority; test can't work"
320       self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
321       self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
322
323     inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
324     op1 = jqueue._QueuedOpCode(inpop)
325     _Check(op1)
326     op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
327     _Check(op2)
328     self.assertEqual(op1.Serialize(), op2.Serialize())
329
330
331 class TestQueuedJob(unittest.TestCase):
332   def testNoOpCodes(self):
333     self.assertRaises(errors.GenericError, jqueue._QueuedJob,
334                       None, 1, [], False)
335
336   def testDefaults(self):
337     job_id = 4260
338     ops = [
339       opcodes.OpTagsGet(),
340       opcodes.OpTestDelay(),
341       ]
342
343     def _Check(job):
344       self.assertTrue(job.writable)
345       self.assertEqual(job.id, job_id)
346       self.assertEqual(job.log_serial, 0)
347       self.assert_(job.received_timestamp)
348       self.assert_(job.start_timestamp is None)
349       self.assert_(job.end_timestamp is None)
350       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
351       self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
352       self.assert_(repr(job).startswith("<"))
353       self.assertEqual(len(job.ops), len(ops))
354       self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
355                               for (inp, op) in zip(ops, job.ops)))
356       self.assertRaises(errors.OpPrereqError, job.GetInfo,
357                         ["unknown-field"])
358       self.assertEqual(job.GetInfo(["summary"]),
359                        [[op.input.Summary() for op in job.ops]])
360       self.assertFalse(job.archived)
361
362     job1 = jqueue._QueuedJob(None, job_id, ops, True)
363     _Check(job1)
364     job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True, False)
365     _Check(job2)
366     self.assertEqual(job1.Serialize(), job2.Serialize())
367
368   def testWritable(self):
369     job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
370     self.assertFalse(job.writable)
371
372     job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
373     self.assertTrue(job.writable)
374
375   def testArchived(self):
376     job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
377     self.assertFalse(job.archived)
378
379     newjob = jqueue._QueuedJob.Restore(None, job.Serialize(), True, True)
380     self.assertTrue(newjob.archived)
381
382     newjob2 = jqueue._QueuedJob.Restore(None, newjob.Serialize(), True, False)
383     self.assertFalse(newjob2.archived)
384
385   def testPriority(self):
386     job_id = 4283
387     ops = [
388       opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
389       opcodes.OpTestDelay(),
390       ]
391
392     def _Check(job):
393       self.assertEqual(job.id, job_id)
394       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
395       self.assert_(repr(job).startswith("<"))
396
397     job = jqueue._QueuedJob(None, job_id, ops, True)
398     _Check(job)
399     self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
400                             for op in job.ops))
401     self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
402
403     # Increase first
404     job.ops[0].priority -= 1
405     _Check(job)
406     self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
407
408     # Mark opcode as finished
409     job.ops[0].status = constants.OP_STATUS_SUCCESS
410     _Check(job)
411     self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
412
413     # Increase second
414     job.ops[1].priority -= 10
415     self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
416
417     # Test increasing first
418     job.ops[0].status = constants.OP_STATUS_RUNNING
419     job.ops[0].priority -= 19
420     self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
421
422   def _JobForPriority(self, job_id):
423     ops = [
424       opcodes.OpTagsGet(),
425       opcodes.OpTestDelay(),
426       opcodes.OpTagsGet(),
427       opcodes.OpTestDelay(),
428       ]
429
430     job = jqueue._QueuedJob(None, job_id, ops, True)
431
432     self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
433                                for op in job.ops))
434     self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
435     self.assertFalse(compat.any(hasattr(op.input, "priority")
436                                 for op in job.ops))
437
438     return job
439
440   def testChangePriorityAllQueued(self):
441     job = self._JobForPriority(24984)
442     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
443     self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
444                                for op in job.ops))
445     result = job.ChangePriority(-10)
446     self.assertEqual(job.CalcPriority(), -10)
447     self.assertTrue(compat.all(op.priority == -10 for op in job.ops))
448     self.assertFalse(compat.any(hasattr(op.input, "priority")
449                                 for op in job.ops))
450     self.assertEqual(result,
451                      (True, ("Priorities of pending opcodes for job 24984 have"
452                              " been changed to -10")))
453
454   def testChangePriorityAllFinished(self):
455     job = self._JobForPriority(16405)
456
457     for (idx, op) in enumerate(job.ops):
458       if idx > 2:
459         op.status = constants.OP_STATUS_ERROR
460       else:
461         op.status = constants.OP_STATUS_SUCCESS
462
463     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
464     self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
465     result = job.ChangePriority(-10)
466     self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
467     self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
468                                for op in job.ops))
469     self.assertFalse(compat.any(hasattr(op.input, "priority")
470                                 for op in job.ops))
471     self.assertEqual(map(operator.attrgetter("status"), job.ops), [
472       constants.OP_STATUS_SUCCESS,
473       constants.OP_STATUS_SUCCESS,
474       constants.OP_STATUS_SUCCESS,
475       constants.OP_STATUS_ERROR,
476       ])
477     self.assertEqual(result, (False, "Job 16405 is finished"))
478
479   def testChangePriorityCancelling(self):
480     job = self._JobForPriority(31572)
481
482     for (idx, op) in enumerate(job.ops):
483       if idx > 1:
484         op.status = constants.OP_STATUS_CANCELING
485       else:
486         op.status = constants.OP_STATUS_SUCCESS
487
488     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELING)
489     self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
490     result = job.ChangePriority(5)
491     self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
492     self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
493                                for op in job.ops))
494     self.assertFalse(compat.any(hasattr(op.input, "priority")
495                                 for op in job.ops))
496     self.assertEqual(map(operator.attrgetter("status"), job.ops), [
497       constants.OP_STATUS_SUCCESS,
498       constants.OP_STATUS_SUCCESS,
499       constants.OP_STATUS_CANCELING,
500       constants.OP_STATUS_CANCELING,
501       ])
502     self.assertEqual(result, (False, "Job 31572 is cancelling"))
503
504   def testChangePriorityFirstRunning(self):
505     job = self._JobForPriority(1716215889)
506
507     for (idx, op) in enumerate(job.ops):
508       if idx == 0:
509         op.status = constants.OP_STATUS_RUNNING
510       else:
511         op.status = constants.OP_STATUS_QUEUED
512
513     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
514     self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
515     result = job.ChangePriority(7)
516     self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
517     self.assertEqual(map(operator.attrgetter("priority"), job.ops),
518                      [constants.OP_PRIO_DEFAULT, 7, 7, 7])
519     self.assertFalse(compat.any(hasattr(op.input, "priority")
520                                 for op in job.ops))
521     self.assertEqual(map(operator.attrgetter("status"), job.ops), [
522       constants.OP_STATUS_RUNNING,
523       constants.OP_STATUS_QUEUED,
524       constants.OP_STATUS_QUEUED,
525       constants.OP_STATUS_QUEUED,
526       ])
527     self.assertEqual(result,
528                      (True, ("Priorities of pending opcodes for job"
529                              " 1716215889 have been changed to 7")))
530
531   def testChangePriorityLastRunning(self):
532     job = self._JobForPriority(1308)
533
534     for (idx, op) in enumerate(job.ops):
535       if idx == (len(job.ops) - 1):
536         op.status = constants.OP_STATUS_RUNNING
537       else:
538         op.status = constants.OP_STATUS_SUCCESS
539
540     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
541     self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
542     result = job.ChangePriority(-3)
543     self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
544     self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
545                                for op in job.ops))
546     self.assertFalse(compat.any(hasattr(op.input, "priority")
547                                 for op in job.ops))
548     self.assertEqual(map(operator.attrgetter("status"), job.ops), [
549       constants.OP_STATUS_SUCCESS,
550       constants.OP_STATUS_SUCCESS,
551       constants.OP_STATUS_SUCCESS,
552       constants.OP_STATUS_RUNNING,
553       ])
554     self.assertEqual(result, (False, "Job 1308 had no pending opcodes"))
555
556   def testChangePrioritySecondOpcodeRunning(self):
557     job = self._JobForPriority(27701)
558
559     self.assertEqual(len(job.ops), 4)
560     job.ops[0].status = constants.OP_STATUS_SUCCESS
561     job.ops[1].status = constants.OP_STATUS_RUNNING
562     job.ops[2].status = constants.OP_STATUS_QUEUED
563     job.ops[3].status = constants.OP_STATUS_QUEUED
564
565     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
566     result = job.ChangePriority(-19)
567     self.assertEqual(job.CalcPriority(), -19)
568     self.assertEqual(map(operator.attrgetter("priority"), job.ops),
569                      [constants.OP_PRIO_DEFAULT, constants.OP_PRIO_DEFAULT,
570                       -19, -19])
571     self.assertFalse(compat.any(hasattr(op.input, "priority")
572                                 for op in job.ops))
573     self.assertEqual(map(operator.attrgetter("status"), job.ops), [
574       constants.OP_STATUS_SUCCESS,
575       constants.OP_STATUS_RUNNING,
576       constants.OP_STATUS_QUEUED,
577       constants.OP_STATUS_QUEUED,
578       ])
579     self.assertEqual(result,
580                      (True, ("Priorities of pending opcodes for job"
581                              " 27701 have been changed to -19")))
582
583   def testChangePriorityWithInconsistentJob(self):
584     job = self._JobForPriority(30097)
585
586     self.assertEqual(len(job.ops), 4)
587
588     # This job is invalid (as it has two opcodes marked as running) and make
589     # the call fail because an unprocessed opcode precedes a running one (which
590     # should never happen in reality)
591     job.ops[0].status = constants.OP_STATUS_SUCCESS
592     job.ops[1].status = constants.OP_STATUS_RUNNING
593     job.ops[2].status = constants.OP_STATUS_QUEUED
594     job.ops[3].status = constants.OP_STATUS_RUNNING
595
596     self.assertRaises(AssertionError, job.ChangePriority, 19)
597
598   def testCalcStatus(self):
599     def _Queued(ops):
600       # The default status is "queued"
601       self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
602                               for op in ops))
603
604     def _Waitlock1(ops):
605       ops[0].status = constants.OP_STATUS_WAITING
606
607     def _Waitlock2(ops):
608       ops[0].status = constants.OP_STATUS_SUCCESS
609       ops[1].status = constants.OP_STATUS_SUCCESS
610       ops[2].status = constants.OP_STATUS_WAITING
611
612     def _Running(ops):
613       ops[0].status = constants.OP_STATUS_SUCCESS
614       ops[1].status = constants.OP_STATUS_RUNNING
615       for op in ops[2:]:
616         op.status = constants.OP_STATUS_QUEUED
617
618     def _Canceling1(ops):
619       ops[0].status = constants.OP_STATUS_SUCCESS
620       ops[1].status = constants.OP_STATUS_SUCCESS
621       for op in ops[2:]:
622         op.status = constants.OP_STATUS_CANCELING
623
624     def _Canceling2(ops):
625       for op in ops:
626         op.status = constants.OP_STATUS_CANCELING
627
628     def _Canceled(ops):
629       for op in ops:
630         op.status = constants.OP_STATUS_CANCELED
631
632     def _Error1(ops):
633       for idx, op in enumerate(ops):
634         if idx > 3:
635           op.status = constants.OP_STATUS_ERROR
636         else:
637           op.status = constants.OP_STATUS_SUCCESS
638
639     def _Error2(ops):
640       for op in ops:
641         op.status = constants.OP_STATUS_ERROR
642
643     def _Success(ops):
644       for op in ops:
645         op.status = constants.OP_STATUS_SUCCESS
646
647     tests = {
648       constants.JOB_STATUS_QUEUED: [_Queued],
649       constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
650       constants.JOB_STATUS_RUNNING: [_Running],
651       constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
652       constants.JOB_STATUS_CANCELED: [_Canceled],
653       constants.JOB_STATUS_ERROR: [_Error1, _Error2],
654       constants.JOB_STATUS_SUCCESS: [_Success],
655       }
656
657     def _NewJob():
658       job = jqueue._QueuedJob(None, 1,
659                               [opcodes.OpTestDelay() for _ in range(10)],
660                               True)
661       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
662       self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
663                               for op in job.ops))
664       return job
665
666     for status in constants.JOB_STATUS_ALL:
667       sttests = tests[status]
668       assert sttests
669       for fn in sttests:
670         job = _NewJob()
671         fn(job.ops)
672         self.assertEqual(job.CalcStatus(), status)
673
674
675 class _FakeDependencyManager:
676   def __init__(self):
677     self._checks = []
678     self._notifications = []
679     self._waiting = set()
680
681   def AddCheckResult(self, job, dep_job_id, dep_status, result):
682     self._checks.append((job, dep_job_id, dep_status, result))
683
684   def CountPendingResults(self):
685     return len(self._checks)
686
687   def CountWaitingJobs(self):
688     return len(self._waiting)
689
690   def GetNextNotification(self):
691     return self._notifications.pop(0)
692
693   def JobWaiting(self, job):
694     return job in self._waiting
695
696   def CheckAndRegister(self, job, dep_job_id, dep_status):
697     (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
698
699     assert exp_job == job
700     assert exp_dep_job_id == dep_job_id
701     assert exp_dep_status == dep_status
702
703     (result_status, _) = result
704
705     if result_status == jqueue._JobDependencyManager.WAIT:
706       self._waiting.add(job)
707     elif result_status == jqueue._JobDependencyManager.CONTINUE:
708       self._waiting.remove(job)
709
710     return result
711
712   def NotifyWaiters(self, job_id):
713     self._notifications.append(job_id)
714
715
716 class _DisabledFakeDependencyManager:
717   def JobWaiting(self, _):
718     return False
719
720   def CheckAndRegister(self, *args):
721     assert False, "Should not be called"
722
723   def NotifyWaiters(self, _):
724     pass
725
726
727 class _FakeQueueForProc:
728   def __init__(self, depmgr=None):
729     self._acquired = False
730     self._updates = []
731     self._submitted = []
732     self._accepting_jobs = True
733
734     self._submit_count = itertools.count(1000)
735
736     if depmgr:
737       self.depmgr = depmgr
738     else:
739       self.depmgr = _DisabledFakeDependencyManager()
740
741   def IsAcquired(self):
742     return self._acquired
743
744   def GetNextUpdate(self):
745     return self._updates.pop(0)
746
747   def GetNextSubmittedJob(self):
748     return self._submitted.pop(0)
749
750   def acquire(self, shared=0):
751     assert shared == 1
752     self._acquired = True
753
754   def release(self):
755     assert self._acquired
756     self._acquired = False
757
758   def UpdateJobUnlocked(self, job, replicate=True):
759     assert self._acquired, "Lock not acquired while updating job"
760     self._updates.append((job, bool(replicate)))
761
762   def SubmitManyJobs(self, jobs):
763     assert not self._acquired, "Lock acquired while submitting jobs"
764     job_ids = [self._submit_count.next() for _ in jobs]
765     self._submitted.extend(zip(job_ids, jobs))
766     return job_ids
767
768   def StopAcceptingJobs(self):
769     self._accepting_jobs = False
770
771   def AcceptingJobsUnlocked(self):
772     return self._accepting_jobs
773
774
775 class _FakeExecOpCodeForProc:
776   def __init__(self, queue, before_start, after_start):
777     self._queue = queue
778     self._before_start = before_start
779     self._after_start = after_start
780
781   def __call__(self, op, cbs, timeout=None):
782     assert isinstance(op, opcodes.OpTestDummy)
783     assert not self._queue.IsAcquired(), \
784            "Queue lock not released when executing opcode"
785
786     if self._before_start:
787       self._before_start(timeout, cbs.CurrentPriority())
788
789     cbs.NotifyStart()
790
791     if self._after_start:
792       self._after_start(op, cbs)
793
794     # Check again after the callbacks
795     assert not self._queue.IsAcquired()
796
797     if op.fail:
798       raise errors.OpExecError("Error requested (%s)" % op.result)
799
800     if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
801       return cbs.SubmitManyJobs(op.submit_jobs)
802
803     return op.result
804
805
806 class _JobProcessorTestUtils:
807   def _CreateJob(self, queue, job_id, ops):
808     job = jqueue._QueuedJob(queue, job_id, ops, True)
809     self.assertFalse(job.start_timestamp)
810     self.assertFalse(job.end_timestamp)
811     self.assertEqual(len(ops), len(job.ops))
812     self.assert_(compat.all(op.input == inp
813                             for (op, inp) in zip(job.ops, ops)))
814     self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
815     return job
816
817
818 class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
819   def _GenericCheckJob(self, job):
820     assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
821                       for op in job.ops)
822
823     self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
824                      [[op.start_timestamp for op in job.ops],
825                       [op.exec_timestamp for op in job.ops],
826                       [op.end_timestamp for op in job.ops]])
827     self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
828                      [job.received_timestamp,
829                       job.start_timestamp,
830                       job.end_timestamp])
831     self.assert_(job.start_timestamp)
832     self.assert_(job.end_timestamp)
833     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
834
835   def testSuccess(self):
836     queue = _FakeQueueForProc()
837
838     for (job_id, opcount) in [(25351, 1), (6637, 3),
839                               (24644, 10), (32207, 100)]:
840       ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
841              for i in range(opcount)]
842
843       # Create job
844       job = self._CreateJob(queue, job_id, ops)
845
846       def _BeforeStart(timeout, priority):
847         self.assertEqual(queue.GetNextUpdate(), (job, True))
848         self.assertRaises(IndexError, queue.GetNextUpdate)
849         self.assertFalse(queue.IsAcquired())
850         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
851         self.assertFalse(job.cur_opctx)
852
853       def _AfterStart(op, cbs):
854         self.assertEqual(queue.GetNextUpdate(), (job, True))
855         self.assertRaises(IndexError, queue.GetNextUpdate)
856
857         self.assertFalse(queue.IsAcquired())
858         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
859         self.assertFalse(job.cur_opctx)
860
861         # Job is running, cancelling shouldn't be possible
862         (success, _) = job.Cancel()
863         self.assertFalse(success)
864
865       opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
866
867       for idx in range(len(ops)):
868         self.assertRaises(IndexError, queue.GetNextUpdate)
869         result = jqueue._JobProcessor(queue, opexec, job)()
870         self.assertEqual(queue.GetNextUpdate(), (job, True))
871         self.assertRaises(IndexError, queue.GetNextUpdate)
872         if idx == len(ops) - 1:
873           # Last opcode
874           self.assertEqual(result, jqueue._JobProcessor.FINISHED)
875         else:
876           self.assertEqual(result, jqueue._JobProcessor.DEFER)
877
878           self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
879           self.assert_(job.start_timestamp)
880           self.assertFalse(job.end_timestamp)
881
882       self.assertRaises(IndexError, queue.GetNextUpdate)
883
884       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
885       self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
886       self.assertEqual(job.GetInfo(["opresult"]),
887                        [[op.input.result for op in job.ops]])
888       self.assertEqual(job.GetInfo(["opstatus"]),
889                        [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
890       self.assert_(compat.all(op.start_timestamp and op.end_timestamp
891                               for op in job.ops))
892
893       self._GenericCheckJob(job)
894
895       # Calling the processor on a finished job should be a no-op
896       self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
897                        jqueue._JobProcessor.FINISHED)
898       self.assertRaises(IndexError, queue.GetNextUpdate)
899
900   def testOpcodeError(self):
901     queue = _FakeQueueForProc()
902
903     testdata = [
904       (17077, 1, 0, 0),
905       (1782, 5, 2, 2),
906       (18179, 10, 9, 9),
907       (4744, 10, 3, 8),
908       (23816, 100, 39, 45),
909       ]
910
911     for (job_id, opcount, failfrom, failto) in testdata:
912       # Prepare opcodes
913       ops = [opcodes.OpTestDummy(result="Res%s" % i,
914                                  fail=(failfrom <= i and
915                                        i <= failto))
916              for i in range(opcount)]
917
918       # Create job
919       job = self._CreateJob(queue, str(job_id), ops)
920
921       opexec = _FakeExecOpCodeForProc(queue, None, None)
922
923       for idx in range(len(ops)):
924         self.assertRaises(IndexError, queue.GetNextUpdate)
925         result = jqueue._JobProcessor(queue, opexec, job)()
926         # queued to waitlock
927         self.assertEqual(queue.GetNextUpdate(), (job, True))
928         # waitlock to running
929         self.assertEqual(queue.GetNextUpdate(), (job, True))
930         # Opcode result
931         self.assertEqual(queue.GetNextUpdate(), (job, True))
932         self.assertRaises(IndexError, queue.GetNextUpdate)
933
934         if idx in (failfrom, len(ops) - 1):
935           # Last opcode
936           self.assertEqual(result, jqueue._JobProcessor.FINISHED)
937           break
938
939         self.assertEqual(result, jqueue._JobProcessor.DEFER)
940
941         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
942
943       self.assertRaises(IndexError, queue.GetNextUpdate)
944
945       # Check job status
946       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
947       self.assertEqual(job.GetInfo(["id"]), [job_id])
948       self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
949
950       # Check opcode status
951       data = zip(job.ops,
952                  job.GetInfo(["opstatus"])[0],
953                  job.GetInfo(["opresult"])[0])
954
955       for idx, (op, opstatus, opresult) in enumerate(data):
956         if idx < failfrom:
957           assert not op.input.fail
958           self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
959           self.assertEqual(opresult, op.input.result)
960         elif idx <= failto:
961           assert op.input.fail
962           self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
963           self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
964         else:
965           assert not op.input.fail
966           self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
967           self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
968
969       self.assert_(compat.all(op.start_timestamp and op.end_timestamp
970                               for op in job.ops[:failfrom]))
971
972       self._GenericCheckJob(job)
973
974       # Calling the processor on a finished job should be a no-op
975       self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
976                        jqueue._JobProcessor.FINISHED)
977       self.assertRaises(IndexError, queue.GetNextUpdate)
978
979   def testCancelWhileInQueue(self):
980     queue = _FakeQueueForProc()
981
982     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
983            for i in range(5)]
984
985     # Create job
986     job_id = 17045
987     job = self._CreateJob(queue, job_id, ops)
988
989     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
990
991     # Mark as cancelled
992     (success, _) = job.Cancel()
993     self.assert_(success)
994
995     self.assertRaises(IndexError, queue.GetNextUpdate)
996
997     self.assertFalse(job.start_timestamp)
998     self.assertTrue(job.end_timestamp)
999     self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
1000                             for op in job.ops))
1001
1002     # Serialize to check for differences
1003     before_proc = job.Serialize()
1004
1005     # Simulate processor called in workerpool
1006     opexec = _FakeExecOpCodeForProc(queue, None, None)
1007     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1008                      jqueue._JobProcessor.FINISHED)
1009
1010     # Check result
1011     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1012     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1013     self.assertFalse(job.start_timestamp)
1014     self.assertTrue(job.end_timestamp)
1015     self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1016                                 for op in job.ops))
1017     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1018                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
1019                       ["Job canceled by request" for _ in job.ops]])
1020
1021     # Must not have changed or written
1022     self.assertEqual(before_proc, job.Serialize())
1023     self.assertRaises(IndexError, queue.GetNextUpdate)
1024
1025   def testCancelWhileWaitlockInQueue(self):
1026     queue = _FakeQueueForProc()
1027
1028     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1029            for i in range(5)]
1030
1031     # Create job
1032     job_id = 8645
1033     job = self._CreateJob(queue, job_id, ops)
1034
1035     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1036
1037     job.ops[0].status = constants.OP_STATUS_WAITING
1038
1039     assert len(job.ops) == 5
1040
1041     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1042
1043     # Mark as cancelling
1044     (success, _) = job.Cancel()
1045     self.assert_(success)
1046
1047     self.assertRaises(IndexError, queue.GetNextUpdate)
1048
1049     self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1050                             for op in job.ops))
1051
1052     opexec = _FakeExecOpCodeForProc(queue, None, None)
1053     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1054                      jqueue._JobProcessor.FINISHED)
1055
1056     # Check result
1057     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1058     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1059     self.assertFalse(job.start_timestamp)
1060     self.assert_(job.end_timestamp)
1061     self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1062                                 for op in job.ops))
1063     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1064                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
1065                       ["Job canceled by request" for _ in job.ops]])
1066
1067   def testCancelWhileWaitlock(self):
1068     queue = _FakeQueueForProc()
1069
1070     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1071            for i in range(5)]
1072
1073     # Create job
1074     job_id = 11009
1075     job = self._CreateJob(queue, job_id, ops)
1076
1077     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1078
1079     def _BeforeStart(timeout, priority):
1080       self.assertEqual(queue.GetNextUpdate(), (job, True))
1081       self.assertRaises(IndexError, queue.GetNextUpdate)
1082       self.assertFalse(queue.IsAcquired())
1083       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1084
1085       # Mark as cancelled
1086       (success, _) = job.Cancel()
1087       self.assert_(success)
1088
1089       self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1090                               for op in job.ops))
1091       self.assertRaises(IndexError, queue.GetNextUpdate)
1092
1093     def _AfterStart(op, cbs):
1094       self.assertEqual(queue.GetNextUpdate(), (job, True))
1095       self.assertRaises(IndexError, queue.GetNextUpdate)
1096       self.assertFalse(queue.IsAcquired())
1097       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1098
1099     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1100
1101     self.assertRaises(IndexError, queue.GetNextUpdate)
1102     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1103                      jqueue._JobProcessor.FINISHED)
1104     self.assertEqual(queue.GetNextUpdate(), (job, True))
1105     self.assertRaises(IndexError, queue.GetNextUpdate)
1106
1107     # Check result
1108     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1109     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1110     self.assert_(job.start_timestamp)
1111     self.assert_(job.end_timestamp)
1112     self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
1113                                 for op in job.ops))
1114     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1115                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
1116                       ["Job canceled by request" for _ in job.ops]])
1117
1118   def _TestCancelWhileSomething(self, cb):
1119     queue = _FakeQueueForProc()
1120
1121     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1122            for i in range(5)]
1123
1124     # Create job
1125     job_id = 24314
1126     job = self._CreateJob(queue, job_id, ops)
1127
1128     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1129
1130     def _BeforeStart(timeout, priority):
1131       self.assertFalse(queue.IsAcquired())
1132       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1133
1134       # Mark as cancelled
1135       (success, _) = job.Cancel()
1136       self.assert_(success)
1137
1138       self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1139                               for op in job.ops))
1140
1141       cb(queue)
1142
1143     def _AfterStart(op, cbs):
1144       self.fail("Should not reach this")
1145
1146     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1147
1148     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1149                      jqueue._JobProcessor.FINISHED)
1150
1151     # Check result
1152     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1153     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1154     self.assert_(job.start_timestamp)
1155     self.assert_(job.end_timestamp)
1156     self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
1157                                 for op in job.ops))
1158     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1159                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
1160                       ["Job canceled by request" for _ in job.ops]])
1161
1162     return queue
1163
1164   def testCancelWhileWaitlockWithTimeout(self):
1165     def fn(_):
1166       # Fake an acquire attempt timing out
1167       raise mcpu.LockAcquireTimeout()
1168
1169     self._TestCancelWhileSomething(fn)
1170
1171   def testCancelDuringQueueShutdown(self):
1172     queue = self._TestCancelWhileSomething(lambda q: q.StopAcceptingJobs())
1173     self.assertFalse(queue.AcceptingJobsUnlocked())
1174
1175   def testCancelWhileRunning(self):
1176     # Tests canceling a job with finished opcodes and more, unprocessed ones
1177     queue = _FakeQueueForProc()
1178
1179     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1180            for i in range(3)]
1181
1182     # Create job
1183     job_id = 28492
1184     job = self._CreateJob(queue, job_id, ops)
1185
1186     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1187
1188     opexec = _FakeExecOpCodeForProc(queue, None, None)
1189
1190     # Run one opcode
1191     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1192                      jqueue._JobProcessor.DEFER)
1193
1194     # Job goes back to queued
1195     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1196     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1197                      [[constants.OP_STATUS_SUCCESS,
1198                        constants.OP_STATUS_QUEUED,
1199                        constants.OP_STATUS_QUEUED],
1200                       ["Res0", None, None]])
1201
1202     # Mark as cancelled
1203     (success, _) = job.Cancel()
1204     self.assert_(success)
1205
1206     # Try processing another opcode (this will actually cancel the job)
1207     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1208                      jqueue._JobProcessor.FINISHED)
1209
1210     # Check result
1211     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1212     self.assertEqual(job.GetInfo(["id"]), [job_id])
1213     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1214     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1215                      [[constants.OP_STATUS_SUCCESS,
1216                        constants.OP_STATUS_CANCELED,
1217                        constants.OP_STATUS_CANCELED],
1218                       ["Res0", "Job canceled by request",
1219                        "Job canceled by request"]])
1220
1221   def _TestQueueShutdown(self, queue, opexec, job, runcount):
1222     self.assertTrue(queue.AcceptingJobsUnlocked())
1223
1224     # Simulate shutdown
1225     queue.StopAcceptingJobs()
1226
1227     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1228                      jqueue._JobProcessor.DEFER)
1229
1230     # Check result
1231     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1232     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
1233     self.assertFalse(job.cur_opctx)
1234     self.assertTrue(job.start_timestamp)
1235     self.assertFalse(job.end_timestamp)
1236     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1237     self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1238                                for op in job.ops[:runcount]))
1239     self.assertFalse(job.ops[runcount].end_timestamp)
1240     self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1241                                 for op in job.ops[(runcount + 1):]))
1242     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1243                      [(([constants.OP_STATUS_SUCCESS] * runcount) +
1244                        ([constants.OP_STATUS_QUEUED] *
1245                         (len(job.ops) - runcount))),
1246                       (["Res%s" % i for i in range(runcount)] +
1247                        ([None] * (len(job.ops) - runcount)))])
1248
1249     # Must have been written and replicated
1250     self.assertEqual(queue.GetNextUpdate(), (job, True))
1251     self.assertRaises(IndexError, queue.GetNextUpdate)
1252
1253   def testQueueShutdownWhileRunning(self):
1254     # Tests shutting down the queue while a job is running
1255     queue = _FakeQueueForProc()
1256
1257     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1258            for i in range(3)]
1259
1260     # Create job
1261     job_id = 2718211587
1262     job = self._CreateJob(queue, job_id, ops)
1263
1264     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1265
1266     opexec = _FakeExecOpCodeForProc(queue, None, None)
1267
1268     self.assertRaises(IndexError, queue.GetNextUpdate)
1269
1270     # Run one opcode
1271     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1272                      jqueue._JobProcessor.DEFER)
1273
1274     # Job goes back to queued
1275     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1276     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1277                      [[constants.OP_STATUS_SUCCESS,
1278                        constants.OP_STATUS_QUEUED,
1279                        constants.OP_STATUS_QUEUED],
1280                       ["Res0", None, None]])
1281     self.assertFalse(job.cur_opctx)
1282
1283     # Writes for waiting, running and result
1284     for _ in range(3):
1285       self.assertEqual(queue.GetNextUpdate(), (job, True))
1286
1287     # Run second opcode
1288     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1289                      jqueue._JobProcessor.DEFER)
1290
1291     # Job goes back to queued
1292     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1293     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1294                      [[constants.OP_STATUS_SUCCESS,
1295                        constants.OP_STATUS_SUCCESS,
1296                        constants.OP_STATUS_QUEUED],
1297                       ["Res0", "Res1", None]])
1298     self.assertFalse(job.cur_opctx)
1299
1300     # Writes for waiting, running and result
1301     for _ in range(3):
1302       self.assertEqual(queue.GetNextUpdate(), (job, True))
1303
1304     self._TestQueueShutdown(queue, opexec, job, 2)
1305
1306   def testQueueShutdownWithLockTimeout(self):
1307     # Tests shutting down while a lock acquire times out
1308     queue = _FakeQueueForProc()
1309
1310     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1311            for i in range(3)]
1312
1313     # Create job
1314     job_id = 1304231178
1315     job = self._CreateJob(queue, job_id, ops)
1316
1317     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1318
1319     acquire_timeout = False
1320
1321     def _BeforeStart(timeout, priority):
1322       self.assertFalse(queue.IsAcquired())
1323       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1324       if acquire_timeout:
1325         raise mcpu.LockAcquireTimeout()
1326
1327     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, None)
1328
1329     self.assertRaises(IndexError, queue.GetNextUpdate)
1330
1331     # Run one opcode
1332     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1333                      jqueue._JobProcessor.DEFER)
1334
1335     # Job goes back to queued
1336     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1337     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1338                      [[constants.OP_STATUS_SUCCESS,
1339                        constants.OP_STATUS_QUEUED,
1340                        constants.OP_STATUS_QUEUED],
1341                       ["Res0", None, None]])
1342     self.assertFalse(job.cur_opctx)
1343
1344     # Writes for waiting, running and result
1345     for _ in range(3):
1346       self.assertEqual(queue.GetNextUpdate(), (job, True))
1347
1348     # The next opcode should have expiring lock acquires
1349     acquire_timeout = True
1350
1351     self._TestQueueShutdown(queue, opexec, job, 1)
1352
1353   def testQueueShutdownWhileInQueue(self):
1354     # This should never happen in reality (no new jobs are started by the
1355     # workerpool once a shutdown has been initiated), but it's better to test
1356     # the job processor for this scenario
1357     queue = _FakeQueueForProc()
1358
1359     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1360            for i in range(5)]
1361
1362     # Create job
1363     job_id = 2031
1364     job = self._CreateJob(queue, job_id, ops)
1365
1366     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1367     self.assertRaises(IndexError, queue.GetNextUpdate)
1368
1369     self.assertFalse(job.start_timestamp)
1370     self.assertFalse(job.end_timestamp)
1371     self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
1372                                for op in job.ops))
1373
1374     opexec = _FakeExecOpCodeForProc(queue, None, None)
1375     self._TestQueueShutdown(queue, opexec, job, 0)
1376
1377   def testQueueShutdownWhileWaitlockInQueue(self):
1378     queue = _FakeQueueForProc()
1379
1380     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1381            for i in range(5)]
1382
1383     # Create job
1384     job_id = 53125685
1385     job = self._CreateJob(queue, job_id, ops)
1386
1387     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1388
1389     job.ops[0].status = constants.OP_STATUS_WAITING
1390
1391     assert len(job.ops) == 5
1392
1393     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1394
1395     self.assertRaises(IndexError, queue.GetNextUpdate)
1396
1397     opexec = _FakeExecOpCodeForProc(queue, None, None)
1398     self._TestQueueShutdown(queue, opexec, job, 0)
1399
1400   def testPartiallyRun(self):
1401     # Tests calling the processor on a job that's been partially run before the
1402     # program was restarted
1403     queue = _FakeQueueForProc()
1404
1405     opexec = _FakeExecOpCodeForProc(queue, None, None)
1406
1407     for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
1408       ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1409              for i in range(10)]
1410
1411       # Create job
1412       job = self._CreateJob(queue, job_id, ops)
1413
1414       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1415
1416       for _ in range(successcount):
1417         self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1418                          jqueue._JobProcessor.DEFER)
1419
1420       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1421       self.assertEqual(job.GetInfo(["opstatus"]),
1422                        [[constants.OP_STATUS_SUCCESS
1423                          for _ in range(successcount)] +
1424                         [constants.OP_STATUS_QUEUED
1425                          for _ in range(len(ops) - successcount)]])
1426
1427       self.assert_(job.ops_iter)
1428
1429       # Serialize and restore (simulates program restart)
1430       newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1431       self.assertFalse(newjob.ops_iter)
1432       self._TestPartial(newjob, successcount)
1433
1434   def _TestPartial(self, job, successcount):
1435     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1436     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1437
1438     queue = _FakeQueueForProc()
1439     opexec = _FakeExecOpCodeForProc(queue, None, None)
1440
1441     for remaining in reversed(range(len(job.ops) - successcount)):
1442       result = jqueue._JobProcessor(queue, opexec, job)()
1443       self.assertEqual(queue.GetNextUpdate(), (job, True))
1444       self.assertEqual(queue.GetNextUpdate(), (job, True))
1445       self.assertEqual(queue.GetNextUpdate(), (job, True))
1446       self.assertRaises(IndexError, queue.GetNextUpdate)
1447
1448       if remaining == 0:
1449         # Last opcode
1450         self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1451         break
1452
1453       self.assertEqual(result, jqueue._JobProcessor.DEFER)
1454
1455       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1456
1457     self.assertRaises(IndexError, queue.GetNextUpdate)
1458     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1459     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1460     self.assertEqual(job.GetInfo(["opresult"]),
1461                      [[op.input.result for op in job.ops]])
1462     self.assertEqual(job.GetInfo(["opstatus"]),
1463                      [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
1464     self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1465                             for op in job.ops))
1466
1467     self._GenericCheckJob(job)
1468
1469     # Calling the processor on a finished job should be a no-op
1470     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1471                      jqueue._JobProcessor.FINISHED)
1472     self.assertRaises(IndexError, queue.GetNextUpdate)
1473
1474     # ... also after being restored
1475     job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1476     # Calling the processor on a finished job should be a no-op
1477     self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
1478                      jqueue._JobProcessor.FINISHED)
1479     self.assertRaises(IndexError, queue.GetNextUpdate)
1480
1481   def testProcessorOnRunningJob(self):
1482     ops = [opcodes.OpTestDummy(result="result", fail=False)]
1483
1484     queue = _FakeQueueForProc()
1485     opexec = _FakeExecOpCodeForProc(queue, None, None)
1486
1487     # Create job
1488     job = self._CreateJob(queue, 9571, ops)
1489
1490     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1491
1492     job.ops[0].status = constants.OP_STATUS_RUNNING
1493
1494     assert len(job.ops) == 1
1495
1496     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1497
1498     # Calling on running job must fail
1499     self.assertRaises(errors.ProgrammerError,
1500                       jqueue._JobProcessor(queue, opexec, job))
1501
1502   def testLogMessages(self):
1503     # Tests the "Feedback" callback function
1504     queue = _FakeQueueForProc()
1505
1506     messages = {
1507       1: [
1508         (None, "Hello"),
1509         (None, "World"),
1510         (constants.ELOG_MESSAGE, "there"),
1511         ],
1512       4: [
1513         (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
1514         (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1515         ],
1516       }
1517     ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1518                                messages=messages.get(i, []))
1519            for i in range(5)]
1520
1521     # Create job
1522     job = self._CreateJob(queue, 29386, ops)
1523
1524     def _BeforeStart(timeout, priority):
1525       self.assertEqual(queue.GetNextUpdate(), (job, True))
1526       self.assertRaises(IndexError, queue.GetNextUpdate)
1527       self.assertFalse(queue.IsAcquired())
1528       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1529
1530     def _AfterStart(op, cbs):
1531       self.assertEqual(queue.GetNextUpdate(), (job, True))
1532       self.assertRaises(IndexError, queue.GetNextUpdate)
1533       self.assertFalse(queue.IsAcquired())
1534       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1535
1536       self.assertRaises(AssertionError, cbs.Feedback,
1537                         "too", "many", "arguments")
1538
1539       for (log_type, msg) in op.messages:
1540         self.assertRaises(IndexError, queue.GetNextUpdate)
1541         if log_type:
1542           cbs.Feedback(log_type, msg)
1543         else:
1544           cbs.Feedback(msg)
1545         # Check for job update without replication
1546         self.assertEqual(queue.GetNextUpdate(), (job, False))
1547         self.assertRaises(IndexError, queue.GetNextUpdate)
1548
1549     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1550
1551     for remaining in reversed(range(len(job.ops))):
1552       self.assertRaises(IndexError, queue.GetNextUpdate)
1553       result = jqueue._JobProcessor(queue, opexec, job)()
1554       self.assertEqual(queue.GetNextUpdate(), (job, True))
1555       self.assertRaises(IndexError, queue.GetNextUpdate)
1556
1557       if remaining == 0:
1558         # Last opcode
1559         self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1560         break
1561
1562       self.assertEqual(result, jqueue._JobProcessor.DEFER)
1563
1564       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1565
1566     self.assertRaises(IndexError, queue.GetNextUpdate)
1567
1568     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1569     self.assertEqual(job.GetInfo(["opresult"]),
1570                      [[op.input.result for op in job.ops]])
1571
1572     logmsgcount = sum(len(m) for m in messages.values())
1573
1574     self._CheckLogMessages(job, logmsgcount)
1575
1576     # Serialize and restore (simulates program restart)
1577     newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1578     self._CheckLogMessages(newjob, logmsgcount)
1579
1580     # Check each message
1581     prevserial = -1
1582     for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1583       for (serial, timestamp, log_type, msg) in oplog:
1584         (exptype, expmsg) = messages.get(idx).pop(0)
1585         if exptype:
1586           self.assertEqual(log_type, exptype)
1587         else:
1588           self.assertEqual(log_type, constants.ELOG_MESSAGE)
1589         self.assertEqual(expmsg, msg)
1590         self.assert_(serial > prevserial)
1591         prevserial = serial
1592
1593   def _CheckLogMessages(self, job, count):
1594     # Check serial
1595     self.assertEqual(job.log_serial, count)
1596
1597     # No filter
1598     self.assertEqual(job.GetLogEntries(None),
1599                      [entry for entries in job.GetInfo(["oplog"])[0] if entries
1600                       for entry in entries])
1601
1602     # Filter with serial
1603     assert count > 3
1604     self.assert_(job.GetLogEntries(3))
1605     self.assertEqual(job.GetLogEntries(3),
1606                      [entry for entries in job.GetInfo(["oplog"])[0] if entries
1607                       for entry in entries][3:])
1608
1609     # No log message after highest serial
1610     self.assertFalse(job.GetLogEntries(count))
1611     self.assertFalse(job.GetLogEntries(count + 3))
1612
1613   def testSubmitManyJobs(self):
1614     queue = _FakeQueueForProc()
1615
1616     job_id = 15656
1617     ops = [
1618       opcodes.OpTestDummy(result="Res0", fail=False,
1619                           submit_jobs=[]),
1620       opcodes.OpTestDummy(result="Res1", fail=False,
1621                           submit_jobs=[
1622                             [opcodes.OpTestDummy(result="r1j0", fail=False)],
1623                             ]),
1624       opcodes.OpTestDummy(result="Res2", fail=False,
1625                           submit_jobs=[
1626                             [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1627                              opcodes.OpTestDummy(result="r2j0o1", fail=False),
1628                              opcodes.OpTestDummy(result="r2j0o2", fail=False),
1629                              opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1630                             [opcodes.OpTestDummy(result="r2j1", fail=False)],
1631                             [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1632                              opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1633                             ]),
1634       ]
1635
1636     # Create job
1637     job = self._CreateJob(queue, job_id, ops)
1638
1639     def _BeforeStart(timeout, priority):
1640       self.assertEqual(queue.GetNextUpdate(), (job, True))
1641       self.assertRaises(IndexError, queue.GetNextUpdate)
1642       self.assertFalse(queue.IsAcquired())
1643       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1644       self.assertFalse(job.cur_opctx)
1645
1646     def _AfterStart(op, cbs):
1647       self.assertEqual(queue.GetNextUpdate(), (job, True))
1648       self.assertRaises(IndexError, queue.GetNextUpdate)
1649
1650       self.assertFalse(queue.IsAcquired())
1651       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1652       self.assertFalse(job.cur_opctx)
1653
1654       # Job is running, cancelling shouldn't be possible
1655       (success, _) = job.Cancel()
1656       self.assertFalse(success)
1657
1658     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1659
1660     for idx in range(len(ops)):
1661       self.assertRaises(IndexError, queue.GetNextUpdate)
1662       result = jqueue._JobProcessor(queue, opexec, job)()
1663       self.assertEqual(queue.GetNextUpdate(), (job, True))
1664       self.assertRaises(IndexError, queue.GetNextUpdate)
1665       if idx == len(ops) - 1:
1666         # Last opcode
1667         self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1668       else:
1669         self.assertEqual(result, jqueue._JobProcessor.DEFER)
1670
1671         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1672         self.assert_(job.start_timestamp)
1673         self.assertFalse(job.end_timestamp)
1674
1675     self.assertRaises(IndexError, queue.GetNextUpdate)
1676
1677     for idx, submitted_ops in enumerate(job_ops
1678                                         for op in ops
1679                                         for job_ops in op.submit_jobs):
1680       self.assertEqual(queue.GetNextSubmittedJob(),
1681                        (1000 + idx, submitted_ops))
1682     self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1683
1684     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1685     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1686     self.assertEqual(job.GetInfo(["opresult"]),
1687                      [[[], [1000], [1001, 1002, 1003]]])
1688     self.assertEqual(job.GetInfo(["opstatus"]),
1689                      [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1690
1691     self._GenericCheckJob(job)
1692
1693     # Calling the processor on a finished job should be a no-op
1694     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1695                      jqueue._JobProcessor.FINISHED)
1696     self.assertRaises(IndexError, queue.GetNextUpdate)
1697
1698   def testJobDependency(self):
1699     depmgr = _FakeDependencyManager()
1700     queue = _FakeQueueForProc(depmgr=depmgr)
1701
1702     self.assertEqual(queue.depmgr, depmgr)
1703
1704     prev_job_id = 22113
1705     prev_job_id2 = 28102
1706     job_id = 29929
1707     ops = [
1708       opcodes.OpTestDummy(result="Res0", fail=False,
1709                           depends=[
1710                             [prev_job_id2, None],
1711                             [prev_job_id, None],
1712                             ]),
1713       opcodes.OpTestDummy(result="Res1", fail=False),
1714       ]
1715
1716     # Create job
1717     job = self._CreateJob(queue, job_id, ops)
1718
1719     def _BeforeStart(timeout, priority):
1720       if attempt == 0 or attempt > 5:
1721         # Job should only be updated when it wasn't waiting for another job
1722         self.assertEqual(queue.GetNextUpdate(), (job, True))
1723       self.assertRaises(IndexError, queue.GetNextUpdate)
1724       self.assertFalse(queue.IsAcquired())
1725       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1726       self.assertFalse(job.cur_opctx)
1727
1728     def _AfterStart(op, cbs):
1729       self.assertEqual(queue.GetNextUpdate(), (job, True))
1730       self.assertRaises(IndexError, queue.GetNextUpdate)
1731
1732       self.assertFalse(queue.IsAcquired())
1733       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1734       self.assertFalse(job.cur_opctx)
1735
1736       # Job is running, cancelling shouldn't be possible
1737       (success, _) = job.Cancel()
1738       self.assertFalse(success)
1739
1740     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1741
1742     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1743
1744     counter = itertools.count()
1745     while True:
1746       attempt = counter.next()
1747
1748       self.assertRaises(IndexError, queue.GetNextUpdate)
1749       self.assertRaises(IndexError, depmgr.GetNextNotification)
1750
1751       if attempt < 2:
1752         depmgr.AddCheckResult(job, prev_job_id2, None,
1753                               (jqueue._JobDependencyManager.WAIT, "wait2"))
1754       elif attempt == 2:
1755         depmgr.AddCheckResult(job, prev_job_id2, None,
1756                               (jqueue._JobDependencyManager.CONTINUE, "cont"))
1757         # The processor will ask for the next dependency immediately
1758         depmgr.AddCheckResult(job, prev_job_id, None,
1759                               (jqueue._JobDependencyManager.WAIT, "wait"))
1760       elif attempt < 5:
1761         depmgr.AddCheckResult(job, prev_job_id, None,
1762                               (jqueue._JobDependencyManager.WAIT, "wait"))
1763       elif attempt == 5:
1764         depmgr.AddCheckResult(job, prev_job_id, None,
1765                               (jqueue._JobDependencyManager.CONTINUE, "cont"))
1766       if attempt == 2:
1767         self.assertEqual(depmgr.CountPendingResults(), 2)
1768       elif attempt > 5:
1769         self.assertEqual(depmgr.CountPendingResults(), 0)
1770       else:
1771         self.assertEqual(depmgr.CountPendingResults(), 1)
1772
1773       result = jqueue._JobProcessor(queue, opexec, job)()
1774       if attempt == 0 or attempt >= 5:
1775         # Job should only be updated if there was an actual change
1776         self.assertEqual(queue.GetNextUpdate(), (job, True))
1777       self.assertRaises(IndexError, queue.GetNextUpdate)
1778       self.assertFalse(depmgr.CountPendingResults())
1779
1780       if attempt < 5:
1781         # Simulate waiting for other job
1782         self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1783         self.assertTrue(job.cur_opctx)
1784         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1785         self.assertRaises(IndexError, depmgr.GetNextNotification)
1786         self.assert_(job.start_timestamp)
1787         self.assertFalse(job.end_timestamp)
1788         continue
1789
1790       if result == jqueue._JobProcessor.FINISHED:
1791         # Last opcode
1792         self.assertFalse(job.cur_opctx)
1793         break
1794
1795       self.assertRaises(IndexError, depmgr.GetNextNotification)
1796
1797       self.assertEqual(result, jqueue._JobProcessor.DEFER)
1798       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1799       self.assert_(job.start_timestamp)
1800       self.assertFalse(job.end_timestamp)
1801
1802     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1803     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1804     self.assertEqual(job.GetInfo(["opresult"]),
1805                      [[op.input.result for op in job.ops]])
1806     self.assertEqual(job.GetInfo(["opstatus"]),
1807                      [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1808     self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1809                                for op in job.ops))
1810
1811     self._GenericCheckJob(job)
1812
1813     self.assertRaises(IndexError, queue.GetNextUpdate)
1814     self.assertRaises(IndexError, depmgr.GetNextNotification)
1815     self.assertFalse(depmgr.CountPendingResults())
1816     self.assertFalse(depmgr.CountWaitingJobs())
1817
1818     # Calling the processor on a finished job should be a no-op
1819     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1820                      jqueue._JobProcessor.FINISHED)
1821     self.assertRaises(IndexError, queue.GetNextUpdate)
1822
1823   def testJobDependencyCancel(self):
1824     depmgr = _FakeDependencyManager()
1825     queue = _FakeQueueForProc(depmgr=depmgr)
1826
1827     self.assertEqual(queue.depmgr, depmgr)
1828
1829     prev_job_id = 13623
1830     job_id = 30876
1831     ops = [
1832       opcodes.OpTestDummy(result="Res0", fail=False),
1833       opcodes.OpTestDummy(result="Res1", fail=False,
1834                           depends=[
1835                             [prev_job_id, None],
1836                             ]),
1837       opcodes.OpTestDummy(result="Res2", fail=False),
1838       ]
1839
1840     # Create job
1841     job = self._CreateJob(queue, job_id, ops)
1842
1843     def _BeforeStart(timeout, priority):
1844       if attempt == 0 or attempt > 5:
1845         # Job should only be updated when it wasn't waiting for another job
1846         self.assertEqual(queue.GetNextUpdate(), (job, True))
1847       self.assertRaises(IndexError, queue.GetNextUpdate)
1848       self.assertFalse(queue.IsAcquired())
1849       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1850       self.assertFalse(job.cur_opctx)
1851
1852     def _AfterStart(op, cbs):
1853       self.assertEqual(queue.GetNextUpdate(), (job, True))
1854       self.assertRaises(IndexError, queue.GetNextUpdate)
1855
1856       self.assertFalse(queue.IsAcquired())
1857       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1858       self.assertFalse(job.cur_opctx)
1859
1860       # Job is running, cancelling shouldn't be possible
1861       (success, _) = job.Cancel()
1862       self.assertFalse(success)
1863
1864     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1865
1866     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1867
1868     counter = itertools.count()
1869     while True:
1870       attempt = counter.next()
1871
1872       self.assertRaises(IndexError, queue.GetNextUpdate)
1873       self.assertRaises(IndexError, depmgr.GetNextNotification)
1874
1875       if attempt == 0:
1876         # This will handle the first opcode
1877         pass
1878       elif attempt < 4:
1879         depmgr.AddCheckResult(job, prev_job_id, None,
1880                               (jqueue._JobDependencyManager.WAIT, "wait"))
1881       elif attempt == 4:
1882         # Other job was cancelled
1883         depmgr.AddCheckResult(job, prev_job_id, None,
1884                               (jqueue._JobDependencyManager.CANCEL, "cancel"))
1885
1886       if attempt == 0:
1887         self.assertEqual(depmgr.CountPendingResults(), 0)
1888       else:
1889         self.assertEqual(depmgr.CountPendingResults(), 1)
1890
1891       result = jqueue._JobProcessor(queue, opexec, job)()
1892       if attempt <= 1 or attempt >= 4:
1893         # Job should only be updated if there was an actual change
1894         self.assertEqual(queue.GetNextUpdate(), (job, True))
1895       self.assertRaises(IndexError, queue.GetNextUpdate)
1896       self.assertFalse(depmgr.CountPendingResults())
1897
1898       if attempt > 0 and attempt < 4:
1899         # Simulate waiting for other job
1900         self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1901         self.assertTrue(job.cur_opctx)
1902         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1903         self.assertRaises(IndexError, depmgr.GetNextNotification)
1904         self.assert_(job.start_timestamp)
1905         self.assertFalse(job.end_timestamp)
1906         continue
1907
1908       if result == jqueue._JobProcessor.FINISHED:
1909         # Last opcode
1910         self.assertFalse(job.cur_opctx)
1911         break
1912
1913       self.assertRaises(IndexError, depmgr.GetNextNotification)
1914
1915       self.assertEqual(result, jqueue._JobProcessor.DEFER)
1916       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1917       self.assert_(job.start_timestamp)
1918       self.assertFalse(job.end_timestamp)
1919
1920     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1921     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1922     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1923                      [[constants.OP_STATUS_SUCCESS,
1924                        constants.OP_STATUS_CANCELED,
1925                        constants.OP_STATUS_CANCELED],
1926                       ["Res0", "Job canceled by request",
1927                        "Job canceled by request"]])
1928
1929     self._GenericCheckJob(job)
1930
1931     self.assertRaises(IndexError, queue.GetNextUpdate)
1932     self.assertRaises(IndexError, depmgr.GetNextNotification)
1933     self.assertFalse(depmgr.CountPendingResults())
1934
1935     # Calling the processor on a finished job should be a no-op
1936     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1937                      jqueue._JobProcessor.FINISHED)
1938     self.assertRaises(IndexError, queue.GetNextUpdate)
1939
1940   def testJobDependencyWrongstatus(self):
1941     depmgr = _FakeDependencyManager()
1942     queue = _FakeQueueForProc(depmgr=depmgr)
1943
1944     self.assertEqual(queue.depmgr, depmgr)
1945
1946     prev_job_id = 9741
1947     job_id = 11763
1948     ops = [
1949       opcodes.OpTestDummy(result="Res0", fail=False),
1950       opcodes.OpTestDummy(result="Res1", fail=False,
1951                           depends=[
1952                             [prev_job_id, None],
1953                             ]),
1954       opcodes.OpTestDummy(result="Res2", fail=False),
1955       ]
1956
1957     # Create job
1958     job = self._CreateJob(queue, job_id, ops)
1959
1960     def _BeforeStart(timeout, priority):
1961       if attempt == 0 or attempt > 5:
1962         # Job should only be updated when it wasn't waiting for another job
1963         self.assertEqual(queue.GetNextUpdate(), (job, True))
1964       self.assertRaises(IndexError, queue.GetNextUpdate)
1965       self.assertFalse(queue.IsAcquired())
1966       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1967       self.assertFalse(job.cur_opctx)
1968
1969     def _AfterStart(op, cbs):
1970       self.assertEqual(queue.GetNextUpdate(), (job, True))
1971       self.assertRaises(IndexError, queue.GetNextUpdate)
1972
1973       self.assertFalse(queue.IsAcquired())
1974       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1975       self.assertFalse(job.cur_opctx)
1976
1977       # Job is running, cancelling shouldn't be possible
1978       (success, _) = job.Cancel()
1979       self.assertFalse(success)
1980
1981     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1982
1983     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1984
1985     counter = itertools.count()
1986     while True:
1987       attempt = counter.next()
1988
1989       self.assertRaises(IndexError, queue.GetNextUpdate)
1990       self.assertRaises(IndexError, depmgr.GetNextNotification)
1991
1992       if attempt == 0:
1993         # This will handle the first opcode
1994         pass
1995       elif attempt < 4:
1996         depmgr.AddCheckResult(job, prev_job_id, None,
1997                               (jqueue._JobDependencyManager.WAIT, "wait"))
1998       elif attempt == 4:
1999         # Other job failed
2000         depmgr.AddCheckResult(job, prev_job_id, None,
2001                               (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
2002
2003       if attempt == 0:
2004         self.assertEqual(depmgr.CountPendingResults(), 0)
2005       else:
2006         self.assertEqual(depmgr.CountPendingResults(), 1)
2007
2008       result = jqueue._JobProcessor(queue, opexec, job)()
2009       if attempt <= 1 or attempt >= 4:
2010         # Job should only be updated if there was an actual change
2011         self.assertEqual(queue.GetNextUpdate(), (job, True))
2012       self.assertRaises(IndexError, queue.GetNextUpdate)
2013       self.assertFalse(depmgr.CountPendingResults())
2014
2015       if attempt > 0 and attempt < 4:
2016         # Simulate waiting for other job
2017         self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
2018         self.assertTrue(job.cur_opctx)
2019         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
2020         self.assertRaises(IndexError, depmgr.GetNextNotification)
2021         self.assert_(job.start_timestamp)
2022         self.assertFalse(job.end_timestamp)
2023         continue
2024
2025       if result == jqueue._JobProcessor.FINISHED:
2026         # Last opcode
2027         self.assertFalse(job.cur_opctx)
2028         break
2029
2030       self.assertRaises(IndexError, depmgr.GetNextNotification)
2031
2032       self.assertEqual(result, jqueue._JobProcessor.DEFER)
2033       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2034       self.assert_(job.start_timestamp)
2035       self.assertFalse(job.end_timestamp)
2036
2037     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
2038     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
2039     self.assertEqual(job.GetInfo(["opstatus"]),
2040                      [[constants.OP_STATUS_SUCCESS,
2041                        constants.OP_STATUS_ERROR,
2042                        constants.OP_STATUS_ERROR]]),
2043
2044     (opresult, ) = job.GetInfo(["opresult"])
2045     self.assertEqual(len(opresult), len(ops))
2046     self.assertEqual(opresult[0], "Res0")
2047     self.assertTrue(errors.GetEncodedError(opresult[1]))
2048     self.assertTrue(errors.GetEncodedError(opresult[2]))
2049
2050     self._GenericCheckJob(job)
2051
2052     self.assertRaises(IndexError, queue.GetNextUpdate)
2053     self.assertRaises(IndexError, depmgr.GetNextNotification)
2054     self.assertFalse(depmgr.CountPendingResults())
2055
2056     # Calling the processor on a finished job should be a no-op
2057     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
2058                      jqueue._JobProcessor.FINISHED)
2059     self.assertRaises(IndexError, queue.GetNextUpdate)
2060
2061
2062 class TestEvaluateJobProcessorResult(unittest.TestCase):
2063   def testFinished(self):
2064     depmgr = _FakeDependencyManager()
2065     job = _IdOnlyFakeJob(30953)
2066     jqueue._EvaluateJobProcessorResult(depmgr, job,
2067                                        jqueue._JobProcessor.FINISHED)
2068     self.assertEqual(depmgr.GetNextNotification(), job.id)
2069     self.assertRaises(IndexError, depmgr.GetNextNotification)
2070
2071   def testDefer(self):
2072     depmgr = _FakeDependencyManager()
2073     job = _IdOnlyFakeJob(11326, priority=5463)
2074     try:
2075       jqueue._EvaluateJobProcessorResult(depmgr, job,
2076                                          jqueue._JobProcessor.DEFER)
2077     except workerpool.DeferTask, err:
2078       self.assertEqual(err.priority, 5463)
2079     else:
2080       self.fail("Didn't raise exception")
2081     self.assertRaises(IndexError, depmgr.GetNextNotification)
2082
2083   def testWaitdep(self):
2084     depmgr = _FakeDependencyManager()
2085     job = _IdOnlyFakeJob(21317)
2086     jqueue._EvaluateJobProcessorResult(depmgr, job,
2087                                        jqueue._JobProcessor.WAITDEP)
2088     self.assertRaises(IndexError, depmgr.GetNextNotification)
2089
2090   def testOther(self):
2091     depmgr = _FakeDependencyManager()
2092     job = _IdOnlyFakeJob(5813)
2093     self.assertRaises(errors.ProgrammerError,
2094                       jqueue._EvaluateJobProcessorResult,
2095                       depmgr, job, "Other result")
2096     self.assertRaises(IndexError, depmgr.GetNextNotification)
2097
2098
2099 class _FakeTimeoutStrategy:
2100   def __init__(self, timeouts):
2101     self.timeouts = timeouts
2102     self.attempts = 0
2103     self.last_timeout = None
2104
2105   def NextAttempt(self):
2106     self.attempts += 1
2107     if self.timeouts:
2108       timeout = self.timeouts.pop(0)
2109     else:
2110       timeout = None
2111     self.last_timeout = timeout
2112     return timeout
2113
2114
2115 class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
2116   def setUp(self):
2117     self.queue = _FakeQueueForProc()
2118     self.job = None
2119     self.curop = None
2120     self.opcounter = None
2121     self.timeout_strategy = None
2122     self.retries = 0
2123     self.prev_tsop = None
2124     self.prev_prio = None
2125     self.prev_status = None
2126     self.lock_acq_prio = None
2127     self.gave_lock = None
2128     self.done_lock_before_blocking = False
2129
2130   def _BeforeStart(self, timeout, priority):
2131     job = self.job
2132
2133     # If status has changed, job must've been written
2134     if self.prev_status != self.job.ops[self.curop].status:
2135       self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2136     self.assertRaises(IndexError, self.queue.GetNextUpdate)
2137
2138     self.assertFalse(self.queue.IsAcquired())
2139     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
2140
2141     ts = self.timeout_strategy
2142
2143     self.assert_(timeout is None or isinstance(timeout, (int, float)))
2144     self.assertEqual(timeout, ts.last_timeout)
2145     self.assertEqual(priority, job.ops[self.curop].priority)
2146
2147     self.gave_lock = True
2148     self.lock_acq_prio = priority
2149
2150     if (self.curop == 3 and
2151         job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
2152       # Give locks before running into blocking acquire
2153       assert self.retries == 7
2154       self.retries = 0
2155       self.done_lock_before_blocking = True
2156       return
2157
2158     if self.retries > 0:
2159       self.assert_(timeout is not None)
2160       self.retries -= 1
2161       self.gave_lock = False
2162       raise mcpu.LockAcquireTimeout()
2163
2164     if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
2165       assert self.retries == 0, "Didn't exhaust all retries at highest priority"
2166       assert not ts.timeouts
2167       self.assert_(timeout is None)
2168
2169   def _AfterStart(self, op, cbs):
2170     job = self.job
2171
2172     # Setting to "running" requires an update
2173     self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2174     self.assertRaises(IndexError, self.queue.GetNextUpdate)
2175
2176     self.assertFalse(self.queue.IsAcquired())
2177     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
2178
2179     # Job is running, cancelling shouldn't be possible
2180     (success, _) = job.Cancel()
2181     self.assertFalse(success)
2182
2183   def _NextOpcode(self):
2184     self.curop = self.opcounter.next()
2185     self.prev_prio = self.job.ops[self.curop].priority
2186     self.prev_status = self.job.ops[self.curop].status
2187
2188   def _NewTimeoutStrategy(self):
2189     job = self.job
2190
2191     self.assertEqual(self.retries, 0)
2192
2193     if self.prev_tsop == self.curop:
2194       # Still on the same opcode, priority must've been increased
2195       self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
2196
2197     if self.curop == 1:
2198       # Normal retry
2199       timeouts = range(10, 31, 10)
2200       self.retries = len(timeouts) - 1
2201
2202     elif self.curop == 2:
2203       # Let this run into a blocking acquire
2204       timeouts = range(11, 61, 12)
2205       self.retries = len(timeouts)
2206
2207     elif self.curop == 3:
2208       # Wait for priority to increase, but give lock before blocking acquire
2209       timeouts = range(12, 100, 14)
2210       self.retries = len(timeouts)
2211
2212       self.assertFalse(self.done_lock_before_blocking)
2213
2214     elif self.curop == 4:
2215       self.assert_(self.done_lock_before_blocking)
2216
2217       # Timeouts, but no need to retry
2218       timeouts = range(10, 31, 10)
2219       self.retries = 0
2220
2221     elif self.curop == 5:
2222       # Normal retry
2223       timeouts = range(19, 100, 11)
2224       self.retries = len(timeouts)
2225
2226     else:
2227       timeouts = []
2228       self.retries = 0
2229
2230     assert len(job.ops) == 10
2231     assert self.retries <= len(timeouts)
2232
2233     ts = _FakeTimeoutStrategy(timeouts)
2234
2235     self.timeout_strategy = ts
2236     self.prev_tsop = self.curop
2237     self.prev_prio = job.ops[self.curop].priority
2238
2239     return ts
2240
2241   def testTimeout(self):
2242     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
2243            for i in range(10)]
2244
2245     # Create job
2246     job_id = 15801
2247     job = self._CreateJob(self.queue, job_id, ops)
2248     self.job = job
2249
2250     self.opcounter = itertools.count(0)
2251
2252     opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
2253                                     self._AfterStart)
2254     tsf = self._NewTimeoutStrategy
2255
2256     self.assertFalse(self.done_lock_before_blocking)
2257
2258     while True:
2259       proc = jqueue._JobProcessor(self.queue, opexec, job,
2260                                   _timeout_strategy_factory=tsf)
2261
2262       self.assertRaises(IndexError, self.queue.GetNextUpdate)
2263
2264       if self.curop is not None:
2265         self.prev_status = self.job.ops[self.curop].status
2266
2267       self.lock_acq_prio = None
2268
2269       result = proc(_nextop_fn=self._NextOpcode)
2270       assert self.curop is not None
2271
2272       # Input priority should never be set or modified
2273       self.assertFalse(compat.any(hasattr(op.input, "priority")
2274                                   for op in job.ops))
2275
2276       if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
2277         # Got lock and/or job is done, result must've been written
2278         self.assertFalse(job.cur_opctx)
2279         self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2280         self.assertRaises(IndexError, self.queue.GetNextUpdate)
2281         self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
2282         self.assert_(job.ops[self.curop].exec_timestamp)
2283
2284       if result == jqueue._JobProcessor.FINISHED:
2285         self.assertFalse(job.cur_opctx)
2286         break
2287
2288       self.assertEqual(result, jqueue._JobProcessor.DEFER)
2289
2290       if self.curop == 0:
2291         self.assertEqual(job.ops[self.curop].start_timestamp,
2292                          job.start_timestamp)
2293
2294       if self.gave_lock:
2295         # Opcode finished, but job not yet done
2296         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2297       else:
2298         # Did not get locks
2299         self.assert_(job.cur_opctx)
2300         self.assertEqual(job.cur_opctx._timeout_strategy._fn,
2301                          self.timeout_strategy.NextAttempt)
2302         self.assertFalse(job.ops[self.curop].exec_timestamp)
2303         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
2304
2305         # If priority has changed since acquiring locks, the job must've been
2306         # updated
2307         if self.lock_acq_prio != job.ops[self.curop].priority:
2308           self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2309
2310       self.assertRaises(IndexError, self.queue.GetNextUpdate)
2311
2312       self.assert_(job.start_timestamp)
2313       self.assertFalse(job.end_timestamp)
2314
2315     self.assertEqual(self.curop, len(job.ops) - 1)
2316     self.assertEqual(self.job, job)
2317     self.assertEqual(self.opcounter.next(), len(job.ops))
2318     self.assert_(self.done_lock_before_blocking)
2319
2320     self.assertRaises(IndexError, self.queue.GetNextUpdate)
2321     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
2322     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
2323     self.assertEqual(job.GetInfo(["opresult"]),
2324                      [[op.input.result for op in job.ops]])
2325     self.assertEqual(job.GetInfo(["opstatus"]),
2326                      [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
2327     self.assert_(compat.all(op.start_timestamp and op.end_timestamp
2328                             for op in job.ops))
2329
2330     # Calling the processor on a finished job should be a no-op
2331     self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2332                      jqueue._JobProcessor.FINISHED)
2333     self.assertRaises(IndexError, self.queue.GetNextUpdate)
2334
2335
2336 class TestJobProcessorChangePriority(unittest.TestCase, _JobProcessorTestUtils):
2337   def setUp(self):
2338     self.queue = _FakeQueueForProc()
2339     self.opexecprio = []
2340
2341   def _BeforeStart(self, timeout, priority):
2342     self.assertFalse(self.queue.IsAcquired())
2343     self.opexecprio.append(priority)
2344
2345   def testChangePriorityWhileRunning(self):
2346     # Tests changing the priority on a job while it has finished opcodes
2347     # (successful) and more, unprocessed ones
2348     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
2349            for i in range(3)]
2350
2351     # Create job
2352     job_id = 3499
2353     job = self._CreateJob(self.queue, job_id, ops)
2354
2355     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2356
2357     opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart, None)
2358
2359     # Run first opcode
2360     self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2361                      jqueue._JobProcessor.DEFER)
2362
2363     # Job goes back to queued
2364     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2365     self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
2366     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2367                      [[constants.OP_STATUS_SUCCESS,
2368                        constants.OP_STATUS_QUEUED,
2369                        constants.OP_STATUS_QUEUED],
2370                       ["Res0", None, None]])
2371
2372     self.assertEqual(self.opexecprio.pop(0), constants.OP_PRIO_DEFAULT)
2373     self.assertRaises(IndexError, self.opexecprio.pop, 0)
2374
2375     # Change priority
2376     self.assertEqual(job.ChangePriority(-10),
2377                      (True,
2378                       ("Priorities of pending opcodes for job 3499 have"
2379                        " been changed to -10")))
2380     self.assertEqual(job.CalcPriority(), -10)
2381
2382     # Process second opcode
2383     self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2384                      jqueue._JobProcessor.DEFER)
2385
2386     self.assertEqual(self.opexecprio.pop(0), -10)
2387     self.assertRaises(IndexError, self.opexecprio.pop, 0)
2388
2389     # Check status
2390     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2391     self.assertEqual(job.CalcPriority(), -10)
2392     self.assertEqual(job.GetInfo(["id"]), [job_id])
2393     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
2394     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2395                      [[constants.OP_STATUS_SUCCESS,
2396                        constants.OP_STATUS_SUCCESS,
2397                        constants.OP_STATUS_QUEUED],
2398                       ["Res0", "Res1", None]])
2399
2400     # Change priority once more
2401     self.assertEqual(job.ChangePriority(5),
2402                      (True,
2403                       ("Priorities of pending opcodes for job 3499 have"
2404                        " been changed to 5")))
2405     self.assertEqual(job.CalcPriority(), 5)
2406
2407     # Process third opcode
2408     self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2409                      jqueue._JobProcessor.FINISHED)
2410
2411     self.assertEqual(self.opexecprio.pop(0), 5)
2412     self.assertRaises(IndexError, self.opexecprio.pop, 0)
2413
2414     # Check status
2415     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
2416     self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
2417     self.assertEqual(job.GetInfo(["id"]), [job_id])
2418     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
2419     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2420                      [[constants.OP_STATUS_SUCCESS,
2421                        constants.OP_STATUS_SUCCESS,
2422                        constants.OP_STATUS_SUCCESS],
2423                       ["Res0", "Res1", "Res2"]])
2424     self.assertEqual(map(operator.attrgetter("priority"), job.ops),
2425                      [constants.OP_PRIO_DEFAULT, -10, 5])
2426
2427
2428 class _IdOnlyFakeJob:
2429   def __init__(self, job_id, priority=NotImplemented):
2430     self.id = str(job_id)
2431     self._priority = priority
2432
2433   def CalcPriority(self):
2434     return self._priority
2435
2436
2437 class TestJobDependencyManager(unittest.TestCase):
2438   def setUp(self):
2439     self._status = []
2440     self._queue = []
2441     self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
2442
2443   def _GetStatus(self, job_id):
2444     (exp_job_id, result) = self._status.pop(0)
2445     self.assertEqual(exp_job_id, job_id)
2446     return result
2447
2448   def _Enqueue(self, jobs):
2449     self.assertFalse(self.jdm._lock.is_owned(),
2450                      msg=("Must not own manager lock while re-adding jobs"
2451                           " (potential deadlock)"))
2452     self._queue.append(jobs)
2453
2454   def testNotFinalizedThenCancel(self):
2455     job = _IdOnlyFakeJob(17697)
2456     job_id = str(28625)
2457
2458     self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2459     (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2460     self.assertEqual(result, self.jdm.WAIT)
2461     self.assertFalse(self._status)
2462     self.assertFalse(self._queue)
2463     self.assertTrue(self.jdm.JobWaiting(job))
2464     self.assertEqual(self.jdm._waiters, {
2465       job_id: set([job]),
2466       })
2467     self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2468       ("job/28625", None, None, [("job", [job.id])])
2469       ])
2470
2471     self._status.append((job_id, constants.JOB_STATUS_CANCELED))
2472     (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2473     self.assertEqual(result, self.jdm.CANCEL)
2474     self.assertFalse(self._status)
2475     self.assertFalse(self._queue)
2476     self.assertFalse(self.jdm.JobWaiting(job))
2477     self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2478
2479   def testNotFinalizedThenQueued(self):
2480     # This can happen on a queue shutdown
2481     job = _IdOnlyFakeJob(1320)
2482     job_id = str(22971)
2483
2484     for i in range(5):
2485       if i > 2:
2486         self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2487       else:
2488         self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2489       (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2490       self.assertEqual(result, self.jdm.WAIT)
2491       self.assertFalse(self._status)
2492       self.assertFalse(self._queue)
2493       self.assertTrue(self.jdm.JobWaiting(job))
2494       self.assertEqual(self.jdm._waiters, {
2495         job_id: set([job]),
2496         })
2497       self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2498         ("job/22971", None, None, [("job", [job.id])])
2499         ])
2500
2501   def testRequireCancel(self):
2502     job = _IdOnlyFakeJob(5278)
2503     job_id = str(9610)
2504     dep_status = [constants.JOB_STATUS_CANCELED]
2505
2506     self._status.append((job_id, constants.JOB_STATUS_WAITING))
2507     (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2508     self.assertEqual(result, self.jdm.WAIT)
2509     self.assertFalse(self._status)
2510     self.assertFalse(self._queue)
2511     self.assertTrue(self.jdm.JobWaiting(job))
2512     self.assertEqual(self.jdm._waiters, {
2513       job_id: set([job]),
2514       })
2515     self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2516       ("job/9610", None, None, [("job", [job.id])])
2517       ])
2518
2519     self._status.append((job_id, constants.JOB_STATUS_CANCELED))
2520     (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2521     self.assertEqual(result, self.jdm.CONTINUE)
2522     self.assertFalse(self._status)
2523     self.assertFalse(self._queue)
2524     self.assertFalse(self.jdm.JobWaiting(job))
2525     self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2526
2527   def testRequireError(self):
2528     job = _IdOnlyFakeJob(21459)
2529     job_id = str(25519)
2530     dep_status = [constants.JOB_STATUS_ERROR]
2531
2532     self._status.append((job_id, constants.JOB_STATUS_WAITING))
2533     (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2534     self.assertEqual(result, self.jdm.WAIT)
2535     self.assertFalse(self._status)
2536     self.assertFalse(self._queue)
2537     self.assertTrue(self.jdm.JobWaiting(job))
2538     self.assertEqual(self.jdm._waiters, {
2539       job_id: set([job]),
2540       })
2541
2542     self._status.append((job_id, constants.JOB_STATUS_ERROR))
2543     (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2544     self.assertEqual(result, self.jdm.CONTINUE)
2545     self.assertFalse(self._status)
2546     self.assertFalse(self._queue)
2547     self.assertFalse(self.jdm.JobWaiting(job))
2548     self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2549
2550   def testRequireMultiple(self):
2551     dep_status = list(constants.JOBS_FINALIZED)
2552
2553     for end_status in dep_status:
2554       job = _IdOnlyFakeJob(21343)
2555       job_id = str(14609)
2556
2557       self._status.append((job_id, constants.JOB_STATUS_WAITING))
2558       (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2559       self.assertEqual(result, self.jdm.WAIT)
2560       self.assertFalse(self._status)
2561       self.assertFalse(self._queue)
2562       self.assertTrue(self.jdm.JobWaiting(job))
2563       self.assertEqual(self.jdm._waiters, {
2564         job_id: set([job]),
2565         })
2566       self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2567         ("job/14609", None, None, [("job", [job.id])])
2568         ])
2569
2570       self._status.append((job_id, end_status))
2571       (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2572       self.assertEqual(result, self.jdm.CONTINUE)
2573       self.assertFalse(self._status)
2574       self.assertFalse(self._queue)
2575       self.assertFalse(self.jdm.JobWaiting(job))
2576       self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2577
2578   def testNotify(self):
2579     job = _IdOnlyFakeJob(8227)
2580     job_id = str(4113)
2581
2582     self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2583     (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2584     self.assertEqual(result, self.jdm.WAIT)
2585     self.assertFalse(self._status)
2586     self.assertFalse(self._queue)
2587     self.assertTrue(self.jdm.JobWaiting(job))
2588     self.assertEqual(self.jdm._waiters, {
2589       job_id: set([job]),
2590       })
2591
2592     self.jdm.NotifyWaiters(job_id)
2593     self.assertFalse(self._status)
2594     self.assertFalse(self.jdm._waiters)
2595     self.assertFalse(self.jdm.JobWaiting(job))
2596     self.assertEqual(self._queue, [set([job])])
2597
2598   def testWrongStatus(self):
2599     job = _IdOnlyFakeJob(10102)
2600     job_id = str(1271)
2601
2602     self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2603     (result, _) = self.jdm.CheckAndRegister(job, job_id,
2604                                             [constants.JOB_STATUS_SUCCESS])
2605     self.assertEqual(result, self.jdm.WAIT)
2606     self.assertFalse(self._status)
2607     self.assertFalse(self._queue)
2608     self.assertTrue(self.jdm.JobWaiting(job))
2609     self.assertEqual(self.jdm._waiters, {
2610       job_id: set([job]),
2611       })
2612
2613     self._status.append((job_id, constants.JOB_STATUS_ERROR))
2614     (result, _) = self.jdm.CheckAndRegister(job, job_id,
2615                                             [constants.JOB_STATUS_SUCCESS])
2616     self.assertEqual(result, self.jdm.WRONGSTATUS)
2617     self.assertFalse(self._status)
2618     self.assertFalse(self._queue)
2619     self.assertFalse(self.jdm.JobWaiting(job))
2620
2621   def testCorrectStatus(self):
2622     job = _IdOnlyFakeJob(24273)
2623     job_id = str(23885)
2624
2625     self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2626     (result, _) = self.jdm.CheckAndRegister(job, job_id,
2627                                             [constants.JOB_STATUS_SUCCESS])
2628     self.assertEqual(result, self.jdm.WAIT)
2629     self.assertFalse(self._status)
2630     self.assertFalse(self._queue)
2631     self.assertTrue(self.jdm.JobWaiting(job))
2632     self.assertEqual(self.jdm._waiters, {
2633       job_id: set([job]),
2634       })
2635
2636     self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2637     (result, _) = self.jdm.CheckAndRegister(job, job_id,
2638                                             [constants.JOB_STATUS_SUCCESS])
2639     self.assertEqual(result, self.jdm.CONTINUE)
2640     self.assertFalse(self._status)
2641     self.assertFalse(self._queue)
2642     self.assertFalse(self.jdm.JobWaiting(job))
2643
2644   def testFinalizedRightAway(self):
2645     job = _IdOnlyFakeJob(224)
2646     job_id = str(3081)
2647
2648     self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2649     (result, _) = self.jdm.CheckAndRegister(job, job_id,
2650                                             [constants.JOB_STATUS_SUCCESS])
2651     self.assertEqual(result, self.jdm.CONTINUE)
2652     self.assertFalse(self._status)
2653     self.assertFalse(self._queue)
2654     self.assertFalse(self.jdm.JobWaiting(job))
2655     self.assertEqual(self.jdm._waiters, {
2656       job_id: set(),
2657       })
2658
2659     # Force cleanup
2660     self.jdm.NotifyWaiters("0")
2661     self.assertFalse(self.jdm._waiters)
2662     self.assertFalse(self._status)
2663     self.assertFalse(self._queue)
2664
2665   def testMultipleWaiting(self):
2666     # Use a deterministic random generator
2667     rnd = random.Random(21402)
2668
2669     job_ids = map(str, rnd.sample(range(1, 10000), 150))
2670
2671     waiters = dict((job_ids.pop(),
2672                     set(map(_IdOnlyFakeJob,
2673                             [job_ids.pop()
2674                              for _ in range(rnd.randint(1, 20))])))
2675                    for _ in range(10))
2676
2677     # Ensure there are no duplicate job IDs
2678     assert not utils.FindDuplicates(waiters.keys() +
2679                                     [job.id
2680                                      for jobs in waiters.values()
2681                                      for job in jobs])
2682
2683     # Register all jobs as waiters
2684     for job_id, job in [(job_id, job)
2685                         for (job_id, jobs) in waiters.items()
2686                         for job in jobs]:
2687       self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2688       (result, _) = self.jdm.CheckAndRegister(job, job_id,
2689                                               [constants.JOB_STATUS_SUCCESS])
2690       self.assertEqual(result, self.jdm.WAIT)
2691       self.assertFalse(self._status)
2692       self.assertFalse(self._queue)
2693       self.assertTrue(self.jdm.JobWaiting(job))
2694
2695     self.assertEqual(self.jdm._waiters, waiters)
2696
2697     def _MakeSet((name, mode, owner_names, pending)):
2698       return (name, mode, owner_names,
2699               [(pendmode, set(pend)) for (pendmode, pend) in pending])
2700
2701     def _CheckLockInfo():
2702       info = self.jdm.GetLockInfo([query.LQ_PENDING])
2703       self.assertEqual(sorted(map(_MakeSet, info)), sorted([
2704         ("job/%s" % job_id, None, None,
2705          [("job", set([job.id for job in jobs]))])
2706         for job_id, jobs in waiters.items()
2707         if jobs
2708         ]))
2709
2710     _CheckLockInfo()
2711
2712     # Notify in random order
2713     for job_id in rnd.sample(waiters, len(waiters)):
2714       # Remove from pending waiter list
2715       jobs = waiters.pop(job_id)
2716       for job in jobs:
2717         self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2718         (result, _) = self.jdm.CheckAndRegister(job, job_id,
2719                                                 [constants.JOB_STATUS_SUCCESS])
2720         self.assertEqual(result, self.jdm.CONTINUE)
2721         self.assertFalse(self._status)
2722         self.assertFalse(self._queue)
2723         self.assertFalse(self.jdm.JobWaiting(job))
2724
2725       _CheckLockInfo()
2726
2727     self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2728
2729     assert not waiters
2730
2731   def testSelfDependency(self):
2732     job = _IdOnlyFakeJob(18937)
2733
2734     self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
2735     (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
2736     self.assertEqual(result, self.jdm.ERROR)
2737
2738   def testJobDisappears(self):
2739     job = _IdOnlyFakeJob(30540)
2740     job_id = str(23769)
2741
2742     def _FakeStatus(_):
2743       raise errors.JobLost("#msg#")
2744
2745     jdm = jqueue._JobDependencyManager(_FakeStatus, None)
2746     (result, _) = jdm.CheckAndRegister(job, job_id, [])
2747     self.assertEqual(result, self.jdm.ERROR)
2748     self.assertFalse(jdm.JobWaiting(job))
2749     self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
2750
2751
2752 if __name__ == "__main__":
2753   testutils.GanetiTestProgram()