ba56ae5686e80e14ecdc081376bb492ab060d183
[ganeti-local] / test / ganeti.jqueue_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.jqueue"""
23
24 import os
25 import sys
26 import unittest
27 import tempfile
28 import shutil
29 import errno
30 import itertools
31
32 from ganeti import constants
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import jqueue
36 from ganeti import opcodes
37 from ganeti import compat
38 from ganeti import mcpu
39
40 import testutils
41
42
43 class _FakeJob:
44   def __init__(self, job_id, status):
45     self.id = job_id
46     self._status = status
47     self._log = []
48
49   def SetStatus(self, status):
50     self._status = status
51
52   def AddLogEntry(self, msg):
53     self._log.append((len(self._log), msg))
54
55   def CalcStatus(self):
56     return self._status
57
58   def GetInfo(self, fields):
59     result = []
60
61     for name in fields:
62       if name == "status":
63         result.append(self._status)
64       else:
65         raise Exception("Unknown field")
66
67     return result
68
69   def GetLogEntries(self, newer_than):
70     assert newer_than is None or newer_than >= 0
71
72     if newer_than is None:
73       return self._log
74
75     return self._log[newer_than:]
76
77
class TestJobChangesChecker(unittest.TestCase):
  """Tests for L{jqueue._JobChangesChecker}."""

  def testStatus(self):
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"], None, None)

    # Without previous information every call reports the current status
    for status in [constants.JOB_STATUS_QUEUED,
                   constants.JOB_STATUS_RUNNING,
                   constants.JOB_STATUS_SUCCESS]:
      job.SetStatus(status)
      self.assertEqual(checker(job), ([status], []))

    # The checker reads job.id, make sure it's still the expected one
    self.assertEqual(job.id, 9094)

  def testStatusWithPrev(self):
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
    prev_info = [constants.JOB_STATUS_QUEUED]
    checker = jqueue._JobChangesChecker(["status"], prev_info, None)

    # Same status as seen before, hence no change
    self.assertTrue(checker(job) is None)

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

  def testFinalStatus(self):
    for final in constants.JOBS_FINALIZED:
      job = _FakeJob(2178711, final)
      checker = jqueue._JobChangesChecker(["status"], [final], None)
      # A finalized job won't change any more, so a change must be signalled
      # right away even though the status matches the previous one
      self.assertEqual(checker(job), ([final], []))

  def testLog(self):
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
    fresh = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(fresh(job), ([constants.JOB_STATUS_RUNNING], []))

    # A new log entry is reported (entries are returned as lists)
    job.AddLogEntry("Hello World")
    (info, entries) = fresh(job)
    self.assertEqual(info, [constants.JOB_STATUS_RUNNING])
    self.assertEqual(entries, [[0, "Hello World"]])

    # A checker primed with what was just seen doesn't report a change
    primed = jqueue._JobChangesChecker(["status"], info, len(entries))
    self.assertTrue(primed(job) is None)

    job.AddLogEntry("Foo Bar")
    job.SetStatus(constants.JOB_STATUS_ERROR)

    # Only the unseen entry is returned, together with the new status
    (info, entries) = primed(job)
    self.assertEqual(info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(entries, [[1, "Foo Bar"]])

    # Without previous information the whole log is returned
    unprimed = jqueue._JobChangesChecker(["status"], None, None)
    (info, entries) = unprimed(job)
    self.assertEqual(info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(entries, [[0, "Hello World"], [1, "Foo Bar"]])
134
135
class TestJobChangesWaiter(unittest.TestCase):
  """Tests for the file-based job change waiters."""

  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _EnsureNotifierClosed(self, notifier):
    """Asserts that the notifier's file descriptor has been closed."""
    try:
      os.fstat(notifier._fd)
    except EnvironmentError:
      # Exception fetched via sys.exc_info to avoid version-specific syntax
      exc = sys.exc_info()[1]
      self.assertEqual(exc.errno, errno.EBADF)
    else:
      self.fail("File descriptor wasn't closed")

  def testClose(self):
    for should_wait in (False, True):
      waiter = jqueue._JobFileChangesWaiter(self.filename)
      try:
        if should_wait:
          waiter.Wait(0.001)
      finally:
        waiter.Close()

      # Closing the waiter must also close the notifier's file descriptor
      self._EnsureNotifierClosed(waiter._notifier)

  def testChangingFile(self):
    waiter = jqueue._JobFileChangesWaiter(self.filename)
    try:
      # The file wasn't touched yet
      self.assertFalse(waiter.Wait(0.1))

      # Writing to the file must be noticed
      utils.WriteFile(self.filename, data="changed")
      self.assertTrue(waiter.Wait(60))
    finally:
      waiter.Close()

    self._EnsureNotifierClosed(waiter._notifier)

  def testChangingFile2(self):
    waiter = jqueue._JobChangesWaiter(self.filename)
    try:
      # The file waiter is only created by the first wait, which reports a
      # change
      self.assertFalse(waiter._filewaiter)
      self.assertTrue(waiter.Wait(0.1))
      self.assertTrue(waiter._filewaiter)

      # From now on the file waiter is used; nothing changed yet
      self.assertFalse(waiter.Wait(0.1))
      utils.WriteFile(self.filename, data="changed")
      self.assertTrue(waiter.Wait(60))
    finally:
      waiter.Close()

    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
