bash_completion: Enable extglob while parsing file
[ganeti-local] / test / ganeti.jqueue_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.jqueue"""
23
24 import os
25 import sys
26 import unittest
27 import tempfile
28 import shutil
29 import errno
30 import itertools
31 import random
32
33 from ganeti import constants
34 from ganeti import utils
35 from ganeti import errors
36 from ganeti import jqueue
37 from ganeti import opcodes
38 from ganeti import compat
39 from ganeti import mcpu
40 from ganeti import query
41 from ganeti import workerpool
42
43 import testutils
44
45
46 class _FakeJob:
47   def __init__(self, job_id, status):
48     self.id = job_id
49     self.writable = False
50     self._status = status
51     self._log = []
52
53   def SetStatus(self, status):
54     self._status = status
55
56   def AddLogEntry(self, msg):
57     self._log.append((len(self._log), msg))
58
59   def CalcStatus(self):
60     return self._status
61
62   def GetInfo(self, fields):
63     result = []
64
65     for name in fields:
66       if name == "status":
67         result.append(self._status)
68       else:
69         raise Exception("Unknown field")
70
71     return result
72
73   def GetLogEntries(self, newer_than):
74     assert newer_than is None or newer_than >= 0
75
76     if newer_than is None:
77       return self._log
78
79     return self._log[newer_than:]
80
81
class TestJobChangesChecker(unittest.TestCase):
  """Tests for jqueue._JobChangesChecker, driven by L{_FakeJob} instances.

  """
  def testStatus(self):
    # No previous information is passed, so every call is expected to
    # return the current status together with an empty log
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))

    for status in [constants.JOB_STATUS_RUNNING,
                   constants.JOB_STATUS_SUCCESS]:
      job.SetStatus(status)
      self.assertEqual(checker(job), ([status], []))

    # job.id is used by checker
    self.assertEqual(job.id, 9094)

  def testStatusWithPrev(self):
    # The previous information equals the current one: nothing to report
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"],
                                        [constants.JOB_STATUS_QUEUED], None)
    self.assertTrue(checker(job) is None)

    # Once the status differs the new information must be returned
    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

  def testFinalStatus(self):
    for status in constants.JOBS_FINALIZED:
      job = _FakeJob(2178711, status)
      checker = jqueue._JobChangesChecker(["status"], [status], None)
      # There won't be any changes in this status, hence it should signal
      # a change immediately
      self.assertEqual(checker(job), ([status], []))

  def testLog(self):
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.AddLogEntry("Hello World")
    (job_info, log_entries) = checker(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
    self.assertEqual(log_entries, [[0, "Hello World"]])

    # A second checker seeded with what was just received sees no changes
    checker2 = jqueue._JobChangesChecker(["status"], job_info,
                                         len(log_entries))
    self.assertTrue(checker2(job) is None)

    job.AddLogEntry("Foo Bar")
    job.SetStatus(constants.JOB_STATUS_ERROR)

    # Only the entry added after seeding is reported
    (job_info, log_entries) = checker2(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[1, "Foo Bar"]])

    # A checker without previous information gets the complete log
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
    (job_info, log_entries) = checker3(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
138
139
class TestJobChangesWaiter(unittest.TestCase):
  """Tests for the file-based job change waiters in jqueue.

  Every test works on an empty job file created in a private temporary
  directory, which is removed again afterwards.

  """
  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _EnsureNotifierClosed(self, notifier):
    # fstat on the notifier's descriptor must fail with EBADF; succeeding
    # means the descriptor is still open
    try:
      os.fstat(notifier._fd)
    except EnvironmentError:
      # sys.exc_info() instead of "except ..., err" keeps this block
      # parseable by every Python version
      err = sys.exc_info()[1]
      self.assertEqual(err.errno, errno.EBADF)
    else:
      self.fail("File descriptor wasn't closed")

  def testClose(self):
    # Closing must release the descriptor whether or not Wait was called
    for wait in [False, True]:
      waiter = jqueue._JobFileChangesWaiter(self.filename)
      try:
        if wait:
          waiter.Wait(0.001)
      finally:
        waiter.Close()

      # Ensure file descriptor was closed
      self._EnsureNotifierClosed(waiter._notifier)

  def testChangingFile(self):
    waiter = jqueue._JobFileChangesWaiter(self.filename)
    try:
      # Nothing was written yet, the short wait has to time out
      self.assertFalse(waiter.Wait(0.1))
      utils.WriteFile(self.filename, data="changed")
      self.assertTrue(waiter.Wait(60))
    finally:
      waiter.Close()

    self._EnsureNotifierClosed(waiter._notifier)

  def testChangingFile2(self):
    waiter = jqueue._JobChangesWaiter(self.filename)
    try:
      # The file waiter only exists after the first call to Wait
      self.assertFalse(waiter._filewaiter)
      self.assertTrue(waiter.Wait(0.1))
      self.assertTrue(waiter._filewaiter)

      # File waiter is now used, but there have been no changes
      self.assertFalse(waiter.Wait(0.1))
      utils.WriteFile(self.filename, data="changed")
      self.assertTrue(waiter.Wait(60))
    finally:
      waiter.Close()

    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
195
196
class TestWaitForJobChangesHelper(unittest.TestCase):
  """Tests for jqueue._WaitForJobChangesHelper.

  The helper is given a job file in a temporary directory and one of the
  loader callbacks defined below.

  """
  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _LoadWaitingJob(self):
    # Loader callback returning a job in "waiting" status
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)

  def _LoadLostJob(self):
    # Loader callback simulating a job which can't be loaded
    return None

  def testNoChanges(self):
    wfjc = jqueue._WaitForJobChangesHelper()

    # No change
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
                          [constants.JOB_STATUS_WAITING], None, 0.1),
                     constants.JOB_NOTCHANGED)

    # No previous information
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
                          ["status"], None, None, 1.0),
                     ([constants.JOB_STATUS_WAITING], []))

  def testLostJob(self):
    # A loader returning None must make the helper return None as well
    wfjc = jqueue._WaitForJobChangesHelper()
    result = wfjc(self.filename, self._LoadLostJob,
                  ["status"], None, None, 1.0)
    self.assertTrue(result is None)
229
230
class TestEncodeOpError(unittest.TestCase):
  """Tests for jqueue._EncodeOpError.

  """
  def test(self):
    # Each entry pairs a value to encode with the exception class which
    # errors.MaybeRaise must raise for the encoded result
    cases = [
      (errors.LockError("Test 1"), errors.LockError),
      (errors.GenericError("Test 2"), errors.GenericError),
      (NotImplementedError("Foo"), errors.OpExecError),
      ("Hello World", errors.OpExecError),
      ]

    for (value, expected_cls) in cases:
      encerr = jqueue._EncodeOpError(value)
      self.assertTrue(isinstance(encerr, tuple))
      self.assertRaises(expected_cls, errors.MaybeRaise, encerr)
248
249
class TestQueuedOpCode(unittest.TestCase):
  """Tests for jqueue._QueuedOpCode, including a serialization round-trip.

  """
  def testDefaults(self):
    def _Check(op):
      # State expected for a freshly queued opcode
      self.assertFalse(hasattr(op.input, "dry_run"))
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
      self.assertFalse(op.log)
      self.assertTrue(op.start_timestamp is None)
      self.assertTrue(op.exec_timestamp is None)
      self.assertTrue(op.end_timestamp is None)
      self.assertTrue(op.result is None)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
    _Check(op1)

    # The restored copy must pass the same checks and serialize identically
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())

  def testPriority(self):
    def _Check(op):
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
             "Default priority equals high priority; test can't work"
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
    op1 = jqueue._QueuedOpCode(inpop)
    _Check(op1)

    # The priority has to survive serialization
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())
281
282
class TestQueuedJob(unittest.TestCase):
  """Tests for jqueue._QueuedJob.

  Covers construction, the serialization round-trip, the writable flag and
  the calculation of job priority and job status from the opcodes.

  """
  def test(self):
    # Creating a job with an empty opcode list must fail
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
                      None, 1, [], False)

  def testDefaults(self):
    job_id = 4260
    ops = [
      opcodes.OpTagsGet(),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertTrue(job.writable)
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.log_serial, 0)
      self.assertTrue(job.received_timestamp)
      self.assertTrue(job.start_timestamp is None)
      self.assertTrue(job.end_timestamp is None)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
      self.assertTrue(repr(job).startswith("<"))
      self.assertEqual(len(job.ops), len(ops))
      self.assertTrue(compat.all(inp.__getstate__() == op.input.__getstate__()
                                 for (inp, op) in zip(ops, job.ops)))
      self.assertRaises(errors.OpPrereqError, job.GetInfo,
                        ["unknown-field"])
      self.assertEqual(job.GetInfo(["summary"]),
                       [[op.input.Summary() for op in job.ops]])

    job1 = jqueue._QueuedJob(None, job_id, ops, True)
    _Check(job1)

    # The restored job must pass the same checks and serialize identically
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True)
    _Check(job2)
    self.assertEqual(job1.Serialize(), job2.Serialize())

  def testWritable(self):
    # The last constructor argument is exposed as the "writable" attribute
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
    self.assertFalse(job.writable)

    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
    self.assertTrue(job.writable)

  def testPriority(self):
    job_id = 4283
    ops = [
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(repr(job).startswith("<"))

    job = jqueue._QueuedJob(None, job_id, ops, True)
    _Check(job)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase first
    job.ops[0].priority -= 1
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)

    # Mark opcode as finished
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase second
    job.ops[1].priority -= 10
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)

    # Test increasing first
    job.ops[0].status = constants.OP_STATUS_RUNNING
    job.ops[0].priority -= 19
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)

  def testCalcStatus(self):
    # Each helper below receives the opcodes of a fresh ten-opcode job and
    # puts them into a combination of statuses which must result in the
    # job status it is registered for in "tests"
    def _Queued(ops):
      # The default status is "queued"
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in ops))

    def _Waitlock1(ops):
      ops[0].status = constants.OP_STATUS_WAITING

    def _Waitlock2(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      ops[2].status = constants.OP_STATUS_WAITING

    def _Running(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_RUNNING
      for op in ops[2:]:
        op.status = constants.OP_STATUS_QUEUED

    def _Canceling1(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      for op in ops[2:]:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceling2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceled(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELED

    def _Error1(ops):
      # The first four opcodes succeed, all remaining ones fail
      for idx, op in enumerate(ops):
        if idx > 3:
          op.status = constants.OP_STATUS_ERROR
        else:
          op.status = constants.OP_STATUS_SUCCESS

    def _Error2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_ERROR

    def _Success(ops):
      for op in ops:
        op.status = constants.OP_STATUS_SUCCESS

    tests = {
      constants.JOB_STATUS_QUEUED: [_Queued],
      constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
      constants.JOB_STATUS_RUNNING: [_Running],
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
      constants.JOB_STATUS_CANCELED: [_Canceled],
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
      constants.JOB_STATUS_SUCCESS: [_Success],
      }

    def _NewJob():
      job = jqueue._QueuedJob(None, 1,
                              [opcodes.OpTestDelay() for _ in range(10)],
                              True)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in job.ops))
      return job

    # Every known job status needs at least one scenario
    for status in constants.JOB_STATUS_ALL:
      sttests = tests[status]
      assert sttests
      for fn in sttests:
        job = _NewJob()
        fn(job.ops)
        self.assertEqual(job.CalcStatus(), status)
438
439
440 class _FakeDependencyManager:
441   def __init__(self):
442     self._checks = []
443     self._notifications = []
444     self._waiting = set()
445
446   def AddCheckResult(self, job, dep_job_id, dep_status, result):
447     self._checks.append((job, dep_job_id, dep_status, result))
448
449   def CountPendingResults(self):
450     return len(self._checks)
451
452   def CountWaitingJobs(self):
453     return len(self._waiting)
454
455   def GetNextNotification(self):
456     return self._notifications.pop(0)
457
458   def JobWaiting(self, job):
459     return job in self._waiting
460
461   def CheckAndRegister(self, job, dep_job_id, dep_status):
462     (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
463
464     assert exp_job == job
465     assert exp_dep_job_id == dep_job_id
466     assert exp_dep_status == dep_status
467
468     (result_status, _) = result
469
470     if result_status == jqueue._JobDependencyManager.WAIT:
471       self._waiting.add(job)
472     elif result_status == jqueue._JobDependencyManager.CONTINUE:
473       self._waiting.remove(job)
474
475     return result
476
477   def NotifyWaiters(self, job_id):
478     self._notifications.append(job_id)
479
480
481 class _DisabledFakeDependencyManager:
482   def JobWaiting(self, _):
483     return False
484
485   def CheckAndRegister(self, *args):
486     assert False, "Should not be called"
487
488   def NotifyWaiters(self, _):
489     pass
490
491
492 class _FakeQueueForProc:
493   def __init__(self, depmgr=None):
494     self._acquired = False
495     self._updates = []
496     self._submitted = []
497
498     self._submit_count = itertools.count(1000)
499
500     if depmgr:
501       self.depmgr = depmgr
502     else:
503       self.depmgr = _DisabledFakeDependencyManager()
504
505   def IsAcquired(self):
506     return self._acquired
507
508   def GetNextUpdate(self):
509     return self._updates.pop(0)
510
511   def GetNextSubmittedJob(self):
512     return self._submitted.pop(0)
513
514   def acquire(self, shared=0):
515     assert shared == 1
516     self._acquired = True
517
518   def release(self):
519     assert self._acquired
520     self._acquired = False
521
522   def UpdateJobUnlocked(self, job, replicate=True):
523     assert self._acquired, "Lock not acquired while updating job"
524     self._updates.append((job, bool(replicate)))
525
526   def SubmitManyJobs(self, jobs):
527     assert not self._acquired, "Lock acquired while submitting jobs"
528     job_ids = [self._submit_count.next() for _ in jobs]
529     self._submitted.extend(zip(job_ids, jobs))
530     return job_ids
531
532
class _FakeExecOpCodeForProc:
  """Fake opcode executor for the job processor tests.

  Accepts only L{opcodes.OpTestDummy} opcodes, invokes the optional test
  hooks around C{cbs.NotifyStart()} and then fails, submits jobs or returns
  the opcode's canned result, depending on the opcode.

  """
  def __init__(self, queue, before_start, after_start):
    self._queue = queue
    self._before_start = before_start
    self._after_start = after_start

  def __call__(self, op, cbs, timeout=None, priority=None):
    assert isinstance(op, opcodes.OpTestDummy)
    assert not self._queue.IsAcquired(), \
           "Queue lock not released when executing opcode"

    pre_hook = self._before_start
    if pre_hook:
      pre_hook(timeout, priority)

    cbs.NotifyStart()

    post_hook = self._after_start
    if post_hook:
      post_hook(op, cbs)

    # Check again after the callbacks
    assert not self._queue.IsAcquired()

    if op.fail:
      raise errors.OpExecError("Error requested (%s)" % op.result)

    # Without jobs to submit the canned result is returned as it is
    if not hasattr(op, "submit_jobs") or op.submit_jobs is None:
      return op.result

    return cbs.SubmitManyJobs(op.submit_jobs)
562
563
class _JobProcessorTestUtils:
  """Mix-in with helpers shared by the job processor test cases.

  Must be combined with C{unittest.TestCase}, whose assertion methods are
  used.

  """
  def _CreateJob(self, queue, job_id, ops):
    """Creates a writable job and verifies its initial state.

    @return: the newly created jqueue._QueuedJob

    """
    job = jqueue._QueuedJob(queue, job_id, ops, True)
    self.assertFalse(job.start_timestamp)
    self.assertFalse(job.end_timestamp)
    self.assertEqual(len(ops), len(job.ops))
    self.assertTrue(compat.all(op.input == inp
                               for (op, inp) in zip(job.ops, ops)))
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
    return job
574
575
576 class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
577   def _GenericCheckJob(self, job):
578     assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
579                       for op in job.ops)
580
581     self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
582                      [[op.start_timestamp for op in job.ops],
583                       [op.exec_timestamp for op in job.ops],
584                       [op.end_timestamp for op in job.ops]])
585     self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
586                      [job.received_timestamp,
587                       job.start_timestamp,
588                       job.end_timestamp])
589     self.assert_(job.start_timestamp)
590     self.assert_(job.end_timestamp)
591     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
592
593   def testSuccess(self):
594     queue = _FakeQueueForProc()
595
596     for (job_id, opcount) in [(25351, 1), (6637, 3),
597                               (24644, 10), (32207, 100)]:
598       ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
599              for i in range(opcount)]
600
601       # Create job
602       job = self._CreateJob(queue, job_id, ops)
603
604       def _BeforeStart(timeout, priority):
605         self.assertEqual(queue.GetNextUpdate(), (job, True))
606         self.assertRaises(IndexError, queue.GetNextUpdate)
607         self.assertFalse(queue.IsAcquired())
608         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
609         self.assertFalse(job.cur_opctx)
610
611       def _AfterStart(op, cbs):
612         self.assertEqual(queue.GetNextUpdate(), (job, True))
613         self.assertRaises(IndexError, queue.GetNextUpdate)
614
615         self.assertFalse(queue.IsAcquired())
616         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
617         self.assertFalse(job.cur_opctx)
618
619         # Job is running, cancelling shouldn't be possible
620         (success, _) = job.Cancel()
621         self.assertFalse(success)
622
623       opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
624
625       for idx in range(len(ops)):
626         self.assertRaises(IndexError, queue.GetNextUpdate)
627         result = jqueue._JobProcessor(queue, opexec, job)()
628         self.assertEqual(queue.GetNextUpdate(), (job, True))
629         self.assertRaises(IndexError, queue.GetNextUpdate)
630         if idx == len(ops) - 1:
631           # Last opcode
632           self.assertEqual(result, jqueue._JobProcessor.FINISHED)
633         else:
634           self.assertEqual(result, jqueue._JobProcessor.DEFER)
635
636           self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
637           self.assert_(job.start_timestamp)
638           self.assertFalse(job.end_timestamp)
639
640       self.assertRaises(IndexError, queue.GetNextUpdate)
641
642       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
643       self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
644       self.assertEqual(job.GetInfo(["opresult"]),
645                        [[op.input.result for op in job.ops]])
646       self.assertEqual(job.GetInfo(["opstatus"]),
647                        [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
648       self.assert_(compat.all(op.start_timestamp and op.end_timestamp
649                               for op in job.ops))
650
651       self._GenericCheckJob(job)
652
653       # Calling the processor on a finished job should be a no-op
654       self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
655                        jqueue._JobProcessor.FINISHED)
656       self.assertRaises(IndexError, queue.GetNextUpdate)
657
658   def testOpcodeError(self):
659     queue = _FakeQueueForProc()
660
661     testdata = [
662       (17077, 1, 0, 0),
663       (1782, 5, 2, 2),
664       (18179, 10, 9, 9),
665       (4744, 10, 3, 8),
666       (23816, 100, 39, 45),
667       ]
668
669     for (job_id, opcount, failfrom, failto) in testdata:
670       # Prepare opcodes
671       ops = [opcodes.OpTestDummy(result="Res%s" % i,
672                                  fail=(failfrom <= i and
673                                        i <= failto))
674              for i in range(opcount)]
675
676       # Create job
677       job = self._CreateJob(queue, str(job_id), ops)
678
679       opexec = _FakeExecOpCodeForProc(queue, None, None)
680
681       for idx in range(len(ops)):
682         self.assertRaises(IndexError, queue.GetNextUpdate)
683         result = jqueue._JobProcessor(queue, opexec, job)()
684         # queued to waitlock
685         self.assertEqual(queue.GetNextUpdate(), (job, True))
686         # waitlock to running
687         self.assertEqual(queue.GetNextUpdate(), (job, True))
688         # Opcode result
689         self.assertEqual(queue.GetNextUpdate(), (job, True))
690         self.assertRaises(IndexError, queue.GetNextUpdate)
691
692         if idx in (failfrom, len(ops) - 1):
693           # Last opcode
694           self.assertEqual(result, jqueue._JobProcessor.FINISHED)
695           break
696
697         self.assertEqual(result, jqueue._JobProcessor.DEFER)
698
699         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
700
701       self.assertRaises(IndexError, queue.GetNextUpdate)
702
703       # Check job status
704       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
705       self.assertEqual(job.GetInfo(["id"]), [str(job_id)])
706       self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
707
708       # Check opcode status
709       data = zip(job.ops,
710                  job.GetInfo(["opstatus"])[0],
711                  job.GetInfo(["opresult"])[0])
712
713       for idx, (op, opstatus, opresult) in enumerate(data):
714         if idx < failfrom:
715           assert not op.input.fail
716           self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
717           self.assertEqual(opresult, op.input.result)
718         elif idx <= failto:
719           assert op.input.fail
720           self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
721           self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
722         else:
723           assert not op.input.fail
724           self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
725           self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
726
727       self.assert_(compat.all(op.start_timestamp and op.end_timestamp
728                               for op in job.ops[:failfrom]))
729
730       self._GenericCheckJob(job)
731
732       # Calling the processor on a finished job should be a no-op
733       self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
734                        jqueue._JobProcessor.FINISHED)
735       self.assertRaises(IndexError, queue.GetNextUpdate)
736
737   def testCancelWhileInQueue(self):
738     queue = _FakeQueueForProc()
739
740     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
741            for i in range(5)]
742
743     # Create job
744     job_id = 17045
745     job = self._CreateJob(queue, job_id, ops)
746
747     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
748
749     # Mark as cancelled
750     (success, _) = job.Cancel()
751     self.assert_(success)
752
753     self.assertRaises(IndexError, queue.GetNextUpdate)
754
755     self.assertFalse(job.start_timestamp)
756     self.assertTrue(job.end_timestamp)
757     self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
758                             for op in job.ops))
759
760     # Serialize to check for differences
761     before_proc = job.Serialize()
762
763     # Simulate processor called in workerpool
764     opexec = _FakeExecOpCodeForProc(queue, None, None)
765     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
766                      jqueue._JobProcessor.FINISHED)
767
768     # Check result
769     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
770     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
771     self.assertFalse(job.start_timestamp)
772     self.assertTrue(job.end_timestamp)
773     self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
774                                 for op in job.ops))
775     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
776                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
777                       ["Job canceled by request" for _ in job.ops]])
778
779     # Must not have changed or written
780     self.assertEqual(before_proc, job.Serialize())
781     self.assertRaises(IndexError, queue.GetNextUpdate)
782
783   def testCancelWhileWaitlockInQueue(self):
784     queue = _FakeQueueForProc()
785
786     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
787            for i in range(5)]
788
789     # Create job
790     job_id = 8645
791     job = self._CreateJob(queue, job_id, ops)
792
793     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
794
795     job.ops[0].status = constants.OP_STATUS_WAITING
796
797     assert len(job.ops) == 5
798
799     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
800
801     # Mark as cancelling
802     (success, _) = job.Cancel()
803     self.assert_(success)
804
805     self.assertRaises(IndexError, queue.GetNextUpdate)
806
807     self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
808                             for op in job.ops))
809
810     opexec = _FakeExecOpCodeForProc(queue, None, None)
811     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
812                      jqueue._JobProcessor.FINISHED)
813
814     # Check result
815     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
816     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
817     self.assertFalse(job.start_timestamp)
818     self.assert_(job.end_timestamp)
819     self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
820                                 for op in job.ops))
821     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
822                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
823                       ["Job canceled by request" for _ in job.ops]])
824
825   def testCancelWhileWaitlock(self):
826     queue = _FakeQueueForProc()
827
828     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
829            for i in range(5)]
830
831     # Create job
832     job_id = 11009
833     job = self._CreateJob(queue, job_id, ops)
834
835     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
836
837     def _BeforeStart(timeout, priority):
838       self.assertEqual(queue.GetNextUpdate(), (job, True))
839       self.assertRaises(IndexError, queue.GetNextUpdate)
840       self.assertFalse(queue.IsAcquired())
841       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
842
843       # Mark as cancelled
844       (success, _) = job.Cancel()
845       self.assert_(success)
846
847       self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
848                               for op in job.ops))
849       self.assertRaises(IndexError, queue.GetNextUpdate)
850
851     def _AfterStart(op, cbs):
852       self.assertEqual(queue.GetNextUpdate(), (job, True))
853       self.assertRaises(IndexError, queue.GetNextUpdate)
854       self.assertFalse(queue.IsAcquired())
855       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
856
857     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
858
859     self.assertRaises(IndexError, queue.GetNextUpdate)
860     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
861                      jqueue._JobProcessor.FINISHED)
862     self.assertEqual(queue.GetNextUpdate(), (job, True))
863     self.assertRaises(IndexError, queue.GetNextUpdate)
864
865     # Check result
866     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
867     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
868     self.assert_(job.start_timestamp)
869     self.assert_(job.end_timestamp)
870     self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
871                                 for op in job.ops))
872     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
873                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
874                       ["Job canceled by request" for _ in job.ops]])
875
876   def testCancelWhileWaitlockWithTimeout(self):
877     queue = _FakeQueueForProc()
878
879     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
880            for i in range(5)]
881
882     # Create job
883     job_id = 24314
884     job = self._CreateJob(queue, job_id, ops)
885
886     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
887
888     def _BeforeStart(timeout, priority):
889       self.assertFalse(queue.IsAcquired())
890       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
891
892       # Mark as cancelled
893       (success, _) = job.Cancel()
894       self.assert_(success)
895
896       self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
897                               for op in job.ops))
898
899       # Fake an acquire attempt timing out
900       raise mcpu.LockAcquireTimeout()
901
902     def _AfterStart(op, cbs):
903       self.fail("Should not reach this")
904
905     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
906
907     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
908                      jqueue._JobProcessor.FINISHED)
909
910     # Check result
911     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
912     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
913     self.assert_(job.start_timestamp)
914     self.assert_(job.end_timestamp)
915     self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
916                                 for op in job.ops))
917     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
918                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
919                       ["Job canceled by request" for _ in job.ops]])
920
921   def testCancelWhileRunning(self):
922     # Tests canceling a job with finished opcodes and more, unprocessed ones
923     queue = _FakeQueueForProc()
924
925     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
926            for i in range(3)]
927
928     # Create job
929     job_id = str(28492)
930     job = self._CreateJob(queue, job_id, ops)
931
932     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
933
934     opexec = _FakeExecOpCodeForProc(queue, None, None)
935
936     # Run one opcode
937     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
938                      jqueue._JobProcessor.DEFER)
939
940     # Job goes back to queued
941     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
942     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
943                      [[constants.OP_STATUS_SUCCESS,
944                        constants.OP_STATUS_QUEUED,
945                        constants.OP_STATUS_QUEUED],
946                       ["Res0", None, None]])
947
948     # Mark as cancelled
949     (success, _) = job.Cancel()
950     self.assert_(success)
951
952     # Try processing another opcode (this will actually cancel the job)
953     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
954                      jqueue._JobProcessor.FINISHED)
955
956     # Check result
957     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
958     self.assertEqual(job.GetInfo(["id"]), [job_id])
959     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
960     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
961                      [[constants.OP_STATUS_SUCCESS,
962                        constants.OP_STATUS_CANCELED,
963                        constants.OP_STATUS_CANCELED],
964                       ["Res0", "Job canceled by request",
965                        "Job canceled by request"]])
966
967   def testPartiallyRun(self):
968     # Tests calling the processor on a job that's been partially run before the
969     # program was restarted
970     queue = _FakeQueueForProc()
971
972     opexec = _FakeExecOpCodeForProc(queue, None, None)
973
974     for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
975       ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
976              for i in range(10)]
977
978       # Create job
979       job = self._CreateJob(queue, job_id, ops)
980
981       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
982
983       for _ in range(successcount):
984         self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
985                          jqueue._JobProcessor.DEFER)
986
987       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
988       self.assertEqual(job.GetInfo(["opstatus"]),
989                        [[constants.OP_STATUS_SUCCESS
990                          for _ in range(successcount)] +
991                         [constants.OP_STATUS_QUEUED
992                          for _ in range(len(ops) - successcount)]])
993
994       self.assert_(job.ops_iter)
995
996       # Serialize and restore (simulates program restart)
997       newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
998       self.assertFalse(newjob.ops_iter)
999       self._TestPartial(newjob, successcount)
1000
1001   def _TestPartial(self, job, successcount):
1002     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1003     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1004
1005     queue = _FakeQueueForProc()
1006     opexec = _FakeExecOpCodeForProc(queue, None, None)
1007
1008     for remaining in reversed(range(len(job.ops) - successcount)):
1009       result = jqueue._JobProcessor(queue, opexec, job)()
1010       self.assertEqual(queue.GetNextUpdate(), (job, True))
1011       self.assertEqual(queue.GetNextUpdate(), (job, True))
1012       self.assertEqual(queue.GetNextUpdate(), (job, True))
1013       self.assertRaises(IndexError, queue.GetNextUpdate)
1014
1015       if remaining == 0:
1016         # Last opcode
1017         self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1018         break
1019
1020       self.assertEqual(result, jqueue._JobProcessor.DEFER)
1021
1022       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1023
1024     self.assertRaises(IndexError, queue.GetNextUpdate)
1025     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1026     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1027     self.assertEqual(job.GetInfo(["opresult"]),
1028                      [[op.input.result for op in job.ops]])
1029     self.assertEqual(job.GetInfo(["opstatus"]),
1030                      [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
1031     self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1032                             for op in job.ops))
1033
1034     self._GenericCheckJob(job)
1035
1036     # Calling the processor on a finished job should be a no-op
1037     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1038                      jqueue._JobProcessor.FINISHED)
1039     self.assertRaises(IndexError, queue.GetNextUpdate)
1040
1041     # ... also after being restored
1042     job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
1043     # Calling the processor on a finished job should be a no-op
1044     self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
1045                      jqueue._JobProcessor.FINISHED)
1046     self.assertRaises(IndexError, queue.GetNextUpdate)
1047
1048   def testProcessorOnRunningJob(self):
1049     ops = [opcodes.OpTestDummy(result="result", fail=False)]
1050
1051     queue = _FakeQueueForProc()
1052     opexec = _FakeExecOpCodeForProc(queue, None, None)
1053
1054     # Create job
1055     job = self._CreateJob(queue, 9571, ops)
1056
1057     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1058
1059     job.ops[0].status = constants.OP_STATUS_RUNNING
1060
1061     assert len(job.ops) == 1
1062
1063     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1064
1065     # Calling on running job must fail
1066     self.assertRaises(errors.ProgrammerError,
1067                       jqueue._JobProcessor(queue, opexec, job))
1068
1069   def testLogMessages(self):
1070     # Tests the "Feedback" callback function
1071     queue = _FakeQueueForProc()
1072
1073     messages = {
1074       1: [
1075         (None, "Hello"),
1076         (None, "World"),
1077         (constants.ELOG_MESSAGE, "there"),
1078         ],
1079       4: [
1080         (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
1081         (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1082         ],
1083       }
1084     ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1085                                messages=messages.get(i, []))
1086            for i in range(5)]
1087
1088     # Create job
1089     job = self._CreateJob(queue, 29386, ops)
1090
1091     def _BeforeStart(timeout, priority):
1092       self.assertEqual(queue.GetNextUpdate(), (job, True))
1093       self.assertRaises(IndexError, queue.GetNextUpdate)
1094       self.assertFalse(queue.IsAcquired())
1095       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1096
1097     def _AfterStart(op, cbs):
1098       self.assertEqual(queue.GetNextUpdate(), (job, True))
1099       self.assertRaises(IndexError, queue.GetNextUpdate)
1100       self.assertFalse(queue.IsAcquired())
1101       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1102
1103       self.assertRaises(AssertionError, cbs.Feedback,
1104                         "too", "many", "arguments")
1105
1106       for (log_type, msg) in op.messages:
1107         self.assertRaises(IndexError, queue.GetNextUpdate)
1108         if log_type:
1109           cbs.Feedback(log_type, msg)
1110         else:
1111           cbs.Feedback(msg)
1112         # Check for job update without replication
1113         self.assertEqual(queue.GetNextUpdate(), (job, False))
1114         self.assertRaises(IndexError, queue.GetNextUpdate)
1115
1116     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1117
1118     for remaining in reversed(range(len(job.ops))):
1119       self.assertRaises(IndexError, queue.GetNextUpdate)
1120       result = jqueue._JobProcessor(queue, opexec, job)()
1121       self.assertEqual(queue.GetNextUpdate(), (job, True))
1122       self.assertRaises(IndexError, queue.GetNextUpdate)
1123
1124       if remaining == 0:
1125         # Last opcode
1126         self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1127         break
1128
1129       self.assertEqual(result, jqueue._JobProcessor.DEFER)
1130
1131       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1132
1133     self.assertRaises(IndexError, queue.GetNextUpdate)
1134
1135     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1136     self.assertEqual(job.GetInfo(["opresult"]),
1137                      [[op.input.result for op in job.ops]])
1138
1139     logmsgcount = sum(len(m) for m in messages.values())
1140
1141     self._CheckLogMessages(job, logmsgcount)
1142
1143     # Serialize and restore (simulates program restart)
1144     newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
1145     self._CheckLogMessages(newjob, logmsgcount)
1146
1147     # Check each message
1148     prevserial = -1
1149     for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1150       for (serial, timestamp, log_type, msg) in oplog:
1151         (exptype, expmsg) = messages.get(idx).pop(0)
1152         if exptype:
1153           self.assertEqual(log_type, exptype)
1154         else:
1155           self.assertEqual(log_type, constants.ELOG_MESSAGE)
1156         self.assertEqual(expmsg, msg)
1157         self.assert_(serial > prevserial)
1158         prevserial = serial
1159
1160   def _CheckLogMessages(self, job, count):
1161     # Check serial
1162     self.assertEqual(job.log_serial, count)
1163
1164     # No filter
1165     self.assertEqual(job.GetLogEntries(None),
1166                      [entry for entries in job.GetInfo(["oplog"])[0] if entries
1167                       for entry in entries])
1168
1169     # Filter with serial
1170     assert count > 3
1171     self.assert_(job.GetLogEntries(3))
1172     self.assertEqual(job.GetLogEntries(3),
1173                      [entry for entries in job.GetInfo(["oplog"])[0] if entries
1174                       for entry in entries][3:])
1175
1176     # No log message after highest serial
1177     self.assertFalse(job.GetLogEntries(count))
1178     self.assertFalse(job.GetLogEntries(count + 3))
1179
1180   def testSubmitManyJobs(self):
1181     queue = _FakeQueueForProc()
1182
1183     job_id = 15656
1184     ops = [
1185       opcodes.OpTestDummy(result="Res0", fail=False,
1186                           submit_jobs=[]),
1187       opcodes.OpTestDummy(result="Res1", fail=False,
1188                           submit_jobs=[
1189                             [opcodes.OpTestDummy(result="r1j0", fail=False)],
1190                             ]),
1191       opcodes.OpTestDummy(result="Res2", fail=False,
1192                           submit_jobs=[
1193                             [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1194                              opcodes.OpTestDummy(result="r2j0o1", fail=False),
1195                              opcodes.OpTestDummy(result="r2j0o2", fail=False),
1196                              opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1197                             [opcodes.OpTestDummy(result="r2j1", fail=False)],
1198                             [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1199                              opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1200                             ]),
1201       ]
1202
1203     # Create job
1204     job = self._CreateJob(queue, job_id, ops)
1205
1206     def _BeforeStart(timeout, priority):
1207       self.assertEqual(queue.GetNextUpdate(), (job, True))
1208       self.assertRaises(IndexError, queue.GetNextUpdate)
1209       self.assertFalse(queue.IsAcquired())
1210       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1211       self.assertFalse(job.cur_opctx)
1212
1213     def _AfterStart(op, cbs):
1214       self.assertEqual(queue.GetNextUpdate(), (job, True))
1215       self.assertRaises(IndexError, queue.GetNextUpdate)
1216
1217       self.assertFalse(queue.IsAcquired())
1218       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1219       self.assertFalse(job.cur_opctx)
1220
1221       # Job is running, cancelling shouldn't be possible
1222       (success, _) = job.Cancel()
1223       self.assertFalse(success)
1224
1225     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1226
1227     for idx in range(len(ops)):
1228       self.assertRaises(IndexError, queue.GetNextUpdate)
1229       result = jqueue._JobProcessor(queue, opexec, job)()
1230       self.assertEqual(queue.GetNextUpdate(), (job, True))
1231       self.assertRaises(IndexError, queue.GetNextUpdate)
1232       if idx == len(ops) - 1:
1233         # Last opcode
1234         self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1235       else:
1236         self.assertEqual(result, jqueue._JobProcessor.DEFER)
1237
1238         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1239         self.assert_(job.start_timestamp)
1240         self.assertFalse(job.end_timestamp)
1241
1242     self.assertRaises(IndexError, queue.GetNextUpdate)
1243
1244     for idx, submitted_ops in enumerate(job_ops
1245                                         for op in ops
1246                                         for job_ops in op.submit_jobs):
1247       self.assertEqual(queue.GetNextSubmittedJob(),
1248                        (1000 + idx, submitted_ops))
1249     self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1250
1251     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1252     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1253     self.assertEqual(job.GetInfo(["opresult"]),
1254                      [[[], [1000], [1001, 1002, 1003]]])
1255     self.assertEqual(job.GetInfo(["opstatus"]),
1256                      [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1257
1258     self._GenericCheckJob(job)
1259
1260     # Calling the processor on a finished job should be a no-op
1261     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1262                      jqueue._JobProcessor.FINISHED)
1263     self.assertRaises(IndexError, queue.GetNextUpdate)
1264
1265   def testJobDependency(self):
1266     depmgr = _FakeDependencyManager()
1267     queue = _FakeQueueForProc(depmgr=depmgr)
1268
1269     self.assertEqual(queue.depmgr, depmgr)
1270
1271     prev_job_id = 22113
1272     prev_job_id2 = 28102
1273     job_id = 29929
1274     ops = [
1275       opcodes.OpTestDummy(result="Res0", fail=False,
1276                           depends=[
1277                             [prev_job_id2, None],
1278                             [prev_job_id, None],
1279                             ]),
1280       opcodes.OpTestDummy(result="Res1", fail=False),
1281       ]
1282
1283     # Create job
1284     job = self._CreateJob(queue, job_id, ops)
1285
1286     def _BeforeStart(timeout, priority):
1287       if attempt == 0 or attempt > 5:
1288         # Job should only be updated when it wasn't waiting for another job
1289         self.assertEqual(queue.GetNextUpdate(), (job, True))
1290       self.assertRaises(IndexError, queue.GetNextUpdate)
1291       self.assertFalse(queue.IsAcquired())
1292       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1293       self.assertFalse(job.cur_opctx)
1294
1295     def _AfterStart(op, cbs):
1296       self.assertEqual(queue.GetNextUpdate(), (job, True))
1297       self.assertRaises(IndexError, queue.GetNextUpdate)
1298
1299       self.assertFalse(queue.IsAcquired())
1300       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1301       self.assertFalse(job.cur_opctx)
1302
1303       # Job is running, cancelling shouldn't be possible
1304       (success, _) = job.Cancel()
1305       self.assertFalse(success)
1306
1307     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1308
1309     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1310
1311     counter = itertools.count()
1312     while True:
1313       attempt = counter.next()
1314
1315       self.assertRaises(IndexError, queue.GetNextUpdate)
1316       self.assertRaises(IndexError, depmgr.GetNextNotification)
1317
1318       if attempt < 2:
1319         depmgr.AddCheckResult(job, prev_job_id2, None,
1320                               (jqueue._JobDependencyManager.WAIT, "wait2"))
1321       elif attempt == 2:
1322         depmgr.AddCheckResult(job, prev_job_id2, None,
1323                               (jqueue._JobDependencyManager.CONTINUE, "cont"))
1324         # The processor will ask for the next dependency immediately
1325         depmgr.AddCheckResult(job, prev_job_id, None,
1326                               (jqueue._JobDependencyManager.WAIT, "wait"))
1327       elif attempt < 5:
1328         depmgr.AddCheckResult(job, prev_job_id, None,
1329                               (jqueue._JobDependencyManager.WAIT, "wait"))
1330       elif attempt == 5:
1331         depmgr.AddCheckResult(job, prev_job_id, None,
1332                               (jqueue._JobDependencyManager.CONTINUE, "cont"))
1333       if attempt == 2:
1334         self.assertEqual(depmgr.CountPendingResults(), 2)
1335       elif attempt > 5:
1336         self.assertEqual(depmgr.CountPendingResults(), 0)
1337       else:
1338         self.assertEqual(depmgr.CountPendingResults(), 1)
1339
1340       result = jqueue._JobProcessor(queue, opexec, job)()
1341       if attempt == 0 or attempt >= 5:
1342         # Job should only be updated if there was an actual change
1343         self.assertEqual(queue.GetNextUpdate(), (job, True))
1344       self.assertRaises(IndexError, queue.GetNextUpdate)
1345       self.assertFalse(depmgr.CountPendingResults())
1346
1347       if attempt < 5:
1348         # Simulate waiting for other job
1349         self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1350         self.assertTrue(job.cur_opctx)
1351         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1352         self.assertRaises(IndexError, depmgr.GetNextNotification)
1353         self.assert_(job.start_timestamp)
1354         self.assertFalse(job.end_timestamp)
1355         continue
1356
1357       if result == jqueue._JobProcessor.FINISHED:
1358         # Last opcode
1359         self.assertFalse(job.cur_opctx)
1360         break
1361
1362       self.assertRaises(IndexError, depmgr.GetNextNotification)
1363
1364       self.assertEqual(result, jqueue._JobProcessor.DEFER)
1365       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1366       self.assert_(job.start_timestamp)
1367       self.assertFalse(job.end_timestamp)
1368
1369     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1370     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1371     self.assertEqual(job.GetInfo(["opresult"]),
1372                      [[op.input.result for op in job.ops]])
1373     self.assertEqual(job.GetInfo(["opstatus"]),
1374                      [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1375     self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1376                                for op in job.ops))
1377
1378     self._GenericCheckJob(job)
1379
1380     self.assertRaises(IndexError, queue.GetNextUpdate)
1381     self.assertRaises(IndexError, depmgr.GetNextNotification)
1382     self.assertFalse(depmgr.CountPendingResults())
1383     self.assertFalse(depmgr.CountWaitingJobs())
1384
1385     # Calling the processor on a finished job should be a no-op
1386     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1387                      jqueue._JobProcessor.FINISHED)
1388     self.assertRaises(IndexError, queue.GetNextUpdate)
1389
1390   def testJobDependencyCancel(self):
1391     depmgr = _FakeDependencyManager()
1392     queue = _FakeQueueForProc(depmgr=depmgr)
1393
1394     self.assertEqual(queue.depmgr, depmgr)
1395
1396     prev_job_id = 13623
1397     job_id = 30876
1398     ops = [
1399       opcodes.OpTestDummy(result="Res0", fail=False),
1400       opcodes.OpTestDummy(result="Res1", fail=False,
1401                           depends=[
1402                             [prev_job_id, None],
1403                             ]),
1404       opcodes.OpTestDummy(result="Res2", fail=False),
1405       ]
1406
1407     # Create job
1408     job = self._CreateJob(queue, job_id, ops)
1409
1410     def _BeforeStart(timeout, priority):
1411       if attempt == 0 or attempt > 5:
1412         # Job should only be updated when it wasn't waiting for another job
1413         self.assertEqual(queue.GetNextUpdate(), (job, True))
1414       self.assertRaises(IndexError, queue.GetNextUpdate)
1415       self.assertFalse(queue.IsAcquired())
1416       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1417       self.assertFalse(job.cur_opctx)
1418
1419     def _AfterStart(op, cbs):
1420       self.assertEqual(queue.GetNextUpdate(), (job, True))
1421       self.assertRaises(IndexError, queue.GetNextUpdate)
1422
1423       self.assertFalse(queue.IsAcquired())
1424       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1425       self.assertFalse(job.cur_opctx)
1426
1427       # Job is running, cancelling shouldn't be possible
1428       (success, _) = job.Cancel()
1429       self.assertFalse(success)
1430
1431     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1432
1433     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1434
1435     counter = itertools.count()
1436     while True:
1437       attempt = counter.next()
1438
1439       self.assertRaises(IndexError, queue.GetNextUpdate)
1440       self.assertRaises(IndexError, depmgr.GetNextNotification)
1441
1442       if attempt == 0:
1443         # This will handle the first opcode
1444         pass
1445       elif attempt < 4:
1446         depmgr.AddCheckResult(job, prev_job_id, None,
1447                               (jqueue._JobDependencyManager.WAIT, "wait"))
1448       elif attempt == 4:
1449         # Other job was cancelled
1450         depmgr.AddCheckResult(job, prev_job_id, None,
1451                               (jqueue._JobDependencyManager.CANCEL, "cancel"))
1452
1453       if attempt == 0:
1454         self.assertEqual(depmgr.CountPendingResults(), 0)
1455       else:
1456         self.assertEqual(depmgr.CountPendingResults(), 1)
1457
1458       result = jqueue._JobProcessor(queue, opexec, job)()
1459       if attempt <= 1 or attempt >= 4:
1460         # Job should only be updated if there was an actual change
1461         self.assertEqual(queue.GetNextUpdate(), (job, True))
1462       self.assertRaises(IndexError, queue.GetNextUpdate)
1463       self.assertFalse(depmgr.CountPendingResults())
1464
1465       if attempt > 0 and attempt < 4:
1466         # Simulate waiting for other job
1467         self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1468         self.assertTrue(job.cur_opctx)
1469         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1470         self.assertRaises(IndexError, depmgr.GetNextNotification)
1471         self.assert_(job.start_timestamp)
1472         self.assertFalse(job.end_timestamp)
1473         continue
1474
1475       if result == jqueue._JobProcessor.FINISHED:
1476         # Last opcode
1477         self.assertFalse(job.cur_opctx)
1478         break
1479
1480       self.assertRaises(IndexError, depmgr.GetNextNotification)
1481
1482       self.assertEqual(result, jqueue._JobProcessor.DEFER)
1483       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1484       self.assert_(job.start_timestamp)
1485       self.assertFalse(job.end_timestamp)
1486
1487     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1488     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1489     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1490                      [[constants.OP_STATUS_SUCCESS,
1491                        constants.OP_STATUS_CANCELED,
1492                        constants.OP_STATUS_CANCELED],
1493                       ["Res0", "Job canceled by request",
1494                        "Job canceled by request"]])
1495
1496     self._GenericCheckJob(job)
1497
1498     self.assertRaises(IndexError, queue.GetNextUpdate)
1499     self.assertRaises(IndexError, depmgr.GetNextNotification)
1500     self.assertFalse(depmgr.CountPendingResults())
1501
1502     # Calling the processor on a finished job should be a no-op
1503     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1504                      jqueue._JobProcessor.FINISHED)
1505     self.assertRaises(IndexError, queue.GetNextUpdate)
1506
1507   def testJobDependencyWrongstatus(self):
1508     depmgr = _FakeDependencyManager()
1509     queue = _FakeQueueForProc(depmgr=depmgr)
1510
1511     self.assertEqual(queue.depmgr, depmgr)
1512
1513     prev_job_id = 9741
1514     job_id = 11763
1515     ops = [
1516       opcodes.OpTestDummy(result="Res0", fail=False),
1517       opcodes.OpTestDummy(result="Res1", fail=False,
1518                           depends=[
1519                             [prev_job_id, None],
1520                             ]),
1521       opcodes.OpTestDummy(result="Res2", fail=False),
1522       ]
1523
1524     # Create job
1525     job = self._CreateJob(queue, job_id, ops)
1526
1527     def _BeforeStart(timeout, priority):
1528       if attempt == 0 or attempt > 5:
1529         # Job should only be updated when it wasn't waiting for another job
1530         self.assertEqual(queue.GetNextUpdate(), (job, True))
1531       self.assertRaises(IndexError, queue.GetNextUpdate)
1532       self.assertFalse(queue.IsAcquired())
1533       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1534       self.assertFalse(job.cur_opctx)
1535
1536     def _AfterStart(op, cbs):
1537       self.assertEqual(queue.GetNextUpdate(), (job, True))
1538       self.assertRaises(IndexError, queue.GetNextUpdate)
1539
1540       self.assertFalse(queue.IsAcquired())
1541       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1542       self.assertFalse(job.cur_opctx)
1543
1544       # Job is running, cancelling shouldn't be possible
1545       (success, _) = job.Cancel()
1546       self.assertFalse(success)
1547
1548     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1549
1550     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1551
1552     counter = itertools.count()
1553     while True:
1554       attempt = counter.next()
1555
1556       self.assertRaises(IndexError, queue.GetNextUpdate)
1557       self.assertRaises(IndexError, depmgr.GetNextNotification)
1558
1559       if attempt == 0:
1560         # This will handle the first opcode
1561         pass
1562       elif attempt < 4:
1563         depmgr.AddCheckResult(job, prev_job_id, None,
1564                               (jqueue._JobDependencyManager.WAIT, "wait"))
1565       elif attempt == 4:
1566         # Other job failed
1567         depmgr.AddCheckResult(job, prev_job_id, None,
1568                               (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
1569
1570       if attempt == 0:
1571         self.assertEqual(depmgr.CountPendingResults(), 0)
1572       else:
1573         self.assertEqual(depmgr.CountPendingResults(), 1)
1574
1575       result = jqueue._JobProcessor(queue, opexec, job)()
1576       if attempt <= 1 or attempt >= 4:
1577         # Job should only be updated if there was an actual change
1578         self.assertEqual(queue.GetNextUpdate(), (job, True))
1579       self.assertRaises(IndexError, queue.GetNextUpdate)
1580       self.assertFalse(depmgr.CountPendingResults())
1581
1582       if attempt > 0 and attempt < 4:
1583         # Simulate waiting for other job
1584         self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1585         self.assertTrue(job.cur_opctx)
1586         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1587         self.assertRaises(IndexError, depmgr.GetNextNotification)
1588         self.assert_(job.start_timestamp)
1589         self.assertFalse(job.end_timestamp)
1590         continue
1591
1592       if result == jqueue._JobProcessor.FINISHED:
1593         # Last opcode
1594         self.assertFalse(job.cur_opctx)
1595         break
1596
1597       self.assertRaises(IndexError, depmgr.GetNextNotification)
1598
1599       self.assertEqual(result, jqueue._JobProcessor.DEFER)
1600       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1601       self.assert_(job.start_timestamp)
1602       self.assertFalse(job.end_timestamp)
1603
1604     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
1605     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
1606     self.assertEqual(job.GetInfo(["opstatus"]),
1607                      [[constants.OP_STATUS_SUCCESS,
1608                        constants.OP_STATUS_ERROR,
1609                        constants.OP_STATUS_ERROR]]),
1610
1611     (opresult, ) = job.GetInfo(["opresult"])
1612     self.assertEqual(len(opresult), len(ops))
1613     self.assertEqual(opresult[0], "Res0")
1614     self.assertTrue(errors.GetEncodedError(opresult[1]))
1615     self.assertTrue(errors.GetEncodedError(opresult[2]))
1616
1617     self._GenericCheckJob(job)
1618
1619     self.assertRaises(IndexError, queue.GetNextUpdate)
1620     self.assertRaises(IndexError, depmgr.GetNextNotification)
1621     self.assertFalse(depmgr.CountPendingResults())
1622
1623     # Calling the processor on a finished job should be a no-op
1624     self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1625                      jqueue._JobProcessor.FINISHED)
1626     self.assertRaises(IndexError, queue.GetNextUpdate)
1627
1628
class TestEvaluateJobProcessorResult(unittest.TestCase):
  """Tests for C{jqueue._EvaluateJobProcessorResult}.

  Every test passes one job processor result to the function and then
  inspects what arrived at a fake dependency manager.

  """
  def testFinished(self):
    # FINISHED: exactly one notification, carrying the job's ID
    depmgr = _FakeDependencyManager()
    job = _IdOnlyFakeJob(30953)
    jqueue._EvaluateJobProcessorResult(depmgr, job,
                                       jqueue._JobProcessor.FINISHED)
    self.assertEqual(depmgr.GetNextNotification(), job.id)
    self.assertRaises(IndexError, depmgr.GetNextNotification)

  def testDefer(self):
    # DEFER: workerpool.DeferTask must be raised with the job's priority and
    # nothing may be sent to the dependency manager
    depmgr = _FakeDependencyManager()
    job = _IdOnlyFakeJob(11326, priority=5463)
    try:
      jqueue._EvaluateJobProcessorResult(depmgr, job,
                                         jqueue._JobProcessor.DEFER)
    except workerpool.DeferTask as err:
      # "as" instead of the comma form, which Python 3 rejects
      self.assertEqual(err.priority, 5463)
    else:
      self.fail("Didn't raise exception")
    self.assertRaises(IndexError, depmgr.GetNextNotification)

  def testWaitdep(self):
    # WAITDEP: no exception and no notification
    depmgr = _FakeDependencyManager()
    job = _IdOnlyFakeJob(21317)
    jqueue._EvaluateJobProcessorResult(depmgr, job,
                                       jqueue._JobProcessor.WAITDEP)
    self.assertRaises(IndexError, depmgr.GetNextNotification)

  def testOther(self):
    # Any other result value is a programming error; still no notification
    depmgr = _FakeDependencyManager()
    job = _IdOnlyFakeJob(5813)
    self.assertRaises(errors.ProgrammerError,
                      jqueue._EvaluateJobProcessorResult,
                      depmgr, job, "Other result")
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1664
1665
1666 class _FakeTimeoutStrategy:
1667   def __init__(self, timeouts):
1668     self.timeouts = timeouts
1669     self.attempts = 0
1670     self.last_timeout = None
1671
1672   def NextAttempt(self):
1673     self.attempts += 1
1674     if self.timeouts:
1675       timeout = self.timeouts.pop(0)
1676     else:
1677       timeout = None
1678     self.last_timeout = timeout
1679     return timeout
1680
1681
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
  """Tests lock acquire timeouts and retries of C{jqueue._JobProcessor}.

  L{_BeforeStart} and L{_AfterStart} are handed to the fake opcode executor,
  L{_NewTimeoutStrategy} serves as timeout strategy factory and
  L{_NextOpcode} as next-opcode callback. The state they share is kept on
  the test instance and initialized in L{setUp}.

  """
  def setUp(self):
    self.queue = _FakeQueueForProc()
    # Job under test, set by the test method
    self.job = None
    # Index of the current opcode, drawn from "opcounter" by _NextOpcode
    self.curop = None
    self.opcounter = None
    # Strategy most recently created by _NewTimeoutStrategy
    self.timeout_strategy = None
    # Number of lock acquire timeouts _BeforeStart still has to raise
    self.retries = 0
    # Opcode index the latest timeout strategy was created for
    self.prev_tsop = None
    # Opcode priority and status as last recorded
    self.prev_prio = None
    self.prev_status = None
    # Priority passed to the latest _BeforeStart call
    self.lock_acq_prio = None
    # Whether the latest _BeforeStart call returned instead of raising
    self.gave_lock = None
    # Set once opcode 3 was let through at priority OP_PRIO_HIGHEST + 3
    self.done_lock_before_blocking = False

  def _BeforeStart(self, timeout, priority):
    # Callback given to _FakeExecOpCodeForProc. Raises
    # mcpu.LockAcquireTimeout while "retries" is positive; returning normally
    # leaves "gave_lock" set.
    job = self.job

    # If status has changed, job must've been written
    if self.prev_status != self.job.ops[self.curop].status:
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

    ts = self.timeout_strategy

    self.assertTrue(timeout is None or isinstance(timeout, (int, float)))
    self.assertEqual(timeout, ts.last_timeout)
    self.assertEqual(priority, job.ops[self.curop].priority)

    self.gave_lock = True
    self.lock_acq_prio = priority

    if (self.curop == 3 and
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
      # Give locks before running into blocking acquire
      assert self.retries == 7
      self.retries = 0
      self.done_lock_before_blocking = True
      return

    if self.retries > 0:
      self.assertTrue(timeout is not None)
      self.retries -= 1
      self.gave_lock = False
      raise mcpu.LockAcquireTimeout()

    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
      assert not ts.timeouts
      self.assertTrue(timeout is None)

  def _AfterStart(self, op, cbs):
    # Callback given to _FakeExecOpCodeForProc; "op" and "cbs" are unused
    job = self.job

    # Setting to "running" requires an update
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Job is running, cancelling shouldn't be possible
    (success, _) = job.Cancel()
    self.assertFalse(success)

  def _NextOpcode(self):
    # Passed to the processor as "_nextop_fn"; moves on to the next opcode
    # index and records that opcode's priority and status.
    # next() instead of the Python-2-only iterator.next() method
    self.curop = next(self.opcounter)
    self.prev_prio = self.job.ops[self.curop].priority
    self.prev_status = self.job.ops[self.curop].status

  def _NewTimeoutStrategy(self):
    # Passed to the processor as "_timeout_strategy_factory"; picks timeouts
    # and the number of forced retries depending on the current opcode
    job = self.job

    self.assertEqual(self.retries, 0)

    if self.prev_tsop == self.curop:
      # Still on the same opcode, priority must've been increased
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)

    # range() results are turned into lists as the fake strategy pops from
    # what it is given
    if self.curop == 1:
      # Normal retry
      timeouts = list(range(10, 31, 10))
      self.retries = len(timeouts) - 1

    elif self.curop == 2:
      # Let this run into a blocking acquire
      timeouts = list(range(11, 61, 12))
      self.retries = len(timeouts)

    elif self.curop == 3:
      # Wait for priority to increase, but give lock before blocking acquire
      timeouts = list(range(12, 100, 14))
      self.retries = len(timeouts)

      self.assertFalse(self.done_lock_before_blocking)

    elif self.curop == 4:
      self.assertTrue(self.done_lock_before_blocking)

      # Timeouts, but no need to retry
      timeouts = list(range(10, 31, 10))
      self.retries = 0

    elif self.curop == 5:
      # Normal retry
      timeouts = list(range(19, 100, 11))
      self.retries = len(timeouts)

    else:
      timeouts = []
      self.retries = 0

    assert len(job.ops) == 10
    assert self.retries <= len(timeouts)

    ts = _FakeTimeoutStrategy(timeouts)

    self.timeout_strategy = ts
    self.prev_tsop = self.curop
    self.prev_prio = job.ops[self.curop].priority

    return ts

  def testTimeout(self):
    # Runs a ten-opcode job through the processor until it reports FINISHED,
    # checking queue updates, job status and timestamps after every pass
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(10)]

    # Create job
    job_id = 15801
    job = self._CreateJob(self.queue, job_id, ops)
    self.job = job

    self.opcounter = itertools.count(0)

    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
                                    self._AfterStart)
    tsf = self._NewTimeoutStrategy

    self.assertFalse(self.done_lock_before_blocking)

    while True:
      proc = jqueue._JobProcessor(self.queue, opexec, job,
                                  _timeout_strategy_factory=tsf)

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      if self.curop is not None:
        self.prev_status = self.job.ops[self.curop].status

      self.lock_acq_prio = None

      result = proc(_nextop_fn=self._NextOpcode)
      assert self.curop is not None

      if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
        # Got lock and/or job is done, result must've been written
        self.assertFalse(job.cur_opctx)
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
        self.assertTrue(job.ops[self.curop].exec_timestamp)

      if result == jqueue._JobProcessor.FINISHED:
        self.assertFalse(job.cur_opctx)
        break

      self.assertEqual(result, jqueue._JobProcessor.DEFER)

      if self.curop == 0:
        self.assertEqual(job.ops[self.curop].start_timestamp,
                         job.start_timestamp)

      if self.gave_lock:
        # Opcode finished, but job not yet done
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      else:
        # Did not get locks
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
                         self.timeout_strategy.NextAttempt)
        self.assertFalse(job.ops[self.curop].exec_timestamp)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

        # If priority has changed since acquiring locks, the job must've been
        # updated
        if self.lock_acq_prio != job.ops[self.curop].priority:
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    # All opcodes must have been visited exactly once
    self.assertEqual(self.curop, len(job.ops) - 1)
    self.assertEqual(self.job, job)
    self.assertEqual(next(self.opcounter), len(job.ops))
    self.assertTrue(self.done_lock_before_blocking)

    self.assertRaises(IndexError, self.queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1896     self.assertRaises(IndexError, self.queue.GetNextUpdate)
1897
1898
1899 class _IdOnlyFakeJob:
1900   def __init__(self, job_id, priority=NotImplemented):
1901     self.id = str(job_id)
1902     self._priority = priority
1903
1904   def CalcPriority(self):
1905     return self._priority
1906
1907
class TestJobDependencyManager(unittest.TestCase):
  """Tests for C{jqueue._JobDependencyManager}.

  The manager under test is created with L{_GetStatus} and L{_Enqueue} as
  its two callbacks, which lets the tests script the status answers and
  inspect the jobs handed back for re-adding.

  """
  def setUp(self):
    # Scripted (expected job ID, status) answers, consumed by _GetStatus
    self._status = []
    # Arguments received by _Enqueue, in call order
    self._queue = []
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)

  def _GetStatus(self, job_id):
    # Serves the oldest scripted answer and verifies it was meant for the
    # job ID being asked about
    (exp_job_id, result) = self._status.pop(0)
    self.assertEqual(exp_job_id, job_id)
    return result

  def _Enqueue(self, jobs):
    # Records the jobs handed over; the manager's lock must be free here
    self.assertFalse(self.jdm._lock.is_owned(),
                     msg=("Must not own manager lock while re-adding jobs"
                          " (potential deadlock)"))
    self._queue.append(jobs)

  def testNotFinalizedThenCancel(self):
    # Empty list of wanted statuses: WAIT while the dependency is running,
    # CANCEL once it was canceled
    job = _IdOnlyFakeJob(17697)
    job_id = str(28625)

    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
      ("job/28625", None, None, [("job", [job.id])])
      ])

    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, self.jdm.CANCEL)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testRequireCancel(self):
    # "canceled" is the wanted status: WAIT first, CONTINUE once reached
    job = _IdOnlyFakeJob(5278)
    job_id = str(9610)
    dep_status = [constants.JOB_STATUS_CANCELED]

    self._status.append((job_id, constants.JOB_STATUS_WAITING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
      ("job/9610", None, None, [("job", [job.id])])
      ])

    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.CONTINUE)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testRequireError(self):
    # "error" is the wanted status: WAIT first, CONTINUE once reached
    job = _IdOnlyFakeJob(21459)
    job_id = str(25519)
    dep_status = [constants.JOB_STATUS_ERROR]

    self._status.append((job_id, constants.JOB_STATUS_WAITING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })

    self._status.append((job_id, constants.JOB_STATUS_ERROR))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.CONTINUE)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testRequireMultiple(self):
    # With every finalized status wanted, each of them must yield CONTINUE
    dep_status = list(constants.JOBS_FINALIZED)

    for end_status in dep_status:
      job = _IdOnlyFakeJob(21343)
      job_id = str(14609)

      self._status.append((job_id, constants.JOB_STATUS_WAITING))
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
      self.assertEqual(result, self.jdm.WAIT)
      self.assertFalse(self._status)
      self.assertFalse(self._queue)
      self.assertTrue(self.jdm.JobWaiting(job))
      self.assertEqual(self.jdm._waiters, {
        job_id: set([job]),
        })
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
        ("job/14609", None, None, [("job", [job.id])])
        ])

      self._status.append((job_id, end_status))
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
      self.assertEqual(result, self.jdm.CONTINUE)
      self.assertFalse(self._status)
      self.assertFalse(self._queue)
      self.assertFalse(self.jdm.JobWaiting(job))
      self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testNotify(self):
    # NotifyWaiters must pass the waiting jobs to the enqueue callback and
    # drop them from the waiter list, without asking for any status
    job = _IdOnlyFakeJob(8227)
    job_id = str(4113)

    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })

    self.jdm.NotifyWaiters(job_id)
    self.assertFalse(self._status)
    self.assertFalse(self.jdm._waiters)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertEqual(self._queue, [set([job])])

  def testWrongStatus(self):
    # "success" wanted, dependency ends in "error": WRONGSTATUS
    job = _IdOnlyFakeJob(10102)
    job_id = str(1271)

    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })

    self._status.append((job_id, constants.JOB_STATUS_ERROR))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.WRONGSTATUS)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))

  def testCorrectStatus(self):
    # "success" wanted, dependency ends in "success": CONTINUE
    job = _IdOnlyFakeJob(24273)
    job_id = str(23885)

    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })

    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.CONTINUE)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))

  def testFinalizedRightAway(self):
    # Dependency already in the wanted status on the first check: CONTINUE
    # right away, leaving an empty waiter set behind until the next
    # notification
    job = _IdOnlyFakeJob(224)
    job_id = str(3081)

    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.CONTINUE)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set(),
      })

    # Force cleanup
    self.jdm.NotifyWaiters("0")
    self.assertFalse(self.jdm._waiters)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)

  def testMultipleWaiting(self):
    # Registers many jobs on several dependencies, then resolves the
    # dependencies in random order while comparing the reported lock
    # information with the expected waiters after every step

    # Use a deterministic random generator
    rnd = random.Random(21402)

    # Every dependency takes one ID for itself plus up to "max_waiters" IDs
    # for its waiters; sample enough IDs for the worst case so the pops
    # below can never hit an empty list
    num_deps = 10
    max_waiters = 20

    # A real list is needed as IDs are popped from it
    job_ids = [str(num)
               for num in rnd.sample(range(1, 10000),
                                     num_deps * (max_waiters + 1))]

    waiters = dict((job_ids.pop(),
                    set(map(_IdOnlyFakeJob,
                            [job_ids.pop()
                             for _ in range(rnd.randint(1, max_waiters))])))
                   for _ in range(num_deps))

    # Ensure there are no duplicate job IDs
    assert not utils.FindDuplicates(list(waiters.keys()) +
                                    [job.id
                                     for jobs in waiters.values()
                                     for job in jobs])

    # Register all jobs as waiters
    for job_id, job in [(job_id, job)
                        for (job_id, jobs) in waiters.items()
                        for job in jobs]:
      self._status.append((job_id, constants.JOB_STATUS_QUEUED))
      (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                              [constants.JOB_STATUS_SUCCESS])
      self.assertEqual(result, self.jdm.WAIT)
      self.assertFalse(self._status)
      self.assertFalse(self._queue)
      self.assertTrue(self.jdm.JobWaiting(job))

    self.assertEqual(self.jdm._waiters, waiters)

    def _MakeSet(lock_info):
      # Turns each list of pending names into a set so that the comparison
      # below does not depend on their order
      (name, mode, owner_names, pending) = lock_info
      return (name, mode, owner_names,
              [(pendmode, set(pend)) for (pendmode, pend) in pending])

    def _CheckLockInfo():
      # Reported lock information must match the remaining waiters
      info = self.jdm.GetLockInfo([query.LQ_PENDING])
      self.assertEqual(sorted(map(_MakeSet, info)), sorted([
        ("job/%s" % job_id, None, None,
         [("job", set([job.id for job in jobs]))])
        for job_id, jobs in waiters.items()
        if jobs
        ]))

    _CheckLockInfo()

    # Notify in random order; random.sample needs a sequence, not a dict
    for job_id in rnd.sample(list(waiters), len(waiters)):
      # Remove from pending waiter list
      jobs = waiters.pop(job_id)
      for job in jobs:
        self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
        (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                                [constants.JOB_STATUS_SUCCESS])
        self.assertEqual(result, self.jdm.CONTINUE)
        self.assertFalse(self._status)
        self.assertFalse(self._queue)
        self.assertFalse(self.jdm.JobWaiting(job))

      _CheckLockInfo()

    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

    assert not waiters

  def testSelfDependency(self):
    # A job depending on its own ID must be answered with ERROR
    job = _IdOnlyFakeJob(18937)

    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
    self.assertEqual(result, self.jdm.ERROR)

  def testJobDisappears(self):
    # errors.JobLost raised by the status callback must lead to ERROR
    # without registering the job as waiter
    job = _IdOnlyFakeJob(30540)
    job_id = str(23769)

    def _FakeStatus(_):
      raise errors.JobLost("#msg#")

    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, self.jdm.ERROR)
    self.assertFalse(jdm.JobWaiting(job))
    self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
2199
2200
# When run as a script (not on import), hand control to
# testutils.GanetiTestProgram; presumably the project's unittest runner
# wrapper -- confirm in testutils
if __name__ == "__main__":
  testutils.GanetiTestProgram()