--select-instances hbal manpage update
[ganeti-local] / test / ganeti.jqueue_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.jqueue"""
23
24 import os
25 import sys
26 import unittest
27 import tempfile
28 import shutil
29 import errno
30 import itertools
31
32 from ganeti import constants
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import jqueue
36 from ganeti import opcodes
37 from ganeti import compat
38 from ganeti import mcpu
39
40 import testutils
41
42
43 class _FakeJob:
44   def __init__(self, job_id, status):
45     self.id = job_id
46     self._status = status
47     self._log = []
48
49   def SetStatus(self, status):
50     self._status = status
51
52   def AddLogEntry(self, msg):
53     self._log.append((len(self._log), msg))
54
55   def CalcStatus(self):
56     return self._status
57
58   def GetInfo(self, fields):
59     result = []
60
61     for name in fields:
62       if name == "status":
63         result.append(self._status)
64       else:
65         raise Exception("Unknown field")
66
67     return result
68
69   def GetLogEntries(self, newer_than):
70     assert newer_than is None or newer_than >= 0
71
72     if newer_than is None:
73       return self._log
74
75     return self._log[newer_than:]
76
77
class TestJobChangesChecker(unittest.TestCase):
  """Tests for L{jqueue._JobChangesChecker} driven by a L{_FakeJob}."""

  def testStatus(self):
    """Without previous job info, every call reports the current status."""
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.SetStatus(constants.JOB_STATUS_SUCCESS)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))

    # job.id is used by checker
    self.assertEqual(job.id, 9094)

  def testStatusWithPrev(self):
    """An unchanged status yields None, a changed one is reported."""
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"],
                                        [constants.JOB_STATUS_QUEUED], None)
    self.assertTrue(checker(job) is None)

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

  def testFinalStatus(self):
    """A finalized status is reported even if it equals the previous one."""
    for status in constants.JOBS_FINALIZED:
      job = _FakeJob(2178711, status)
      checker = jqueue._JobChangesChecker(["status"], [status], None)
      # There won't be any changes in this status, hence it should signal
      # a change immediately
      self.assertEqual(checker(job), ([status], []))

  def testLog(self):
    """Log entries newer than the given serial are returned with the status.

    A checker created from the last seen status and log length reports
    nothing until a new log entry or status change appears.

    """
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.AddLogEntry("Hello World")
    (job_info, log_entries) = checker(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
    self.assertEqual(log_entries, [[0, "Hello World"]])

    # Resume from what has been seen so far: no changes yet
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
    self.assertTrue(checker2(job) is None)

    job.AddLogEntry("Foo Bar")
    job.SetStatus(constants.JOB_STATUS_ERROR)

    (job_info, log_entries) = checker2(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[1, "Foo Bar"]])

    # Without a previous log serial the whole log is returned
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
    (job_info, log_entries) = checker3(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
134
135
class TestJobChangesWaiter(unittest.TestCase):
  """Tests for the job file change waiters in L{jqueue}."""

  def setUp(self):
    # Every test watches an initially empty job file in a private directory
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _EnsureNotifierClosed(self, notifier):
    """Asserts that the file descriptor C{notifier._fd} has been closed.

    C{os.fstat} on a closed descriptor is expected to fail with C{EBADF}.

    """
    try:
      os.fstat(notifier._fd)
    except EnvironmentError:
      # Fetch the exception via sys.exc_info() so this parses on every Python
      # version ("except E, err" is Python 2 only, "except E as err" is 2.6+)
      err = sys.exc_info()[1]
      self.assertEqual(err.errno, errno.EBADF)
    else:
      self.fail("File descriptor wasn't closed")

  def testClose(self):
    """Close releases the descriptor whether or not Wait was called."""
    for wait in [False, True]:
      waiter = jqueue._JobFileChangesWaiter(self.filename)
      try:
        if wait:
          waiter.Wait(0.001)
      finally:
        waiter.Close()

      # Ensure file descriptor was closed
      self._EnsureNotifierClosed(waiter._notifier)

  def testChangingFile(self):
    """Wait returns false without changes and true after the file changed."""
    waiter = jqueue._JobFileChangesWaiter(self.filename)
    try:
      self.assertFalse(waiter.Wait(0.1))
      utils.WriteFile(self.filename, data="changed")
      self.assertTrue(waiter.Wait(60))
    finally:
      waiter.Close()

    self._EnsureNotifierClosed(waiter._notifier)

  def testChangingFile2(self):
    """The first Wait reports a change and creates the file waiter.

    Afterwards L{jqueue._JobChangesWaiter} behaves like the file waiter.

    """
    waiter = jqueue._JobChangesWaiter(self.filename)
    try:
      self.assertFalse(waiter._filewaiter)
      self.assertTrue(waiter.Wait(0.1))
      self.assertTrue(waiter._filewaiter)

      # File waiter is now used, but there have been no changes
      self.assertFalse(waiter.Wait(0.1))
      utils.WriteFile(self.filename, data="changed")
      self.assertTrue(waiter.Wait(60))
    finally:
      waiter.Close()

    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
191
192
class TestWaitForJobChangesHelper(unittest.TestCase):
  """Tests for L{jqueue._WaitForJobChangesHelper}."""

  def setUp(self):
    # Empty job file for the helper to watch
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _LoadWaitingJob(self):
    """Job loader returning a fake job in the waitlock state."""
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)

  def _LoadLostJob(self):
    """Job loader simulating a job that cannot be loaded."""
    return None

  def testNoChanges(self):
    """Unchanged status times out; no previous info returns the status."""
    wfjc = jqueue._WaitForJobChangesHelper()

    # No change
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
                     constants.JOB_NOTCHANGED)

    # No previous information
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
                          ["status"], None, None, 1.0),
                     ([constants.JOB_STATUS_WAITLOCK], []))

  def testLostJob(self):
    """The helper returns None when the job loader returns None."""
    wfjc = jqueue._WaitForJobChangesHelper()
    self.assertTrue(wfjc(self.filename, self._LoadLostJob,
                         ["status"], None, None, 1.0) is None)
225
226
class TestEncodeOpError(unittest.TestCase):
  """Tests for L{jqueue._EncodeOpError}."""

  def test(self):
    """Encoded errors are tuples which re-raise as the expected class.

    The Ganeti errors used here keep their class; a built-in exception and a
    plain string are re-raised as L{errors.OpExecError}.

    """
    # (value passed to _EncodeOpError, exception class MaybeRaise must raise)
    testdata = [
      (errors.LockError("Test 1"), errors.LockError),
      (errors.GenericError("Test 2"), errors.GenericError),
      (NotImplementedError("Foo"), errors.OpExecError),
      ("Hello World", errors.OpExecError),
      ]

    for (err, expcls) in testdata:
      encerr = jqueue._EncodeOpError(err)
      self.assertTrue(isinstance(encerr, tuple))
      self.assertRaises(expcls, errors.MaybeRaise, encerr)
244
245
class TestQueuedOpCode(unittest.TestCase):
  """Tests for L{jqueue._QueuedOpCode} and its serialization round-trip."""

  def testDefaults(self):
    """A new opcode wrapper is queued, has default priority and no data."""
    def _Check(op):
      self.assertFalse(hasattr(op.input, "dry_run"))
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
      self.assertFalse(op.log)
      self.assertTrue(op.start_timestamp is None)
      self.assertTrue(op.exec_timestamp is None)
      self.assertTrue(op.end_timestamp is None)
      self.assertTrue(op.result is None)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
    _Check(op1)
    # The same state must survive a Serialize/Restore round-trip
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())

  def testPriority(self):
    """The input opcode's priority is used and survives a round-trip."""
    def _Check(op):
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
             "Default priority equals high priority; test can't work"
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
    op1 = jqueue._QueuedOpCode(inpop)
    _Check(op1)
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())
277
278
class TestQueuedJob(unittest.TestCase):
  """Tests for L{jqueue._QueuedJob}."""

  def test(self):
    """Creating a job without any opcodes fails."""
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
                      None, 1, [])

  def testDefaults(self):
    """A new job is queued, not started and keeps its opcodes in order.

    The same checks must hold after a Serialize/Restore round-trip.

    """
    job_id = 4260
    ops = [
      opcodes.OpTagsGet(),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.log_serial, 0)
      self.assertTrue(job.received_timestamp)
      self.assertTrue(job.start_timestamp is None)
      self.assertTrue(job.end_timestamp is None)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
      self.assertTrue(repr(job).startswith("<"))
      self.assertEqual(len(job.ops), len(ops))
      self.assertTrue(compat.all(inp.__getstate__() == op.input.__getstate__()
                                 for (inp, op) in zip(ops, job.ops)))
      self.assertRaises(errors.OpExecError, job.GetInfo,
                        ["unknown-field"])
      self.assertEqual(job.GetInfo(["summary"]),
                       [[op.input.Summary() for op in job.ops]])

    job1 = jqueue._QueuedJob(None, job_id, ops)
    _Check(job1)
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
    _Check(job2)
    self.assertEqual(job1.Serialize(), job2.Serialize())

  def testPriority(self):
    """CalcPriority returns the lowest priority value of pending opcodes.

    Lower values mean higher priority; an opcode that has succeeded is no
    longer taken into account.

    """
    job_id = 4283
    ops = [
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(repr(job).startswith("<"))

    job = jqueue._QueuedJob(None, job_id, ops)
    _Check(job)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase first
    job.ops[0].priority -= 1
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)

    # Mark opcode as finished
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase second
    job.ops[1].priority -= 10
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)

    # Test increasing first (now at OP_PRIO_DEFAULT - 1 - 19)
    job.ops[0].status = constants.OP_STATUS_RUNNING
    job.ops[0].priority -= 19
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)

  def testCalcStatus(self):
    """CalcStatus derives the job status from the opcode statuses."""
    # Each helper puts the opcodes of a ten-opcode job into a state which
    # must result in the job status it is listed under in "tests" below

    def _Queued(ops):
      # The default status is "queued"
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in ops))

    def _Waitlock1(ops):
      ops[0].status = constants.OP_STATUS_WAITLOCK

    def _Waitlock2(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      ops[2].status = constants.OP_STATUS_WAITLOCK

    def _Running(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_RUNNING
      for op in ops[2:]:
        op.status = constants.OP_STATUS_QUEUED

    def _Canceling1(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      for op in ops[2:]:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceling2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceled(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELED

    def _Error1(ops):
      # Opcodes 0-3 succeeded, all later ones failed
      for idx, op in enumerate(ops):
        if idx > 3:
          op.status = constants.OP_STATUS_ERROR
        else:
          op.status = constants.OP_STATUS_SUCCESS

    def _Error2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_ERROR

    def _Success(ops):
      for op in ops:
        op.status = constants.OP_STATUS_SUCCESS

    tests = {
      constants.JOB_STATUS_QUEUED: [_Queued],
      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
      constants.JOB_STATUS_RUNNING: [_Running],
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
      constants.JOB_STATUS_CANCELED: [_Canceled],
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
      constants.JOB_STATUS_SUCCESS: [_Success],
      }

    def _NewJob():
      job = jqueue._QueuedJob(None, 1,
                              [opcodes.OpTestDelay() for _ in range(10)])
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in job.ops))
      return job

    for status in constants.JOB_STATUS_ALL:
      sttests = tests[status]
      # Unlike a bare "assert", this check is not stripped by "python -O"
      self.assertTrue(sttests)
      for fn in sttests:
        job = _NewJob()
        fn(job.ops)
        self.assertEqual(job.CalcStatus(), status)
425
426
427 class _FakeQueueForProc:
428   def __init__(self):
429     self._acquired = False
430     self._updates = []
431     self._submitted = []
432
433     self._submit_count = itertools.count(1000)
434
435   def IsAcquired(self):
436     return self._acquired
437
438   def GetNextUpdate(self):
439     return self._updates.pop(0)
440
441   def GetNextSubmittedJob(self):
442     return self._submitted.pop(0)
443
444   def acquire(self, shared=0):
445     assert shared == 1
446     self._acquired = True
447
448   def release(self):
449     assert self._acquired
450     self._acquired = False
451
452   def UpdateJobUnlocked(self, job, replicate=True):
453     assert self._acquired, "Lock not acquired while updating job"
454     self._updates.append((job, bool(replicate)))
455
456   def SubmitManyJobs(self, jobs):
457     assert not self._acquired, "Lock acquired while submitting jobs"
458     job_ids = [self._submit_count.next() for _ in jobs]
459     self._submitted.extend(zip(job_ids, jobs))
460     return job_ids
461
462
class _FakeExecOpCodeForProc:
  """Fake opcode executor handed to the job processor by the tests.

  Optional callbacks run right before and right after the opcode is
  reported as started via C{cbs.NotifyStart}.

  """
  def __init__(self, queue, before_start, after_start):
    self._queue = queue
    self._before_start = before_start
    self._after_start = after_start

  def __call__(self, op, cbs, timeout=None, priority=None):
    """Executes an L{opcodes.OpTestDummy} opcode.

    @param op: the opcode; its C{fail}, C{result} and optional
      C{submit_jobs} attributes determine the outcome
    @param cbs: processor callbacks (C{NotifyStart}, C{SubmitManyJobs})
    @param timeout: passed on to the before-start callback
    @param priority: passed on to the before-start callback
    @raise errors.OpExecError: if C{op.fail} is set
    @return: the result of C{cbs.SubmitManyJobs} if C{op.submit_jobs} is
      set and not C{None}, C{op.result} otherwise

    """
    queue = self._queue

    assert isinstance(op, opcodes.OpTestDummy)
    assert not queue.IsAcquired(), \
           "Queue lock not released when executing opcode"

    before = self._before_start
    if before:
      before(timeout, priority)

    cbs.NotifyStart()

    after = self._after_start
    if after:
      after(op, cbs)

    # The callbacks must not have left the queue lock held either
    assert not queue.IsAcquired()

    if op.fail:
      raise errors.OpExecError("Error requested (%s)" % op.result)

    submit_jobs = getattr(op, "submit_jobs", None)
    if submit_jobs is None:
      return op.result

    return cbs.SubmitManyJobs(submit_jobs)
492
493
class _JobProcessorTestUtils:
  """Mixin with helpers shared by the job processor tests."""

  def _CreateJob(self, queue, job_id, ops):
    """Creates a L{jqueue._QueuedJob} and checks its initial state.

    @param queue: the (fake) queue the job belongs to
    @param job_id: ID of the new job
    @param ops: list of input opcodes
    @return: the new job

    """
    job = jqueue._QueuedJob(queue, job_id, ops)
    # A freshly created job has neither started nor finished
    self.assertFalse(job.start_timestamp)
    self.assertFalse(job.end_timestamp)
    self.assertEqual(len(ops), len(job.ops))
    self.assertTrue(compat.all(op.input == inp
                               for (op, inp) in zip(job.ops, ops)))
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
    return job
504
505
506 class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
507   def _GenericCheckJob(self, job):
508     assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
509                       for op in job.ops)
510
511     self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
512                      [[op.start_timestamp for op in job.ops],
513                       [op.exec_timestamp for op in job.ops],
514                       [op.end_timestamp for op in job.ops]])
515     self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
516                      [job.received_timestamp,
517                       job.start_timestamp,
518                       job.end_timestamp])
519     self.assert_(job.start_timestamp)
520     self.assert_(job.end_timestamp)
521     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
522
523   def testSuccess(self):
524     queue = _FakeQueueForProc()
525
526     for (job_id, opcount) in [(25351, 1), (6637, 3),
527                               (24644, 10), (32207, 100)]:
528       ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
529              for i in range(opcount)]
530
531       # Create job
532       job = self._CreateJob(queue, job_id, ops)
533
534       def _BeforeStart(timeout, priority):
535         self.assertEqual(queue.GetNextUpdate(), (job, True))
536         self.assertRaises(IndexError, queue.GetNextUpdate)
537         self.assertFalse(queue.IsAcquired())
538         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
539         self.assertFalse(job.cur_opctx)
540
541       def _AfterStart(op, cbs):
542         self.assertEqual(queue.GetNextUpdate(), (job, True))
543         self.assertRaises(IndexError, queue.GetNextUpdate)
544
545         self.assertFalse(queue.IsAcquired())
546         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
547         self.assertFalse(job.cur_opctx)
548
549         # Job is running, cancelling shouldn't be possible
550         (success, _) = job.Cancel()
551         self.assertFalse(success)
552
553       opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
554
555       for idx in range(len(ops)):
556         self.assertRaises(IndexError, queue.GetNextUpdate)
557         result = jqueue._JobProcessor(queue, opexec, job)()
558         self.assertEqual(queue.GetNextUpdate(), (job, True))
559         self.assertRaises(IndexError, queue.GetNextUpdate)
560         if idx == len(ops) - 1:
561           # Last opcode
562           self.assert_(result)
563         else:
564           self.assertFalse(result)
565
566           self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
567           self.assert_(job.start_timestamp)
568           self.assertFalse(job.end_timestamp)
569
570       self.assertRaises(IndexError, queue.GetNextUpdate)
571
572       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
573       self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
574       self.assertEqual(job.GetInfo(["opresult"]),
575                        [[op.input.result for op in job.ops]])
576       self.assertEqual(job.GetInfo(["opstatus"]),
577                        [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
578       self.assert_(compat.all(op.start_timestamp and op.end_timestamp
579                               for op in job.ops))
580
581       self._GenericCheckJob(job)
582
583       # Calling the processor on a finished job should be a no-op
584       self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
585       self.assertRaises(IndexError, queue.GetNextUpdate)
586
587   def testOpcodeError(self):
588     queue = _FakeQueueForProc()
589
590     testdata = [
591       (17077, 1, 0, 0),
592       (1782, 5, 2, 2),
593       (18179, 10, 9, 9),
594       (4744, 10, 3, 8),
595       (23816, 100, 39, 45),
596       ]
597
598     for (job_id, opcount, failfrom, failto) in testdata:
599       # Prepare opcodes
600       ops = [opcodes.OpTestDummy(result="Res%s" % i,
601                                  fail=(failfrom <= i and
602                                        i <= failto))
603              for i in range(opcount)]
604
605       # Create job
606       job = self._CreateJob(queue, job_id, ops)
607
608       opexec = _FakeExecOpCodeForProc(queue, None, None)
609
610       for idx in range(len(ops)):
611         self.assertRaises(IndexError, queue.GetNextUpdate)
612         result = jqueue._JobProcessor(queue, opexec, job)()
613         # queued to waitlock
614         self.assertEqual(queue.GetNextUpdate(), (job, True))
615         # waitlock to running
616         self.assertEqual(queue.GetNextUpdate(), (job, True))
617         # Opcode result
618         self.assertEqual(queue.GetNextUpdate(), (job, True))
619         self.assertRaises(IndexError, queue.GetNextUpdate)
620
621         if idx in (failfrom, len(ops) - 1):
622           # Last opcode
623           self.assert_(result)
624           break
625
626         self.assertFalse(result)
627
628         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
629
630       self.assertRaises(IndexError, queue.GetNextUpdate)
631
632       # Check job status
633       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
634       self.assertEqual(job.GetInfo(["id"]), [job_id])
635       self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
636
637       # Check opcode status
638       data = zip(job.ops,
639                  job.GetInfo(["opstatus"])[0],
640                  job.GetInfo(["opresult"])[0])
641
642       for idx, (op, opstatus, opresult) in enumerate(data):
643         if idx < failfrom:
644           assert not op.input.fail
645           self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
646           self.assertEqual(opresult, op.input.result)
647         elif idx <= failto:
648           assert op.input.fail
649           self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
650           self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
651         else:
652           assert not op.input.fail
653           self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
654           self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
655
656       self.assert_(compat.all(op.start_timestamp and op.end_timestamp
657                               for op in job.ops[:failfrom]))
658
659       self._GenericCheckJob(job)
660
661       # Calling the processor on a finished job should be a no-op
662       self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
663       self.assertRaises(IndexError, queue.GetNextUpdate)
664
665   def testCancelWhileInQueue(self):
666     queue = _FakeQueueForProc()
667
668     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
669            for i in range(5)]
670
671     # Create job
672     job_id = 17045
673     job = self._CreateJob(queue, job_id, ops)
674
675     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
676
677     # Mark as cancelled
678     (success, _) = job.Cancel()
679     self.assert_(success)
680
681     self.assertRaises(IndexError, queue.GetNextUpdate)
682
683     self.assertFalse(job.start_timestamp)
684     self.assertTrue(job.end_timestamp)
685     self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
686                             for op in job.ops))
687
688     # Serialize to check for differences
689     before_proc = job.Serialize()
690
691     # Simulate processor called in workerpool
692     opexec = _FakeExecOpCodeForProc(queue, None, None)
693     self.assert_(jqueue._JobProcessor(queue, opexec, job)())
694
695     # Check result
696     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
697     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
698     self.assertFalse(job.start_timestamp)
699     self.assertTrue(job.end_timestamp)
700     self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
701                                 for op in job.ops))
702     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
703                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
704                       ["Job canceled by request" for _ in job.ops]])
705
706     # Must not have changed or written
707     self.assertEqual(before_proc, job.Serialize())
708     self.assertRaises(IndexError, queue.GetNextUpdate)
709
710   def testCancelWhileWaitlockInQueue(self):
711     queue = _FakeQueueForProc()
712
713     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
714            for i in range(5)]
715
716     # Create job
717     job_id = 8645
718     job = self._CreateJob(queue, job_id, ops)
719
720     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
721
722     job.ops[0].status = constants.OP_STATUS_WAITLOCK
723
724     assert len(job.ops) == 5
725
726     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
727
728     # Mark as cancelling
729     (success, _) = job.Cancel()
730     self.assert_(success)
731
732     self.assertRaises(IndexError, queue.GetNextUpdate)
733
734     self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
735                             for op in job.ops))
736
737     opexec = _FakeExecOpCodeForProc(queue, None, None)
738     self.assert_(jqueue._JobProcessor(queue, opexec, job)())
739
740     # Check result
741     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
742     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
743     self.assertFalse(job.start_timestamp)
744     self.assert_(job.end_timestamp)
745     self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
746                                 for op in job.ops))
747     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
748                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
749                       ["Job canceled by request" for _ in job.ops]])
750
751   def testCancelWhileWaitlock(self):
752     queue = _FakeQueueForProc()
753
754     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
755            for i in range(5)]
756
757     # Create job
758     job_id = 11009
759     job = self._CreateJob(queue, job_id, ops)
760
761     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
762
763     def _BeforeStart(timeout, priority):
764       self.assertEqual(queue.GetNextUpdate(), (job, True))
765       self.assertRaises(IndexError, queue.GetNextUpdate)
766       self.assertFalse(queue.IsAcquired())
767       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
768
769       # Mark as cancelled
770       (success, _) = job.Cancel()
771       self.assert_(success)
772
773       self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
774                               for op in job.ops))
775       self.assertRaises(IndexError, queue.GetNextUpdate)
776
777     def _AfterStart(op, cbs):
778       self.assertEqual(queue.GetNextUpdate(), (job, True))
779       self.assertRaises(IndexError, queue.GetNextUpdate)
780       self.assertFalse(queue.IsAcquired())
781       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
782
783     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
784
785     self.assertRaises(IndexError, queue.GetNextUpdate)
786     self.assert_(jqueue._JobProcessor(queue, opexec, job)())
787     self.assertEqual(queue.GetNextUpdate(), (job, True))
788     self.assertRaises(IndexError, queue.GetNextUpdate)
789
790     # Check result
791     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
792     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
793     self.assert_(job.start_timestamp)
794     self.assert_(job.end_timestamp)
795     self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
796                                 for op in job.ops))
797     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
798                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
799                       ["Job canceled by request" for _ in job.ops]])
800
801   def testCancelWhileWaitlockWithTimeout(self):
802     queue = _FakeQueueForProc()
803
804     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
805            for i in range(5)]
806
807     # Create job
808     job_id = 24314
809     job = self._CreateJob(queue, job_id, ops)
810
811     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
812
813     def _BeforeStart(timeout, priority):
814       self.assertFalse(queue.IsAcquired())
815       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
816
817       # Mark as cancelled
818       (success, _) = job.Cancel()
819       self.assert_(success)
820
821       self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
822                               for op in job.ops))
823
824       # Fake an acquire attempt timing out
825       raise mcpu.LockAcquireTimeout()
826
827     def _AfterStart(op, cbs):
828       self.fail("Should not reach this")
829
830     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
831
832     self.assert_(jqueue._JobProcessor(queue, opexec, job)())
833
834     # Check result
835     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
836     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
837     self.assert_(job.start_timestamp)
838     self.assert_(job.end_timestamp)
839     self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
840                                 for op in job.ops))
841     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
842                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
843                       ["Job canceled by request" for _ in job.ops]])
844
845   def testCancelWhileRunning(self):
846     # Tests canceling a job with finished opcodes and more, unprocessed ones
847     queue = _FakeQueueForProc()
848
849     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
850            for i in range(3)]
851
852     # Create job
853     job_id = 28492
854     job = self._CreateJob(queue, job_id, ops)
855
856     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
857
858     opexec = _FakeExecOpCodeForProc(queue, None, None)
859
860     # Run one opcode
861     self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
862
863     # Job goes back to queued
864     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
865     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
866                      [[constants.OP_STATUS_SUCCESS,
867                        constants.OP_STATUS_QUEUED,
868                        constants.OP_STATUS_QUEUED],
869                       ["Res0", None, None]])
870
871     # Mark as cancelled
872     (success, _) = job.Cancel()
873     self.assert_(success)
874
875     # Try processing another opcode (this will actually cancel the job)
876     self.assert_(jqueue._JobProcessor(queue, opexec, job)())
877
878     # Check result
879     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
880     self.assertEqual(job.GetInfo(["id"]), [job_id])
881     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
882     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
883                      [[constants.OP_STATUS_SUCCESS,
884                        constants.OP_STATUS_CANCELED,
885                        constants.OP_STATUS_CANCELED],
886                       ["Res0", "Job canceled by request",
887                        "Job canceled by request"]])
888
889   def testPartiallyRun(self):
890     # Tests calling the processor on a job that's been partially run before the
891     # program was restarted
892     queue = _FakeQueueForProc()
893
894     opexec = _FakeExecOpCodeForProc(queue, None, None)
895
896     for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
897       ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
898              for i in range(10)]
899
900       # Create job
901       job = self._CreateJob(queue, job_id, ops)
902
903       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
904
905       for _ in range(successcount):
906         self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
907
908       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
909       self.assertEqual(job.GetInfo(["opstatus"]),
910                        [[constants.OP_STATUS_SUCCESS
911                          for _ in range(successcount)] +
912                         [constants.OP_STATUS_QUEUED
913                          for _ in range(len(ops) - successcount)]])
914
915       self.assert_(job.ops_iter)
916
917       # Serialize and restore (simulates program restart)
918       newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
919       self.assertFalse(newjob.ops_iter)
920       self._TestPartial(newjob, successcount)
921
922   def _TestPartial(self, job, successcount):
923     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
924     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
925
926     queue = _FakeQueueForProc()
927     opexec = _FakeExecOpCodeForProc(queue, None, None)
928
929     for remaining in reversed(range(len(job.ops) - successcount)):
930       result = jqueue._JobProcessor(queue, opexec, job)()
931       self.assertEqual(queue.GetNextUpdate(), (job, True))
932       self.assertEqual(queue.GetNextUpdate(), (job, True))
933       self.assertEqual(queue.GetNextUpdate(), (job, True))
934       self.assertRaises(IndexError, queue.GetNextUpdate)
935
936       if remaining == 0:
937         # Last opcode
938         self.assert_(result)
939         break
940
941       self.assertFalse(result)
942
943       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
944
945     self.assertRaises(IndexError, queue.GetNextUpdate)
946     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
947     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
948     self.assertEqual(job.GetInfo(["opresult"]),
949                      [[op.input.result for op in job.ops]])
950     self.assertEqual(job.GetInfo(["opstatus"]),
951                      [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
952     self.assert_(compat.all(op.start_timestamp and op.end_timestamp
953                             for op in job.ops))
954
955     self._GenericCheckJob(job)
956
957     # Calling the processor on a finished job should be a no-op
958     self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
959     self.assertRaises(IndexError, queue.GetNextUpdate)
960
961     # ... also after being restored
962     job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
963     self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
964     self.assertRaises(IndexError, queue.GetNextUpdate)
965
966   def testProcessorOnRunningJob(self):
967     ops = [opcodes.OpTestDummy(result="result", fail=False)]
968
969     queue = _FakeQueueForProc()
970     opexec = _FakeExecOpCodeForProc(queue, None, None)
971
972     # Create job
973     job = self._CreateJob(queue, 9571, ops)
974
975     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
976
977     job.ops[0].status = constants.OP_STATUS_RUNNING
978
979     assert len(job.ops) == 1
980
981     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
982
983     # Calling on running job must fail
984     self.assertRaises(errors.ProgrammerError,
985                       jqueue._JobProcessor(queue, opexec, job))
986
987   def testLogMessages(self):
988     # Tests the "Feedback" callback function
989     queue = _FakeQueueForProc()
990
991     messages = {
992       1: [
993         (None, "Hello"),
994         (None, "World"),
995         (constants.ELOG_MESSAGE, "there"),
996         ],
997       4: [
998         (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
999         (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1000         ],
1001       }
1002     ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1003                                messages=messages.get(i, []))
1004            for i in range(5)]
1005
1006     # Create job
1007     job = self._CreateJob(queue, 29386, ops)
1008
1009     def _BeforeStart(timeout, priority):
1010       self.assertEqual(queue.GetNextUpdate(), (job, True))
1011       self.assertRaises(IndexError, queue.GetNextUpdate)
1012       self.assertFalse(queue.IsAcquired())
1013       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1014
1015     def _AfterStart(op, cbs):
1016       self.assertEqual(queue.GetNextUpdate(), (job, True))
1017       self.assertRaises(IndexError, queue.GetNextUpdate)
1018       self.assertFalse(queue.IsAcquired())
1019       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1020
1021       self.assertRaises(AssertionError, cbs.Feedback,
1022                         "too", "many", "arguments")
1023
1024       for (log_type, msg) in op.messages:
1025         self.assertRaises(IndexError, queue.GetNextUpdate)
1026         if log_type:
1027           cbs.Feedback(log_type, msg)
1028         else:
1029           cbs.Feedback(msg)
1030         # Check for job update without replication
1031         self.assertEqual(queue.GetNextUpdate(), (job, False))
1032         self.assertRaises(IndexError, queue.GetNextUpdate)
1033
1034     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1035
1036     for remaining in reversed(range(len(job.ops))):
1037       self.assertRaises(IndexError, queue.GetNextUpdate)
1038       result = jqueue._JobProcessor(queue, opexec, job)()
1039       self.assertEqual(queue.GetNextUpdate(), (job, True))
1040       self.assertRaises(IndexError, queue.GetNextUpdate)
1041
1042       if remaining == 0:
1043         # Last opcode
1044         self.assert_(result)
1045         break
1046
1047       self.assertFalse(result)
1048
1049       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1050
1051     self.assertRaises(IndexError, queue.GetNextUpdate)
1052
1053     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1054     self.assertEqual(job.GetInfo(["opresult"]),
1055                      [[op.input.result for op in job.ops]])
1056
1057     logmsgcount = sum(len(m) for m in messages.values())
1058
1059     self._CheckLogMessages(job, logmsgcount)
1060
1061     # Serialize and restore (simulates program restart)
1062     newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
1063     self._CheckLogMessages(newjob, logmsgcount)
1064
1065     # Check each message
1066     prevserial = -1
1067     for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1068       for (serial, timestamp, log_type, msg) in oplog:
1069         (exptype, expmsg) = messages.get(idx).pop(0)
1070         if exptype:
1071           self.assertEqual(log_type, exptype)
1072         else:
1073           self.assertEqual(log_type, constants.ELOG_MESSAGE)
1074         self.assertEqual(expmsg, msg)
1075         self.assert_(serial > prevserial)
1076         prevserial = serial
1077
1078   def _CheckLogMessages(self, job, count):
1079     # Check serial
1080     self.assertEqual(job.log_serial, count)
1081
1082     # No filter
1083     self.assertEqual(job.GetLogEntries(None),
1084                      [entry for entries in job.GetInfo(["oplog"])[0] if entries
1085                       for entry in entries])
1086
1087     # Filter with serial
1088     assert count > 3
1089     self.assert_(job.GetLogEntries(3))
1090     self.assertEqual(job.GetLogEntries(3),
1091                      [entry for entries in job.GetInfo(["oplog"])[0] if entries
1092                       for entry in entries][3:])
1093
1094     # No log message after highest serial
1095     self.assertFalse(job.GetLogEntries(count))
1096     self.assertFalse(job.GetLogEntries(count + 3))
1097
1098   def testSubmitManyJobs(self):
1099     queue = _FakeQueueForProc()
1100
1101     job_id = 15656
1102     ops = [
1103       opcodes.OpTestDummy(result="Res0", fail=False,
1104                           submit_jobs=[]),
1105       opcodes.OpTestDummy(result="Res1", fail=False,
1106                           submit_jobs=[
1107                             [opcodes.OpTestDummy(result="r1j0", fail=False)],
1108                             ]),
1109       opcodes.OpTestDummy(result="Res2", fail=False,
1110                           submit_jobs=[
1111                             [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1112                              opcodes.OpTestDummy(result="r2j0o1", fail=False),
1113                              opcodes.OpTestDummy(result="r2j0o2", fail=False),
1114                              opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1115                             [opcodes.OpTestDummy(result="r2j1", fail=False)],
1116                             [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1117                              opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1118                             ]),
1119       ]
1120
1121     # Create job
1122     job = self._CreateJob(queue, job_id, ops)
1123
1124     def _BeforeStart(timeout, priority):
1125       self.assertEqual(queue.GetNextUpdate(), (job, True))
1126       self.assertRaises(IndexError, queue.GetNextUpdate)
1127       self.assertFalse(queue.IsAcquired())
1128       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1129       self.assertFalse(job.cur_opctx)
1130
1131     def _AfterStart(op, cbs):
1132       self.assertEqual(queue.GetNextUpdate(), (job, True))
1133       self.assertRaises(IndexError, queue.GetNextUpdate)
1134
1135       self.assertFalse(queue.IsAcquired())
1136       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1137       self.assertFalse(job.cur_opctx)
1138
1139       # Job is running, cancelling shouldn't be possible
1140       (success, _) = job.Cancel()
1141       self.assertFalse(success)
1142
1143     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1144
1145     for idx in range(len(ops)):
1146       self.assertRaises(IndexError, queue.GetNextUpdate)
1147       result = jqueue._JobProcessor(queue, opexec, job)()
1148       self.assertEqual(queue.GetNextUpdate(), (job, True))
1149       self.assertRaises(IndexError, queue.GetNextUpdate)
1150       if idx == len(ops) - 1:
1151         # Last opcode
1152         self.assert_(result)
1153       else:
1154         self.assertFalse(result)
1155
1156         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1157         self.assert_(job.start_timestamp)
1158         self.assertFalse(job.end_timestamp)
1159
1160     self.assertRaises(IndexError, queue.GetNextUpdate)
1161
1162     for idx, submitted_ops in enumerate(job_ops
1163                                         for op in ops
1164                                         for job_ops in op.submit_jobs):
1165       self.assertEqual(queue.GetNextSubmittedJob(),
1166                        (1000 + idx, submitted_ops))
1167     self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1168
1169     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1170     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1171     self.assertEqual(job.GetInfo(["opresult"]),
1172                      [[[], [1000], [1001, 1002, 1003]]])
1173     self.assertEqual(job.GetInfo(["opstatus"]),
1174                      [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1175
1176     self._GenericCheckJob(job)
1177
1178     # Calling the processor on a finished job should be a no-op
1179     self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1180     self.assertRaises(IndexError, queue.GetNextUpdate)
1181
1182
1183 class _FakeTimeoutStrategy:
1184   def __init__(self, timeouts):
1185     self.timeouts = timeouts
1186     self.attempts = 0
1187     self.last_timeout = None
1188
1189   def NextAttempt(self):
1190     self.attempts += 1
1191     if self.timeouts:
1192       timeout = self.timeouts.pop(0)
1193     else:
1194       timeout = None
1195     self.last_timeout = timeout
1196     return timeout
1197
1198
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
  """Tests lock acquisition timeouts and priority changes in the processor.

  L{testTimeout} processes a ten-opcode job, creating a new
  C{jqueue._JobProcessor} for every run. L{_NewTimeoutStrategy} chooses
  per-opcode timeouts and L{_BeforeStart} fails a configured number of lock
  acquisition attempts by raising C{mcpu.LockAcquireTimeout}.

  """
  def setUp(self):
    """Initializes the state shared between the test and its callbacks."""
    self.queue = _FakeQueueForProc()
    # Job under test (set by testTimeout)
    self.job = None
    # Index of the opcode being processed (set by _NextOpcode)
    self.curop = None
    # Iterator producing opcode indices (set by testTimeout)
    self.opcounter = None
    # Strategy most recently returned by _NewTimeoutStrategy
    self.timeout_strategy = None
    # Number of further acquisition attempts _BeforeStart will fail
    self.retries = 0
    # Opcode index for which the last timeout strategy was created
    self.prev_tsop = None
    # Last recorded priority of the current opcode
    self.prev_prio = None
    # Last recorded status of the current opcode, used to detect changes
    self.prev_status = None
    # Priority passed to the most recent _BeforeStart call
    self.lock_acq_prio = None
    # Whether the most recent _BeforeStart call let the attempt succeed
    self.gave_lock = None
    # Set once opcode 3 got its locks at priority OP_PRIO_HIGHEST + 3
    self.done_lock_before_blocking = False

  def _BeforeStart(self, timeout, priority):
    """Before-start callback simulating a lock acquisition attempt.

    Checks job updates, job status, timeout and priority, then either
    returns (locks given) or raises C{mcpu.LockAcquireTimeout} while
    C{self.retries} is non-zero.

    @param timeout: timeout of this attempt; C{None} or a number
    @param priority: priority of this attempt

    """
    job = self.job

    # If status has changed, job must've been written
    if self.prev_status != self.job.ops[self.curop].status:
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

    ts = self.timeout_strategy

    # The timeout must be the one last handed out by the fake strategy and
    # the priority must be the current opcode's
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
    self.assertEqual(timeout, ts.last_timeout)
    self.assertEqual(priority, job.ops[self.curop].priority)

    # Assume success; reset below if this attempt is made to time out
    self.gave_lock = True
    self.lock_acq_prio = priority

    if (self.curop == 3 and
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
      # Give locks before running into blocking acquire. Retries still at
      # the initial 7 set by _NewTimeoutStrategy means no attempt with the
      # current strategy has failed yet.
      assert self.retries == 7
      self.retries = 0
      self.done_lock_before_blocking = True
      return

    if self.retries > 0:
      # Simulate a timed-out acquisition; only attempts that have a timeout
      # are expected here
      self.assert_(timeout is not None)
      self.retries -= 1
      self.gave_lock = False
      raise mcpu.LockAcquireTimeout()

    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
      # At the highest priority all timeouts must be used up and the
      # attempt is made without a timeout
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
      assert not ts.timeouts
      self.assert_(timeout is None)

  def _AfterStart(self, op, cbs):
    """After-start callback; checks the job is running and not cancelable.

    @param op: opcode being started (unused)
    @param cbs: opcode callbacks (unused)

    """
    job = self.job

    # Setting to "running" requires an update
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Job is running, cancelling shouldn't be possible
    (success, _) = job.Cancel()
    self.assertFalse(success)

  def _NextOpcode(self):
    """Advances to the next opcode; passed as C{_nextop_fn} to the processor.

    Records the new opcode's priority and status for later comparison.

    """
    self.curop = self.opcounter.next()
    self.prev_prio = self.job.ops[self.curop].priority
    self.prev_status = self.job.ops[self.curop].status

  def _NewTimeoutStrategy(self):
    """Creates the timeout strategy for the current opcode.

    Passed as C{_timeout_strategy_factory} to the processor. Depending on
    the opcode index it chooses the timeouts to hand out and sets
    C{self.retries}, the number of attempts L{_BeforeStart} will fail:

      - opcode 1: three timeouts, the first two attempts fail
      - opcode 2: five timeouts, every timed attempt fails
      - opcode 3: seven timeouts, every timed attempt fails until the
        priority reaches C{OP_PRIO_HIGHEST + 3}
      - opcode 4: three timeouts, no attempt fails
      - opcode 5: eight timeouts, every timed attempt fails
      - all other opcodes: no timeouts, no failures

    @rtype: L{_FakeTimeoutStrategy}

    """
    job = self.job

    # The previous strategy's failures must all have been consumed
    self.assertEqual(self.retries, 0)

    if self.prev_tsop == self.curop:
      # Still on the same opcode, priority must've been increased
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)

    if self.curop == 1:
      # Normal retry
      timeouts = range(10, 31, 10)
      self.retries = len(timeouts) - 1

    elif self.curop == 2:
      # Let this run into a blocking acquire
      timeouts = range(11, 61, 12)
      self.retries = len(timeouts)

    elif self.curop == 3:
      # Wait for priority to increase, but give lock before blocking acquire
      timeouts = range(12, 100, 14)
      self.retries = len(timeouts)

      self.assertFalse(self.done_lock_before_blocking)

    elif self.curop == 4:
      self.assert_(self.done_lock_before_blocking)

      # Timeouts, but no need to retry
      timeouts = range(10, 31, 10)
      self.retries = 0

    elif self.curop == 5:
      # Normal retry
      # NOTE(review): retries == len(timeouts) makes every timed attempt
      # fail, as for opcode 2, which does not match "Normal retry" (see
      # opcode 1); confirm the intended scenario
      timeouts = range(19, 100, 11)
      self.retries = len(timeouts)

    else:
      timeouts = []
      self.retries = 0

    assert len(job.ops) == 10
    assert self.retries <= len(timeouts)

    ts = _FakeTimeoutStrategy(timeouts)

    self.timeout_strategy = ts
    self.prev_tsop = self.curop
    self.prev_prio = job.ops[self.curop].priority

    return ts

  def testTimeout(self):
    # Runs the job to completion through repeated processor invocations and
    # checks the job state after each one, depending on whether the last lock
    # acquisition attempt succeeded
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(10)]

    # Create job
    job_id = 15801
    job = self._CreateJob(self.queue, job_id, ops)
    self.job = job

    self.opcounter = itertools.count(0)

    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
                                    self._AfterStart)
    tsf = self._NewTimeoutStrategy

    self.assertFalse(self.done_lock_before_blocking)

    while True:
      # A new processor is created for every run on the same job
      proc = jqueue._JobProcessor(self.queue, opexec, job,
                                  _timeout_strategy_factory=tsf)

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      # Remember the current opcode's status so _BeforeStart can tell
      # whether it changed (and hence the job must have been written)
      if self.curop is not None:
        self.prev_status = self.job.ops[self.curop].status

      self.lock_acq_prio = None

      result = proc(_nextop_fn=self._NextOpcode)
      assert self.curop is not None

      if result or self.gave_lock:
        # Got lock and/or job is done, result must've been written
        self.assertFalse(job.cur_opctx)
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
        self.assert_(job.ops[self.curop].exec_timestamp)

      if result:
        self.assertFalse(job.cur_opctx)
        break

      self.assertFalse(result)

      if self.curop == 0:
        self.assertEqual(job.ops[self.curop].start_timestamp,
                         job.start_timestamp)

      if self.gave_lock:
        # Opcode finished, but job not yet done
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      else:
        # Did not get locks; the opcode context is kept and still draws its
        # timeouts from the current fake strategy
        self.assert_(job.cur_opctx)
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
                         self.timeout_strategy.NextAttempt)
        self.assertFalse(job.ops[self.curop].exec_timestamp)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

        # If priority has changed since acquiring locks, the job must've been
        # updated
        if self.lock_acq_prio != job.ops[self.curop].priority:
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      self.assert_(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    # All opcodes were processed and opcode 3 got its locks early
    self.assertEqual(self.curop, len(job.ops) - 1)
    self.assertEqual(self.job, job)
    self.assertEqual(self.opcounter.next(), len(job.ops))
    self.assert_(self.done_lock_before_blocking)

    self.assertRaises(IndexError, self.queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
                            for op in job.ops))

    # Calling the processor on a finished job should be a no-op
    self.assertTrue(jqueue._JobProcessor(self.queue, opexec, job)())
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1413
1414
if __name__ == "__main__":
  # Run this module's tests through Ganeti's test program wrapper
  testutils.GanetiTestProgram()