191
192
class TestWaitForJobChangesHelper(unittest.TestCase):
  """Tests for L{jqueue._WaitForJobChangesHelper}."""

  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _LoadWaitingJob(self):
    """Job loader returning a job in the "waiting for locks" state."""
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)

  def _LoadLostJob(self):
    """Job loader simulating a job which can't be loaded."""
    return None

  def testNoChanges(self):
    helper = jqueue._WaitForJobChangesHelper()

    # Status is the same as previously seen, so nothing changed
    unchanged = helper(self.filename, self._LoadWaitingJob, ["status"],
                       [constants.JOB_STATUS_WAITLOCK], None, 0.1)
    self.assertEqual(unchanged, constants.JOB_NOTCHANGED)

    # Without previous information the current state is returned
    current = helper(self.filename, self._LoadWaitingJob, ["status"],
                     None, None, 1.0)
    self.assertEqual(current, ([constants.JOB_STATUS_WAITLOCK], []))

  def testLostJob(self):
    helper = jqueue._WaitForJobChangesHelper()
    result = helper(self.filename, self._LoadLostJob, ["status"],
                    None, None, 1.0)
    self.assertTrue(result is None)
225
226
class TestEncodeOpError(unittest.TestCase):
  """Tests for L{jqueue._EncodeOpError}."""

  def test(self):
    # (value to encode, exception expected when re-raising the encoded value);
    # Ganeti errors keep their class, anything else becomes an OpExecError
    cases = [
      (errors.LockError("Test 1"), errors.LockError),
      (errors.GenericError("Test 2"), errors.GenericError),
      (NotImplementedError("Foo"), errors.OpExecError),
      ("Hello World", errors.OpExecError),
      ]

    for (value, expected_cls) in cases:
      encoded = jqueue._EncodeOpError(value)
      self.assertTrue(isinstance(encoded, tuple))
      self.assertRaises(expected_cls, errors.MaybeRaise, encoded)
244
245
class TestQueuedOpCode(unittest.TestCase):
  """Tests for L{jqueue._QueuedOpCode}."""

  def _CheckRoundTrip(self, inpop, check_fn):
    """Wraps an opcode and checks it before and after serialization.

    @param inpop: opcode to wrap
    @param check_fn: callable run against the new and the restored object

    """
    original = jqueue._QueuedOpCode(inpop)
    check_fn(original)
    restored = jqueue._QueuedOpCode.Restore(original.Serialize())
    check_fn(restored)
    self.assertEqual(original.Serialize(), restored.Serialize())

  def testDefaults(self):
    def _Check(op):
      self.assertFalse(hasattr(op.input, "dry_run"))
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
      self.assertFalse(op.log)
      # Nothing has run yet, so none of these are set
      for value in (op.start_timestamp, op.exec_timestamp,
                    op.end_timestamp, op.result):
        self.assertTrue(value is None)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    self._CheckRoundTrip(opcodes.OpTestDelay(), _Check)

  def testPriority(self):
    assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
           "Default priority equals high priority; test can't work"

    def _Check(op):
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    self._CheckRoundTrip(opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH),
                         _Check)
277
278
class TestQueuedJob(unittest.TestCase):
  """Tests for L{jqueue._QueuedJob}."""

  def test(self):
    # A job without any opcodes must be rejected
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
                      None, 1, [])

  def testDefaults(self):
    """Checks a new job's defaults, also after a serialization round trip."""
    job_id = 4260
    ops = [
      opcodes.OpGetTags(),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.log_serial, 0)
      self.assert_(job.received_timestamp)
      self.assert_(job.start_timestamp is None)
      self.assert_(job.end_timestamp is None)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
      self.assert_(repr(job).startswith("<"))
      self.assertEqual(len(job.ops), len(ops))
      # The wrapped opcodes must carry the same state as the input opcodes
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
                              for (inp, op) in zip(ops, job.ops)))
      self.assertRaises(errors.OpExecError, job.GetInfo,
                        ["unknown-field"])
      self.assertEqual(job.GetInfo(["summary"]),
                       [[op.input.Summary() for op in job.ops]])

    job1 = jqueue._QueuedJob(None, job_id, ops)
    _Check(job1)
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
    _Check(job2)
    self.assertEqual(job1.Serialize(), job2.Serialize())

  def testPriority(self):
    """Checks how the job priority follows its opcodes' priorities.

    Lowering an opcode's priority value raises its priority; the job's
    priority is the lowest value among its unfinished opcodes.

    """
    job_id = 4283
    ops = [
      opcodes.OpGetTags(priority=constants.OP_PRIO_DEFAULT),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assert_(repr(job).startswith("<"))

    job = jqueue._QueuedJob(None, job_id, ops)
    _Check(job)
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                            for op in job.ops))
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase first
    job.ops[0].priority -= 1
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)

    # Mark opcode as finished; its priority is no longer taken into account
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase second
    job.ops[1].priority -= 10
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)

    # Test increasing first; once unfinished again its (now lowest) value wins
    job.ops[0].status = constants.OP_STATUS_RUNNING
    job.ops[0].priority -= 19
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)

  def testCalcStatus(self):
    """Checks the job status derived from various opcode status combinations.

    Each helper below puts the opcodes of a fresh ten-opcode job into a state
    which must result in the job status it is listed under in C{tests}.

    """
    def _Queued(ops):
      # The default status is "queued"
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
                              for op in ops))

    def _Waitlock1(ops):
      ops[0].status = constants.OP_STATUS_WAITLOCK

    def _Waitlock2(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      ops[2].status = constants.OP_STATUS_WAITLOCK

    def _Running(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_RUNNING
      for op in ops[2:]:
        op.status = constants.OP_STATUS_QUEUED

    def _Canceling1(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      for op in ops[2:]:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceling2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceled(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELED

    def _Error1(ops):
      # Opcodes 0-3 succeeded, all following ones failed
      for idx, op in enumerate(ops):
        if idx > 3:
          op.status = constants.OP_STATUS_ERROR
        else:
          op.status = constants.OP_STATUS_SUCCESS

    def _Error2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_ERROR

    def _Success(ops):
      for op in ops:
        op.status = constants.OP_STATUS_SUCCESS

    # Expected job status -> helpers producing it
    tests = {
      constants.JOB_STATUS_QUEUED: [_Queued],
      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
      constants.JOB_STATUS_RUNNING: [_Running],
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
      constants.JOB_STATUS_CANCELED: [_Canceled],
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
      constants.JOB_STATUS_SUCCESS: [_Success],
      }

    def _NewJob():
      job = jqueue._QueuedJob(None, 1,
                              [opcodes.OpTestDelay() for _ in range(10)])
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
                              for op in job.ops))
      return job

    # Every known job status must be covered by at least one helper
    for status in constants.JOB_STATUS_ALL:
      sttests = tests[status]
      assert sttests
      for fn in sttests:
        job = _NewJob()
        fn(job.ops)
        self.assertEqual(job.CalcStatus(), status)
425
426
427 class _FakeQueueForProc:
428   def __init__(self):
429     self._acquired = False
430
431   def IsAcquired(self):
432     return self._acquired
433
434   def acquire(self, shared=0):
435     assert shared == 1
436     self._acquired = True
437
438   def release(self):
439     assert self._acquired
440     self._acquired = False
441
442   def UpdateJobUnlocked(self, job, replicate=None):
443     # TODO: Ensure job is updated at the correct places
444     pass
445
446
class _FakeExecOpCodeForProc:
  """Fake opcode executor for the L{jqueue._JobProcessor} tests.

  Calls the optional C{before_start} hook with the C{timeout} argument,
  notifies the processor of the opcode's start and calls the optional
  C{after_start} hook. It then returns the opcode's C{result}, or raises
  L{errors.OpExecError} if the opcode's C{fail} attribute is set.

  """
  def __init__(self, before_start, after_start):
    self._before_start = before_start
    self._after_start = after_start

  def __call__(self, op, cbs, timeout=None):
    assert isinstance(op, opcodes.OpTestDummy)

    before_fn = self._before_start
    if before_fn:
      before_fn(timeout)

    cbs.NotifyStart()

    after_fn = self._after_start
    if after_fn:
      after_fn(op, cbs)

    if not op.fail:
      return op.result

    raise errors.OpExecError("Error requested (%s)" % op.result)
467
468
class _JobProcessorTestUtils:
  """Helpers shared by the job processor test cases."""

  def _CreateJob(self, queue, job_id, ops):
    """Creates a job and checks its freshly initialized state.

    @param queue: queue object passed to the job
    @param job_id: job ID
    @param ops: list of input opcodes
    @return: the new L{jqueue._QueuedJob}

    """
    job = jqueue._QueuedJob(queue, job_id, ops)
    self.assertFalse(job.start_timestamp)
    self.assertFalse(job.end_timestamp)
    self.assertEqual(len(ops), len(job.ops))
    # The input opcodes must be wrapped as given, in the same order
    self.assertTrue(compat.all(qop.input == inp
                               for (qop, inp) in zip(job.ops, ops)))
    expected_ops = [inp.__getstate__() for inp in ops]
    self.assertEqual(job.GetInfo(["ops"]), [expected_ops])
    return job
479
480
481 class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
482   def _GenericCheckJob(self, job):
483     assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
484                       for op in job.ops)
485
486     self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
487                      [[op.start_timestamp for op in job.ops],
488                       [op.exec_timestamp for op in job.ops],
489                       [op.end_timestamp for op in job.ops]])
490     self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
491                      [job.received_timestamp,
492                       job.start_timestamp,
493                       job.end_timestamp])
494     self.assert_(job.start_timestamp)
495     self.assert_(job.end_timestamp)
496     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
497
498   def testSuccess(self):
499     queue = _FakeQueueForProc()
500
501     for (job_id, opcount) in [(25351, 1), (6637, 3),
502                               (24644, 10), (32207, 100)]:
503       ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
504              for i in range(opcount)]
505
506       # Create job
507       job = self._CreateJob(queue, job_id, ops)
508
509       def _BeforeStart(_):
510         self.assertFalse(queue.IsAcquired())
511         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
512
513       def _AfterStart(op, cbs):
514         self.assertFalse(queue.IsAcquired())
515         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
516
517         # Job is running, cancelling shouldn't be possible
518         (success, _) = job.Cancel()
519         self.assertFalse(success)
520
521       opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
522
523       for idx in range(len(ops)):
524         result = jqueue._JobProcessor(queue, opexec, job)()
525         if idx == len(ops) - 1:
526           # Last opcode
527           self.assert_(result)
528         else:
529           self.assertFalse(result)
530
531           self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
532           self.assert_(job.start_timestamp)
533           self.assertFalse(job.end_timestamp)
534
535       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
536       self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
537       self.assertEqual(job.GetInfo(["opresult"]),
538                        [[op.input.result for op in job.ops]])
539       self.assertEqual(job.GetInfo(["opstatus"]),
540                        [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
541       self.assert_(compat.all(op.start_timestamp and op.end_timestamp
542                               for op in job.ops))
543
544       self._GenericCheckJob(job)
545
546       # Finished jobs can't be processed any further
547       self.assertRaises(errors.ProgrammerError,
548                         jqueue._JobProcessor(queue, opexec, job))
549
550   def testOpcodeError(self):
551     queue = _FakeQueueForProc()
552
553     testdata = [
554       (17077, 1, 0, 0),
555       (1782, 5, 2, 2),
556       (18179, 10, 9, 9),
557       (4744, 10, 3, 8),
558       (23816, 100, 39, 45),
559       ]
560
561     for (job_id, opcount, failfrom, failto) in testdata:
562       # Prepare opcodes
563       ops = [opcodes.OpTestDummy(result="Res%s" % i,
564                                  fail=(failfrom <= i and
565                                        i <= failto))
566              for i in range(opcount)]
567
568       # Create job
569       job = self._CreateJob(queue, job_id, ops)
570
571       opexec = _FakeExecOpCodeForProc(None, None)
572
573       for idx in range(len(ops)):
574         result = jqueue._JobProcessor(queue, opexec, job)()
575
576         if idx in (failfrom, len(ops) - 1):
577           # Last opcode
578           self.assert_(result)
579           break
580
581         self.assertFalse(result)
582
583         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
584
585       # Check job status
586       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
587       self.assertEqual(job.GetInfo(["id"]), [job_id])
588       self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
589
590       # Check opcode status
591       data = zip(job.ops,
592                  job.GetInfo(["opstatus"])[0],
593                  job.GetInfo(["opresult"])[0])
594
595       for idx, (op, opstatus, opresult) in enumerate(data):
596         if idx < failfrom:
597           assert not op.input.fail
598           self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
599           self.assertEqual(opresult, op.input.result)
600         elif idx <= failto:
601           assert op.input.fail
602           self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
603           self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
604         else:
605           assert not op.input.fail
606           self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
607           self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
608
609       self.assert_(compat.all(op.start_timestamp and op.end_timestamp
610                               for op in job.ops[:failfrom]))
611
612       self._GenericCheckJob(job)
613
614       # Finished jobs can't be processed any further
615       self.assertRaises(errors.ProgrammerError,
616                         jqueue._JobProcessor(queue, opexec, job))
617
618   def testCancelWhileInQueue(self):
619     queue = _FakeQueueForProc()
620
621     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
622            for i in range(5)]
623
624     # Create job
625     job_id = 17045
626     job = self._CreateJob(queue, job_id, ops)
627
628     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
629
630     # Mark as cancelled
631     (success, _) = job.Cancel()
632     self.assert_(success)
633
634     self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
635                             for op in job.ops))
636
637     opexec = _FakeExecOpCodeForProc(None, None)
638     jqueue._JobProcessor(queue, opexec, job)()
639
640     # Check result
641     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
642     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
643     self.assertFalse(job.start_timestamp)
644     self.assert_(job.end_timestamp)
645     self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
646                                 for op in job.ops))
647     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
648                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
649                       ["Job canceled by request" for _ in job.ops]])
650
651   def testCancelWhileWaitlock(self):
652     queue = _FakeQueueForProc()
653
654     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
655            for i in range(5)]
656
657     # Create job
658     job_id = 11009
659     job = self._CreateJob(queue, job_id, ops)
660
661     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
662
663     def _BeforeStart(_):
664       self.assertFalse(queue.IsAcquired())
665       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
666
667       # Mark as cancelled
668       (success, _) = job.Cancel()
669       self.assert_(success)
670
671       self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
672                               for op in job.ops))
673
674     def _AfterStart(op, cbs):
675       self.assertFalse(queue.IsAcquired())
676       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
677
678     opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
679
680     jqueue._JobProcessor(queue, opexec, job)()
681
682     # Check result
683     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
684     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
685     self.assert_(job.start_timestamp)
686     self.assert_(job.end_timestamp)
687     self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
688                                 for op in job.ops))
689     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
690                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
691                       ["Job canceled by request" for _ in job.ops]])
692
693   def testCancelWhileRunning(self):
694     # Tests canceling a job with finished opcodes and more, unprocessed ones
695     queue = _FakeQueueForProc()
696
697     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
698            for i in range(3)]
699
700     # Create job
701     job_id = 28492
702     job = self._CreateJob(queue, job_id, ops)
703
704     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
705
706     opexec = _FakeExecOpCodeForProc(None, None)
707
708     # Run one opcode
709     self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
710
711     # Job goes back to queued
712     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
713     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
714                      [[constants.OP_STATUS_SUCCESS,
715                        constants.OP_STATUS_QUEUED,
716                        constants.OP_STATUS_QUEUED],
717                       ["Res0", None, None]])
718
719     # Mark as cancelled
720     (success, _) = job.Cancel()
721     self.assert_(success)
722
723     # Try processing another opcode (this will actually cancel the job)
724     self.assert_(jqueue._JobProcessor(queue, opexec, job)())
725
726     # Check result
727     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
728     self.assertEqual(job.GetInfo(["id"]), [job_id])
729     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
730     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
731                      [[constants.OP_STATUS_SUCCESS,
732                        constants.OP_STATUS_CANCELED,
733                        constants.OP_STATUS_CANCELED],
734                       ["Res0", "Job canceled by request",
735                        "Job canceled by request"]])
736
737   def testPartiallyRun(self):
738     # Tests calling the processor on a job that's been partially run before the
739     # program was restarted
740     queue = _FakeQueueForProc()
741
742     opexec = _FakeExecOpCodeForProc(None, None)
743
744     for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
745       ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
746              for i in range(10)]
747
748       # Create job
749       job = self._CreateJob(queue, job_id, ops)
750
751       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
752
753       for _ in range(successcount):
754         self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
755
756       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
757       self.assertEqual(job.GetInfo(["opstatus"]),
758                        [[constants.OP_STATUS_SUCCESS
759                          for _ in range(successcount)] +
760                         [constants.OP_STATUS_QUEUED
761                          for _ in range(len(ops) - successcount)]])
762
763       self.assert_(job.ops_iter)
764
765       # Serialize and restore (simulates program restart)
766       newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
767       self.assertFalse(newjob.ops_iter)
768       self._TestPartial(newjob, successcount)
769
770   def _TestPartial(self, job, successcount):
771     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
772     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
773
774     queue = _FakeQueueForProc()
775     opexec = _FakeExecOpCodeForProc(None, None)
776
777     for remaining in reversed(range(len(job.ops) - successcount)):
778       result = jqueue._JobProcessor(queue, opexec, job)()
779
780       if remaining == 0:
781         # Last opcode
782         self.assert_(result)
783         break
784
785       self.assertFalse(result)
786
787       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
788
789     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
790     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
791     self.assertEqual(job.GetInfo(["opresult"]),
792                      [[op.input.result for op in job.ops]])
793     self.assertEqual(job.GetInfo(["opstatus"]),
794                      [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
795     self.assert_(compat.all(op.start_timestamp and op.end_timestamp
796                             for op in job.ops))
797
798     self._GenericCheckJob(job)
799
800     # Finished jobs can't be processed any further
801     self.assertRaises(errors.ProgrammerError,
802                       jqueue._JobProcessor(queue, opexec, job))
803
804     # ... also after being restored
805     job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
806     self.assertRaises(errors.ProgrammerError,
807                       jqueue._JobProcessor(queue, opexec, job2))
808
809   def testProcessorOnRunningJob(self):
810     ops = [opcodes.OpTestDummy(result="result", fail=False)]
811
812     queue = _FakeQueueForProc()
813     opexec = _FakeExecOpCodeForProc(None, None)
814
815     # Create job
816     job = self._CreateJob(queue, 9571, ops)
817
818     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
819
820     job.ops[0].status = constants.OP_STATUS_RUNNING
821
822     assert len(job.ops) == 1
823
824     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
825
826     # Calling on running job must fail
827     self.assertRaises(errors.ProgrammerError,
828                       jqueue._JobProcessor(queue, opexec, job))
829
830   def testLogMessages(self):
831     # Tests the "Feedback" callback function
832     queue = _FakeQueueForProc()
833
834     messages = {
835       1: [
836         (None, "Hello"),
837         (None, "World"),
838         (constants.ELOG_MESSAGE, "there"),
839         ],
840       4: [
841         (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
842         (constants.ELOG_JQUEUE_TEST, ("other", "type")),
843         ],
844       }
845     ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
846                                messages=messages.get(i, []))
847            for i in range(5)]
848
849     # Create job
850     job = self._CreateJob(queue, 29386, ops)
851
852     def _BeforeStart(_):
853       self.assertFalse(queue.IsAcquired())
854       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
855
856     def _AfterStart(op, cbs):
857       self.assertFalse(queue.IsAcquired())
858       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
859
860       self.assertRaises(AssertionError, cbs.Feedback,
861                         "too", "many", "arguments")
862
863       for (log_type, msg) in op.messages:
864         if log_type:
865           cbs.Feedback(log_type, msg)
866         else:
867           cbs.Feedback(msg)
868
869     opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
870
871     for remaining in reversed(range(len(job.ops))):
872       result = jqueue._JobProcessor(queue, opexec, job)()
873
874       if remaining == 0:
875         # Last opcode
876         self.assert_(result)
877         break
878
879       self.assertFalse(result)
880
881       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
882
883     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
884     self.assertEqual(job.GetInfo(["opresult"]),
885                      [[op.input.result for op in job.ops]])
886
887     logmsgcount = sum(len(m) for m in messages.values())
888
889     self._CheckLogMessages(job, logmsgcount)
890
891     # Serialize and restore (simulates program restart)
892     newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
893     self._CheckLogMessages(newjob, logmsgcount)
894
895     # Check each message
896     prevserial = -1
897     for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
898       for (serial, timestamp, log_type, msg) in oplog:
899         (exptype, expmsg) = messages.get(idx).pop(0)
900         if exptype:
901           self.assertEqual(log_type, exptype)
902         else:
903           self.assertEqual(log_type, constants.ELOG_MESSAGE)
904         self.assertEqual(expmsg, msg)
905         self.assert_(serial > prevserial)
906         prevserial = serial
907
908   def _CheckLogMessages(self, job, count):
909     # Check serial
910     self.assertEqual(job.log_serial, count)
911
912     # No filter
913     self.assertEqual(job.GetLogEntries(None),
914                      [entry for entries in job.GetInfo(["oplog"])[0] if entries
915                       for entry in entries])
916
917     # Filter with serial
918     assert count > 3
919     self.assert_(job.GetLogEntries(3))
920     self.assertEqual(job.GetLogEntries(3),
921                      [entry for entries in job.GetInfo(["oplog"])[0] if entries
922                       for entry in entries][3:])
923
924     # No log message after highest serial
925     self.assertFalse(job.GetLogEntries(count))
926     self.assertFalse(job.GetLogEntries(count + 3))
927
928
929 class _FakeTimeoutStrategy:
930   def __init__(self, timeouts):
931     self.timeouts = timeouts
932     self.attempts = 0
933     self.last_timeout = None
934
935   def NextAttempt(self):
936     self.attempts += 1
937     if self.timeouts:
938       timeout = self.timeouts.pop(0)
939     else:
940       timeout = None
941     self.last_timeout = timeout
942     return timeout
943
944
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
  """Tests L{jqueue._JobProcessor} with lock acquire timeouts.

  A fake timeout strategy and a fake opcode executor are used to simulate
  failed lock acquisitions (L{mcpu.LockAcquireTimeout}) and the resulting
  priority changes for the opcodes of a single job.

  """
  def setUp(self):
    self.queue = _FakeQueueForProc()
    # Job under test, set by testTimeout
    self.job = None
    # Index of the opcode currently being processed (see _NextOpcode)
    self.curop = None
    # Iterator yielding opcode indices
    self.opcounter = None
    # Most recent strategy returned by _NewTimeoutStrategy
    self.timeout_strategy = None
    # Number of LockAcquireTimeout exceptions still to be raised
    self.retries = 0
    # Opcode index for which the last timeout strategy was created
    self.prev_tsop = None
    # Priority of the current opcode when last recorded
    self.prev_prio = None
    # Whether the last _BeforeStart call let the opcode proceed
    self.gave_lock = None
    # Whether opcode 3 received its locks before a blocking acquire
    self.done_lock_before_blocking = False

  def _BeforeStart(self, timeout):
    """Before-start hook for L{_FakeExecOpCodeForProc}.

    Simulates lock acquisition: raises L{mcpu.LockAcquireTimeout} as long as
    retries remain, otherwise lets the opcode proceed.

    @param timeout: lock acquire timeout given to the opcode; must match the
      value last returned by the fake timeout strategy

    """
    job = self.job

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

    ts = self.timeout_strategy

    self.assert_(timeout is None or isinstance(timeout, (int, float)))
    self.assertEqual(timeout, ts.last_timeout)

    self.gave_lock = True

    if (self.curop == 3 and
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
      # Give locks before running into blocking acquire; 7 equals
      # len(range(12, 100, 14)), i.e. no retry has been used yet with the
      # current timeout strategy
      assert self.retries == 7
      self.retries = 0
      self.done_lock_before_blocking = True
      return

    if self.retries > 0:
      # Simulate a timed lock acquisition which failed
      self.assert_(timeout is not None)
      self.retries -= 1
      self.gave_lock = False
      raise mcpu.LockAcquireTimeout()

    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
      assert not ts.timeouts
      self.assert_(timeout is None)

  def _AfterStart(self, op, cbs):
    """After-start hook for L{_FakeExecOpCodeForProc}.

    The job must be running and can no longer be cancelled.

    """
    job = self.job

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Job is running, cancelling shouldn't be possible
    (success, _) = job.Cancel()
    self.assertFalse(success)

  def _NextOpcode(self):
    """Moves on to the next opcode (passed to the processor as C{_nextop_fn}).

    """
    self.curop = self.opcounter.next()
    self.prev_prio = self.job.ops[self.curop].priority

  def _NewTimeoutStrategy(self):
    """Timeout strategy factory (C{_timeout_strategy_factory}).

    Sets up the timeouts and the number of simulated lock acquire failures
    depending on the current opcode.

    @rtype: L{_FakeTimeoutStrategy}

    """
    job = self.job

    # All simulated failures of the previous strategy must have been used
    self.assertEqual(self.retries, 0)

    if self.prev_tsop == self.curop:
      # Still on the same opcode, priority must've been increased (the
      # priority value decreases by one)
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)

    if self.curop == 1:
      # Normal retry
      timeouts = range(10, 31, 10)
      self.retries = len(timeouts) - 1

    elif self.curop == 2:
      # Let this run into a blocking acquire
      timeouts = range(11, 61, 12)
      self.retries = len(timeouts)

    elif self.curop == 3:
      # Wait for priority to increase, but give lock before blocking acquire
      timeouts = range(12, 100, 14)
      self.retries = len(timeouts)

      self.assertFalse(self.done_lock_before_blocking)

    elif self.curop == 4:
      self.assert_(self.done_lock_before_blocking)

      # Timeouts, but no need to retry
      timeouts = range(10, 31, 10)
      self.retries = 0

    elif self.curop == 5:
      # Normal retry
      timeouts = range(19, 100, 11)
      self.retries = len(timeouts)

    else:
      timeouts = []
      self.retries = 0

    assert len(job.ops) == 10
    assert self.retries <= len(timeouts)

    ts = _FakeTimeoutStrategy(timeouts)

    self.timeout_strategy = ts
    self.prev_tsop = self.curop
    self.prev_prio = job.ops[self.curop].priority

    return ts

  def testTimeout(self):
    """Runs a ten-opcode job to completion under simulated lock timeouts.

    """
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(10)]

    # Create job
    job_id = 15801
    job = self._CreateJob(self.queue, job_id, ops)
    self.job = job

    self.opcounter = itertools.count(0)

    opexec = _FakeExecOpCodeForProc(self._BeforeStart, self._AfterStart)
    tsf = self._NewTimeoutStrategy

    self.assertFalse(self.done_lock_before_blocking)

    # Call the processor until it reports the job as finished
    while True:
      proc = jqueue._JobProcessor(self.queue, opexec, job,
                                  _timeout_strategy_factory=tsf)

      result = proc(_nextop_fn=self._NextOpcode)
      if result:
        self.assertFalse(job.cur_opctx)
        break

      self.assertFalse(result)

      if self.gave_lock:
        self.assertFalse(job.cur_opctx)
      else:
        # After a lock acquire timeout the opcode context is kept and must
        # still use the strategy handed out by _NewTimeoutStrategy
        self.assert_(job.cur_opctx)
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
                         self.timeout_strategy.NextAttempt)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assert_(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    self.assertEqual(self.curop, len(job.ops) - 1)
    self.assertEqual(self.job, job)
    self.assertEqual(self.opcounter.next(), len(job.ops))
    self.assert_(self.done_lock_before_blocking)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
                            for op in job.ops))

    # Finished jobs can't be processed any further
    self.assertRaises(errors.ProgrammerError,
                      jqueue._JobProcessor(self.queue, opexec, job))
1112
1113
if __name__ == "__main__":
  # Run the test cases of this module through Ganeti's test program wrapper
  testutils.GanetiTestProgram()