3a2f0ed699cf61453608aa7139c1ae9e6459a26b
[ganeti-local] / test / ganeti.jqueue_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.jqueue"""
23
24 import os
25 import sys
26 import unittest
27 import tempfile
28 import shutil
29 import errno
30 import itertools
31
32 from ganeti import constants
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import jqueue
36 from ganeti import opcodes
37 from ganeti import compat
38 from ganeti import mcpu
39
40 import testutils
41
42
class _FakeJob:
  """Minimal in-memory job stand-in holding a status and a log.

  Only the "status" field is known to GetInfo.

  """
  def __init__(self, job_id, status):
    self.id = job_id
    self._status = status
    self._log = []

  def SetStatus(self, status):
    """Replaces the stored status.

    """
    self._status = status

  def AddLogEntry(self, msg):
    """Appends a (serial, msg) tuple; the serial is the current log length.

    """
    serial = len(self._log)
    self._log.append((serial, msg))

  def CalcStatus(self):
    """Returns the stored status.

    """
    return self._status

  def GetInfo(self, fields):
    """Returns a list with one value per requested field.

    Raises Exception("Unknown field") for anything other than "status".

    """
    values = []

    for field in fields:
      if field != "status":
        raise Exception("Unknown field")
      values.append(self._status)

    return values

  def GetLogEntries(self, newer_than):
    """Returns the log, or its tail starting at index newer_than.

    With None the internal list itself (not a copy) is returned.

    """
    assert newer_than is None or newer_than >= 0

    if newer_than is not None:
      return self._log[newer_than:]

    return self._log
76
77
class TestJobChangesChecker(unittest.TestCase):
  """Tests for jqueue._JobChangesChecker, driven by _FakeJob objects.

  """
  def testStatus(self):
    # Without previous job information every call reports the current status
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.SetStatus(constants.JOB_STATUS_SUCCESS)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))

    # job.id is used by checker
    self.assertEqual(job.id, 9094)

  def testStatusWithPrev(self):
    # The previous information matches the job, hence nothing is reported
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"],
                                        [constants.JOB_STATUS_QUEUED], None)
    self.assertTrue(checker(job) is None)

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

  def testFinalStatus(self):
    for status in constants.JOBS_FINALIZED:
      job = _FakeJob(2178711, status)
      checker = jqueue._JobChangesChecker(["status"], [status], None)
      # There won't be any changes in this status, hence it should signal
      # a change immediately
      self.assertEqual(checker(job), ([status], []))

  def testLog(self):
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.AddLogEntry("Hello World")
    (job_info, log_entries) = checker(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
    self.assertEqual(log_entries, [[0, "Hello World"]])

    # A checker primed with the state just seen must not report anything
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
    self.assertTrue(checker2(job) is None)

    job.AddLogEntry("Foo Bar")
    job.SetStatus(constants.JOB_STATUS_ERROR)

    # Only the log entry added after the known serial is returned
    (job_info, log_entries) = checker2(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[1, "Foo Bar"]])

    # A checker without previous information gets the whole log
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
    (job_info, log_entries) = checker3(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
134
135
136 class TestJobChangesWaiter(unittest.TestCase):
137   def setUp(self):
138     self.tmpdir = tempfile.mkdtemp()
139     self.filename = utils.PathJoin(self.tmpdir, "job-1")
140     utils.WriteFile(self.filename, data="")
141
142   def tearDown(self):
143     shutil.rmtree(self.tmpdir)
144
145   def _EnsureNotifierClosed(self, notifier):
146     try:
147       os.fstat(notifier._fd)
148     except EnvironmentError, err:
149       self.assertEqual(err.errno, errno.EBADF)
150     else:
151       self.fail("File descriptor wasn't closed")
152
153   def testClose(self):
154     for wait in [False, True]:
155       waiter = jqueue._JobFileChangesWaiter(self.filename)
156       try:
157         if wait:
158           waiter.Wait(0.001)
159       finally:
160         waiter.Close()
161
162       # Ensure file descriptor was closed
163       self._EnsureNotifierClosed(waiter._notifier)
164
165   def testChangingFile(self):
166     waiter = jqueue._JobFileChangesWaiter(self.filename)
167     try:
168       self.assertFalse(waiter.Wait(0.1))
169       utils.WriteFile(self.filename, data="changed")
170       self.assert_(waiter.Wait(60))
171     finally:
172       waiter.Close()
173
174     self._EnsureNotifierClosed(waiter._notifier)
175
176   def testChangingFile2(self):
177     waiter = jqueue._JobChangesWaiter(self.filename)
178     try:
179       self.assertFalse(waiter._filewaiter)
180       self.assert_(waiter.Wait(0.1))
181       self.assert_(waiter._filewaiter)
182
183       # File waiter is now used, but there have been no changes
184       self.assertFalse(waiter.Wait(0.1))
185       utils.WriteFile(self.filename, data="changed")
186       self.assert_(waiter.Wait(60))
187     finally:
188       waiter.Close()
189
190     self._EnsureNotifierClosed(waiter._filewaiter._notifier)
191
192
class TestWaitForJobChangesHelper(unittest.TestCase):
  """Tests for jqueue._WaitForJobChangesHelper.

  Every test works on an empty job file in a fresh temporary directory; the
  job itself is supplied by the _Load* callbacks below.

  """
  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _LoadWaitingJob(self):
    """Job loader returning a fake job in "waitlock" status.

    """
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)

  def _LoadLostJob(self):
    """Job loader returning None instead of a job.

    """
    return None

  def testNoChanges(self):
    wfjc = jqueue._WaitForJobChangesHelper()

    # No change
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
                     constants.JOB_NOTCHANGED)

    # No previous information
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
                          ["status"], None, None, 1.0),
                     ([constants.JOB_STATUS_WAITLOCK], []))

  def testLostJob(self):
    # A loader returning no job makes the helper return None
    wfjc = jqueue._WaitForJobChangesHelper()
    self.assertTrue(wfjc(self.filename, self._LoadLostJob,
                         ["status"], None, None, 1.0) is None)
225
226
class TestEncodeOpError(unittest.TestCase):
  """Tests for jqueue._EncodeOpError.

  """
  def test(self):
    # Ganeti errors keep their class when re-raised via errors.MaybeRaise
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
    self.assertTrue(isinstance(encerr, tuple))
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)

    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
    self.assertTrue(isinstance(encerr, tuple))
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)

    # Anything else is expected to come back as errors.OpExecError
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
    self.assertTrue(isinstance(encerr, tuple))
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)

    encerr = jqueue._EncodeOpError("Hello World")
    self.assertTrue(isinstance(encerr, tuple))
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
244
245
class TestQueuedOpCode(unittest.TestCase):
  """Tests for jqueue._QueuedOpCode, including a Serialize/Restore round trip.

  """
  def testDefaults(self):
    def _Check(op):
      self.assertFalse(hasattr(op.input, "dry_run"))
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
      self.assertFalse(op.log)
      self.assertTrue(op.start_timestamp is None)
      self.assertTrue(op.exec_timestamp is None)
      self.assertTrue(op.end_timestamp is None)
      self.assertTrue(op.result is None)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
    _Check(op1)
    # The restored copy must satisfy the same checks and serialize identically
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())

  def testPriority(self):
    def _Check(op):
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
             "Default priority equals high priority; test can't work"
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    # The priority given to the input opcode is taken over by the queued one
    inpop = opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH)
    op1 = jqueue._QueuedOpCode(inpop)
    _Check(op1)
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())
277
278
class TestQueuedJob(unittest.TestCase):
  """Tests for jqueue._QueuedJob: defaults, priority and status calculation.

  """
  def test(self):
    # A job without opcodes is rejected
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
                      None, 1, [])

  def testDefaults(self):
    job_id = 4260
    ops = [
      opcodes.OpGetTags(),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.log_serial, 0)
      self.assertTrue(job.received_timestamp)
      self.assertTrue(job.start_timestamp is None)
      self.assertTrue(job.end_timestamp is None)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
      self.assertTrue(repr(job).startswith("<"))
      self.assertEqual(len(job.ops), len(ops))
      self.assertTrue(compat.all(inp.__getstate__() == op.input.__getstate__()
                                 for (inp, op) in zip(ops, job.ops)))
      self.assertRaises(errors.OpExecError, job.GetInfo,
                        ["unknown-field"])
      self.assertEqual(job.GetInfo(["summary"]),
                       [[op.input.Summary() for op in job.ops]])

    job1 = jqueue._QueuedJob(None, job_id, ops)
    _Check(job1)
    # The restored copy must satisfy the same checks and serialize identically
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
    _Check(job2)
    self.assertEqual(job1.Serialize(), job2.Serialize())

  def testPriority(self):
    job_id = 4283
    ops = [
      opcodes.OpGetTags(priority=constants.OP_PRIO_DEFAULT),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(repr(job).startswith("<"))

    job = jqueue._QueuedJob(None, job_id, ops)
    _Check(job)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase first
    job.ops[0].priority -= 1
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)

    # Mark opcode as finished
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase second
    job.ops[1].priority -= 10
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)

    # Test increasing first
    job.ops[0].status = constants.OP_STATUS_RUNNING
    job.ops[0].priority -= 19
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)

  def testCalcStatus(self):
    # Each helper below puts the opcodes of a fresh job into a state for which
    # CalcStatus must return the job status it is registered under in "tests"
    def _Queued(ops):
      # The default status is "queued"
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in ops))

    def _Waitlock1(ops):
      ops[0].status = constants.OP_STATUS_WAITLOCK

    def _Waitlock2(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      ops[2].status = constants.OP_STATUS_WAITLOCK

    def _Running(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_RUNNING
      for op in ops[2:]:
        op.status = constants.OP_STATUS_QUEUED

    def _Canceling1(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      for op in ops[2:]:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceling2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceled(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELED

    def _Error1(ops):
      for idx, op in enumerate(ops):
        if idx > 3:
          op.status = constants.OP_STATUS_ERROR
        else:
          op.status = constants.OP_STATUS_SUCCESS

    def _Error2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_ERROR

    def _Success(ops):
      for op in ops:
        op.status = constants.OP_STATUS_SUCCESS

    tests = {
      constants.JOB_STATUS_QUEUED: [_Queued],
      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
      constants.JOB_STATUS_RUNNING: [_Running],
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
      constants.JOB_STATUS_CANCELED: [_Canceled],
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
      constants.JOB_STATUS_SUCCESS: [_Success],
      }

    def _NewJob():
      job = jqueue._QueuedJob(None, 1,
                              [opcodes.OpTestDelay() for _ in range(10)])
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in job.ops))
      return job

    # Looking up every known status ensures none is left without a test
    for status in constants.JOB_STATUS_ALL:
      sttests = tests[status]
      assert sttests
      for fn in sttests:
        job = _NewJob()
        fn(job.ops)
        self.assertEqual(job.CalcStatus(), status)
425
426
class _FakeQueueForProc:
  """Fake queue which records job updates and tracks a lock flag.

  """
  def __init__(self):
    self._updates = []
    self._acquired = False

  def IsAcquired(self):
    """Returns whether acquire was called without a matching release.

    """
    return self._acquired

  def GetNextUpdate(self):
    """Removes and returns the oldest recorded (job, replicate) tuple.

    Raises IndexError if no update is pending.

    """
    return self._updates.pop(0)

  def acquire(self, shared=0):
    """Marks the queue as locked; only shared acquisition is accepted.

    """
    assert shared == 1
    self._acquired = True

  def release(self):
    """Marks the queue as unlocked; it must be locked when called.

    """
    assert self._acquired
    self._acquired = False

  def UpdateJobUnlocked(self, job, replicate=True):
    """Records an update; the queue must be locked when called.

    """
    assert self._acquired, "Lock not acquired while updating job"
    update = (job, bool(replicate))
    self._updates.append(update)
449
450
class _FakeExecOpCodeForProc:
  """Fake opcode executor for opcodes.OpTestDummy opcodes.

  The optional callbacks are invoked around cbs.NotifyStart(); the opcode's
  own "fail" and "result" attributes determine the outcome.

  """
  def __init__(self, queue, before_start, after_start):
    self._queue = queue
    self._before_start = before_start
    self._after_start = after_start

  def __call__(self, op, cbs, timeout=None, priority=None):
    assert isinstance(op, opcodes.OpTestDummy)
    assert not self._queue.IsAcquired(), \
           "Queue lock not released when executing opcode"

    # Hook run before the start notification
    if self._before_start:
      self._before_start(timeout, priority)

    cbs.NotifyStart()

    # Hook run after the start notification
    if self._after_start:
      self._after_start(op, cbs)

    # Check again after the callbacks
    assert not self._queue.IsAcquired()

    if not op.fail:
      return op.result

    raise errors.OpExecError("Error requested (%s)" % op.result)
477
478
class _JobProcessorTestUtils:
  """Mix-in with helpers for job processor tests.

  Relies on the assertion methods of the unittest.TestCase it is combined
  with.

  """
  def _CreateJob(self, queue, job_id, ops):
    """Creates a jqueue._QueuedJob and verifies its initial state.

    Returns the new job.

    """
    job = jqueue._QueuedJob(queue, job_id, ops)
    self.assertFalse(job.start_timestamp)
    self.assertFalse(job.end_timestamp)
    self.assertEqual(len(ops), len(job.ops))
    self.assertTrue(compat.all(op.input == inp
                               for (op, inp) in zip(job.ops, ops)))
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
    return job
489
490
491 class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
492   def _GenericCheckJob(self, job):
493     assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
494                       for op in job.ops)
495
496     self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
497                      [[op.start_timestamp for op in job.ops],
498                       [op.exec_timestamp for op in job.ops],
499                       [op.end_timestamp for op in job.ops]])
500     self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
501                      [job.received_timestamp,
502                       job.start_timestamp,
503                       job.end_timestamp])
504     self.assert_(job.start_timestamp)
505     self.assert_(job.end_timestamp)
506     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
507
508   def testSuccess(self):
509     queue = _FakeQueueForProc()
510
511     for (job_id, opcount) in [(25351, 1), (6637, 3),
512                               (24644, 10), (32207, 100)]:
513       ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
514              for i in range(opcount)]
515
516       # Create job
517       job = self._CreateJob(queue, job_id, ops)
518
519       def _BeforeStart(timeout, priority):
520         self.assertEqual(queue.GetNextUpdate(), (job, True))
521         self.assertRaises(IndexError, queue.GetNextUpdate)
522         self.assertFalse(queue.IsAcquired())
523         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
524
525       def _AfterStart(op, cbs):
526         self.assertEqual(queue.GetNextUpdate(), (job, True))
527         self.assertRaises(IndexError, queue.GetNextUpdate)
528
529         self.assertFalse(queue.IsAcquired())
530         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
531
532         # Job is running, cancelling shouldn't be possible
533         (success, _) = job.Cancel()
534         self.assertFalse(success)
535
536       opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
537
538       for idx in range(len(ops)):
539         self.assertRaises(IndexError, queue.GetNextUpdate)
540         result = jqueue._JobProcessor(queue, opexec, job)()
541         self.assertEqual(queue.GetNextUpdate(), (job, True))
542         self.assertRaises(IndexError, queue.GetNextUpdate)
543         if idx == len(ops) - 1:
544           # Last opcode
545           self.assert_(result)
546         else:
547           self.assertFalse(result)
548
549           self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
550           self.assert_(job.start_timestamp)
551           self.assertFalse(job.end_timestamp)
552
553       self.assertRaises(IndexError, queue.GetNextUpdate)
554
555       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
556       self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
557       self.assertEqual(job.GetInfo(["opresult"]),
558                        [[op.input.result for op in job.ops]])
559       self.assertEqual(job.GetInfo(["opstatus"]),
560                        [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
561       self.assert_(compat.all(op.start_timestamp and op.end_timestamp
562                               for op in job.ops))
563
564       self._GenericCheckJob(job)
565
566       # Finished jobs can't be processed any further
567       self.assertRaises(errors.ProgrammerError,
568                         jqueue._JobProcessor(queue, opexec, job))
569
570   def testOpcodeError(self):
571     queue = _FakeQueueForProc()
572
573     testdata = [
574       (17077, 1, 0, 0),
575       (1782, 5, 2, 2),
576       (18179, 10, 9, 9),
577       (4744, 10, 3, 8),
578       (23816, 100, 39, 45),
579       ]
580
581     for (job_id, opcount, failfrom, failto) in testdata:
582       # Prepare opcodes
583       ops = [opcodes.OpTestDummy(result="Res%s" % i,
584                                  fail=(failfrom <= i and
585                                        i <= failto))
586              for i in range(opcount)]
587
588       # Create job
589       job = self._CreateJob(queue, job_id, ops)
590
591       opexec = _FakeExecOpCodeForProc(queue, None, None)
592
593       for idx in range(len(ops)):
594         self.assertRaises(IndexError, queue.GetNextUpdate)
595         result = jqueue._JobProcessor(queue, opexec, job)()
596         # queued to waitlock
597         self.assertEqual(queue.GetNextUpdate(), (job, True))
598         # waitlock to running
599         self.assertEqual(queue.GetNextUpdate(), (job, True))
600         # Opcode result
601         self.assertEqual(queue.GetNextUpdate(), (job, True))
602         self.assertRaises(IndexError, queue.GetNextUpdate)
603
604         if idx in (failfrom, len(ops) - 1):
605           # Last opcode
606           self.assert_(result)
607           break
608
609         self.assertFalse(result)
610
611         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
612
613       self.assertRaises(IndexError, queue.GetNextUpdate)
614
615       # Check job status
616       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
617       self.assertEqual(job.GetInfo(["id"]), [job_id])
618       self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
619
620       # Check opcode status
621       data = zip(job.ops,
622                  job.GetInfo(["opstatus"])[0],
623                  job.GetInfo(["opresult"])[0])
624
625       for idx, (op, opstatus, opresult) in enumerate(data):
626         if idx < failfrom:
627           assert not op.input.fail
628           self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
629           self.assertEqual(opresult, op.input.result)
630         elif idx <= failto:
631           assert op.input.fail
632           self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
633           self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
634         else:
635           assert not op.input.fail
636           self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
637           self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
638
639       self.assert_(compat.all(op.start_timestamp and op.end_timestamp
640                               for op in job.ops[:failfrom]))
641
642       self._GenericCheckJob(job)
643
644       # Finished jobs can't be processed any further
645       self.assertRaises(errors.ProgrammerError,
646                         jqueue._JobProcessor(queue, opexec, job))
647
648   def testCancelWhileInQueue(self):
649     queue = _FakeQueueForProc()
650
651     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
652            for i in range(5)]
653
654     # Create job
655     job_id = 17045
656     job = self._CreateJob(queue, job_id, ops)
657
658     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
659
660     # Mark as cancelled
661     (success, _) = job.Cancel()
662     self.assert_(success)
663
664     self.assertRaises(IndexError, queue.GetNextUpdate)
665
666     self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
667                             for op in job.ops))
668
669     opexec = _FakeExecOpCodeForProc(queue, None, None)
670     jqueue._JobProcessor(queue, opexec, job)()
671
672     # Check result
673     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
674     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
675     self.assertFalse(job.start_timestamp)
676     self.assert_(job.end_timestamp)
677     self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
678                                 for op in job.ops))
679     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
680                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
681                       ["Job canceled by request" for _ in job.ops]])
682
683   def testCancelWhileWaitlock(self):
684     queue = _FakeQueueForProc()
685
686     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
687            for i in range(5)]
688
689     # Create job
690     job_id = 11009
691     job = self._CreateJob(queue, job_id, ops)
692
693     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
694
695     def _BeforeStart(timeout, priority):
696       self.assertEqual(queue.GetNextUpdate(), (job, True))
697       self.assertRaises(IndexError, queue.GetNextUpdate)
698       self.assertFalse(queue.IsAcquired())
699       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
700
701       # Mark as cancelled
702       (success, _) = job.Cancel()
703       self.assert_(success)
704
705       self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
706                               for op in job.ops))
707       self.assertRaises(IndexError, queue.GetNextUpdate)
708
709     def _AfterStart(op, cbs):
710       self.assertEqual(queue.GetNextUpdate(), (job, True))
711       self.assertRaises(IndexError, queue.GetNextUpdate)
712       self.assertFalse(queue.IsAcquired())
713       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
714
715     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
716
717     self.assertRaises(IndexError, queue.GetNextUpdate)
718     self.assert_(jqueue._JobProcessor(queue, opexec, job)())
719     self.assertEqual(queue.GetNextUpdate(), (job, True))
720     self.assertRaises(IndexError, queue.GetNextUpdate)
721
722     # Check result
723     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
724     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
725     self.assert_(job.start_timestamp)
726     self.assert_(job.end_timestamp)
727     self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
728                                 for op in job.ops))
729     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
730                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
731                       ["Job canceled by request" for _ in job.ops]])
732
733   def testCancelWhileWaitlockWithTimeout(self):
734     queue = _FakeQueueForProc()
735
736     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
737            for i in range(5)]
738
739     # Create job
740     job_id = 24314
741     job = self._CreateJob(queue, job_id, ops)
742
743     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
744
745     def _BeforeStart(timeout, priority):
746       self.assertFalse(queue.IsAcquired())
747       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
748
749       # Mark as cancelled
750       (success, _) = job.Cancel()
751       self.assert_(success)
752
753       self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
754                               for op in job.ops))
755
756       # Fake an acquire attempt timing out
757       raise mcpu.LockAcquireTimeout()
758
759     def _AfterStart(op, cbs):
760       self.fail("Should not reach this")
761
762     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
763
764     self.assert_(jqueue._JobProcessor(queue, opexec, job)())
765
766     # Check result
767     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
768     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
769     self.assert_(job.start_timestamp)
770     self.assert_(job.end_timestamp)
771     self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
772                                 for op in job.ops))
773     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
774                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
775                       ["Job canceled by request" for _ in job.ops]])
776
777   def testCancelWhileRunning(self):
778     # Tests canceling a job with finished opcodes and more, unprocessed ones
779     queue = _FakeQueueForProc()
780
781     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
782            for i in range(3)]
783
784     # Create job
785     job_id = 28492
786     job = self._CreateJob(queue, job_id, ops)
787
788     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
789
790     opexec = _FakeExecOpCodeForProc(queue, None, None)
791
792     # Run one opcode
793     self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
794
795     # Job goes back to queued
796     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
797     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
798                      [[constants.OP_STATUS_SUCCESS,
799                        constants.OP_STATUS_QUEUED,
800                        constants.OP_STATUS_QUEUED],
801                       ["Res0", None, None]])
802
803     # Mark as cancelled
804     (success, _) = job.Cancel()
805     self.assert_(success)
806
807     # Try processing another opcode (this will actually cancel the job)
808     self.assert_(jqueue._JobProcessor(queue, opexec, job)())
809
810     # Check result
811     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
812     self.assertEqual(job.GetInfo(["id"]), [job_id])
813     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
814     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
815                      [[constants.OP_STATUS_SUCCESS,
816                        constants.OP_STATUS_CANCELED,
817                        constants.OP_STATUS_CANCELED],
818                       ["Res0", "Job canceled by request",
819                        "Job canceled by request"]])
820
821   def testPartiallyRun(self):
822     # Tests calling the processor on a job that's been partially run before the
823     # program was restarted
824     queue = _FakeQueueForProc()
825
826     opexec = _FakeExecOpCodeForProc(queue, None, None)
827
828     for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
829       ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
830              for i in range(10)]
831
832       # Create job
833       job = self._CreateJob(queue, job_id, ops)
834
835       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
836
837       for _ in range(successcount):
838         self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
839
840       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
841       self.assertEqual(job.GetInfo(["opstatus"]),
842                        [[constants.OP_STATUS_SUCCESS
843                          for _ in range(successcount)] +
844                         [constants.OP_STATUS_QUEUED
845                          for _ in range(len(ops) - successcount)]])
846
847       self.assert_(job.ops_iter)
848
849       # Serialize and restore (simulates program restart)
850       newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
851       self.assertFalse(newjob.ops_iter)
852       self._TestPartial(newjob, successcount)
853
854   def _TestPartial(self, job, successcount):
855     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
856     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
857
858     queue = _FakeQueueForProc()
859     opexec = _FakeExecOpCodeForProc(queue, None, None)
860
861     for remaining in reversed(range(len(job.ops) - successcount)):
862       result = jqueue._JobProcessor(queue, opexec, job)()
863
864       if remaining == 0:
865         # Last opcode
866         self.assert_(result)
867         break
868
869       self.assertFalse(result)
870
871       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
872
873     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
874     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
875     self.assertEqual(job.GetInfo(["opresult"]),
876                      [[op.input.result for op in job.ops]])
877     self.assertEqual(job.GetInfo(["opstatus"]),
878                      [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
879     self.assert_(compat.all(op.start_timestamp and op.end_timestamp
880                             for op in job.ops))
881
882     self._GenericCheckJob(job)
883
884     # Finished jobs can't be processed any further
885     self.assertRaises(errors.ProgrammerError,
886                       jqueue._JobProcessor(queue, opexec, job))
887
888     # ... also after being restored
889     job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
890     self.assertRaises(errors.ProgrammerError,
891                       jqueue._JobProcessor(queue, opexec, job2))
892
893   def testProcessorOnRunningJob(self):
894     ops = [opcodes.OpTestDummy(result="result", fail=False)]
895
896     queue = _FakeQueueForProc()
897     opexec = _FakeExecOpCodeForProc(queue, None, None)
898
899     # Create job
900     job = self._CreateJob(queue, 9571, ops)
901
902     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
903
904     job.ops[0].status = constants.OP_STATUS_RUNNING
905
906     assert len(job.ops) == 1
907
908     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
909
910     # Calling on running job must fail
911     self.assertRaises(errors.ProgrammerError,
912                       jqueue._JobProcessor(queue, opexec, job))
913
914   def testLogMessages(self):
915     # Tests the "Feedback" callback function
916     queue = _FakeQueueForProc()
917
918     messages = {
919       1: [
920         (None, "Hello"),
921         (None, "World"),
922         (constants.ELOG_MESSAGE, "there"),
923         ],
924       4: [
925         (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
926         (constants.ELOG_JQUEUE_TEST, ("other", "type")),
927         ],
928       }
929     ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
930                                messages=messages.get(i, []))
931            for i in range(5)]
932
933     # Create job
934     job = self._CreateJob(queue, 29386, ops)
935
936     def _BeforeStart(timeout, priority):
937       self.assertEqual(queue.GetNextUpdate(), (job, True))
938       self.assertRaises(IndexError, queue.GetNextUpdate)
939       self.assertFalse(queue.IsAcquired())
940       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
941
942     def _AfterStart(op, cbs):
943       self.assertEqual(queue.GetNextUpdate(), (job, True))
944       self.assertRaises(IndexError, queue.GetNextUpdate)
945       self.assertFalse(queue.IsAcquired())
946       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
947
948       self.assertRaises(AssertionError, cbs.Feedback,
949                         "too", "many", "arguments")
950
951       for (log_type, msg) in op.messages:
952         self.assertRaises(IndexError, queue.GetNextUpdate)
953         if log_type:
954           cbs.Feedback(log_type, msg)
955         else:
956           cbs.Feedback(msg)
957         # Check for job update without replication
958         self.assertEqual(queue.GetNextUpdate(), (job, False))
959         self.assertRaises(IndexError, queue.GetNextUpdate)
960
961     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
962
963     for remaining in reversed(range(len(job.ops))):
964       self.assertRaises(IndexError, queue.GetNextUpdate)
965       result = jqueue._JobProcessor(queue, opexec, job)()
966       self.assertEqual(queue.GetNextUpdate(), (job, True))
967       self.assertRaises(IndexError, queue.GetNextUpdate)
968
969       if remaining == 0:
970         # Last opcode
971         self.assert_(result)
972         break
973
974       self.assertFalse(result)
975
976       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
977
978     self.assertRaises(IndexError, queue.GetNextUpdate)
979
980     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
981     self.assertEqual(job.GetInfo(["opresult"]),
982                      [[op.input.result for op in job.ops]])
983
984     logmsgcount = sum(len(m) for m in messages.values())
985
986     self._CheckLogMessages(job, logmsgcount)
987
988     # Serialize and restore (simulates program restart)
989     newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
990     self._CheckLogMessages(newjob, logmsgcount)
991
992     # Check each message
993     prevserial = -1
994     for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
995       for (serial, timestamp, log_type, msg) in oplog:
996         (exptype, expmsg) = messages.get(idx).pop(0)
997         if exptype:
998           self.assertEqual(log_type, exptype)
999         else:
1000           self.assertEqual(log_type, constants.ELOG_MESSAGE)
1001         self.assertEqual(expmsg, msg)
1002         self.assert_(serial > prevserial)
1003         prevserial = serial
1004
1005   def _CheckLogMessages(self, job, count):
1006     # Check serial
1007     self.assertEqual(job.log_serial, count)
1008
1009     # No filter
1010     self.assertEqual(job.GetLogEntries(None),
1011                      [entry for entries in job.GetInfo(["oplog"])[0] if entries
1012                       for entry in entries])
1013
1014     # Filter with serial
1015     assert count > 3
1016     self.assert_(job.GetLogEntries(3))
1017     self.assertEqual(job.GetLogEntries(3),
1018                      [entry for entries in job.GetInfo(["oplog"])[0] if entries
1019                       for entry in entries][3:])
1020
1021     # No log message after highest serial
1022     self.assertFalse(job.GetLogEntries(count))
1023     self.assertFalse(job.GetLogEntries(count + 3))
1024
1025
1026 class _FakeTimeoutStrategy:
1027   def __init__(self, timeouts):
1028     self.timeouts = timeouts
1029     self.attempts = 0
1030     self.last_timeout = None
1031
1032   def NextAttempt(self):
1033     self.attempts += 1
1034     if self.timeouts:
1035       timeout = self.timeouts.pop(0)
1036     else:
1037       timeout = None
1038     self.last_timeout = timeout
1039     return timeout
1040
1041
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
  """Tests the job processor with simulated lock acquire timeouts.

  The callbacks defined here are handed to the fake opcode executor and to the
  job processor; they exchange their state through attributes of the test
  case.

  """
  def setUp(self):
    self.queue = _FakeQueueForProc()

    # Job under test, index of the opcode being processed and the counter
    # producing these indices
    self.job = None
    self.curop = None
    self.opcounter = None

    # Strategy created last by _NewTimeoutStrategy and the number of lock
    # acquire timeouts which still have to be simulated
    self.timeout_strategy = None
    self.retries = 0

    # Opcode index the last strategy was created for and the priority
    # recorded last
    self.prev_tsop = None
    self.prev_prio = None

    # Whether the last call to _BeforeStart returned instead of raising a
    # timeout
    self.gave_lock = None
    self.done_lock_before_blocking = False

  def _CheckUpdateAndStatus(self, status):
    """Checks for a single replicated job update and the given job status.

    @param status: Expected result of the job's C{CalcStatus}

    """
    self.assertEqual(self.queue.GetNextUpdate(), (self.job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(self.job.CalcStatus(), status)

  def _BeforeStart(self, timeout, priority):
    """Callback for the fake executor, simulates acquiring locks.

    @raise mcpu.LockAcquireTimeout: While there are retries left

    """
    self._CheckUpdateAndStatus(constants.JOB_STATUS_WAITLOCK)

    job = self.job
    strategy = self.timeout_strategy

    if timeout is not None:
      self.assertTrue(isinstance(timeout, (int, float)))
    self.assertEqual(timeout, strategy.last_timeout)

    cur_prio = job.ops[self.curop].priority
    self.assertEqual(priority, cur_prio)

    self.gave_lock = True

    if self.curop == 3 and cur_prio == constants.OP_PRIO_HIGHEST + 3:
      # Give locks before running into blocking acquire
      assert self.retries == 7
      self.retries = 0
      self.done_lock_before_blocking = True

    elif self.retries > 0:
      # Retries left, let the acquire time out
      self.assertTrue(timeout is not None)
      self.retries -= 1
      self.gave_lock = False
      raise mcpu.LockAcquireTimeout()

    elif cur_prio == constants.OP_PRIO_HIGHEST:
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
      assert not strategy.timeouts
      self.assertTrue(timeout is None)

  def _AfterStart(self, op, cbs):
    """Callback for the fake executor, checks the state of the running job.

    """
    self._CheckUpdateAndStatus(constants.JOB_STATUS_RUNNING)

    # Job is running, cancelling shouldn't be possible
    (cancelled, _) = self.job.Cancel()
    self.assertFalse(cancelled)

  def _NextOpcode(self):
    """Advances to the next opcode index and records that opcode's priority.

    """
    idx = self.opcounter.next()
    self.curop = idx
    self.prev_prio = self.job.ops[idx].priority

  def _NewTimeoutStrategy(self):
    """Creates the timeout strategy for the current opcode.

    Passed to the job processor as C{_timeout_strategy_factory}.

    @return: New L{_FakeTimeoutStrategy} instance

    """
    job = self.job

    # All retries of the previous strategy must have been used
    self.assertEqual(self.retries, 0)

    if self.prev_tsop == self.curop:
      # Still on the same opcode, priority must've been increased
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)

    # Timeouts handed out per opcode index; other opcodes get none
    timeout_plan = {
      1: range(10, 31, 10),
      2: range(11, 61, 12),
      3: range(12, 100, 14),
      4: range(10, 31, 10),
      5: range(19, 100, 11),
      }
    timeouts = timeout_plan.get(self.curop, [])

    if self.curop == 1:
      # Normal retry, one timeout is left unused
      self.retries = len(timeouts) - 1
    elif self.curop in (2, 3, 5):
      # Opcode 2 runs into a blocking acquire, opcode 3 waits for the
      # priority to increase but is given the lock before the blocking
      # acquire, opcode 5 is a normal retry
      self.retries = len(timeouts)
    else:
      # Opcode 4 has timeouts, but no need to retry; all others have none
      self.retries = 0

    if self.curop == 3:
      self.assertFalse(self.done_lock_before_blocking)
    elif self.curop == 4:
      self.assertTrue(self.done_lock_before_blocking)

    assert len(job.ops) == 10
    assert self.retries <= len(timeouts)

    strategy = _FakeTimeoutStrategy(timeouts)

    self.timeout_strategy = strategy
    self.prev_tsop = self.curop
    self.prev_prio = job.ops[self.curop].priority

    return strategy

  def testTimeout(self):
    # Job with ten dummy opcodes
    ops = []
    for i in range(10):
      ops.append(opcodes.OpTestDummy(result="Res%s" % i, fail=False))

    job = self._CreateJob(self.queue, 15801, ops)
    self.job = job

    self.opcounter = itertools.count(0)

    executor = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
                                      self._AfterStart)
    strategy_factory = self._NewTimeoutStrategy

    self.assertFalse(self.done_lock_before_blocking)

    # Call the processor until it reports the job as finished
    while True:
      proc = jqueue._JobProcessor(self.queue, executor, job,
                                  _timeout_strategy_factory=strategy_factory)

      self.assertRaises(IndexError, self.queue.GetNextUpdate)
      finished = proc(_nextop_fn=self._NextOpcode)
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      if finished:
        self.assertFalse(job.cur_opctx)
        break

      self.assertFalse(finished)

      if not self.gave_lock:
        # Acquire timed out, the opcode context must have been kept
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
                         self.timeout_strategy.NextAttempt)
      else:
        self.assertFalse(job.cur_opctx)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    # Every opcode must have been visited exactly once
    self.assertEqual(self.curop, len(job.ops) - 1)
    self.assertEqual(self.job, job)
    self.assertEqual(self.opcounter.next(), len(job.ops))
    self.assertTrue(self.done_lock_before_blocking)

    self.assertRaises(IndexError, self.queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])

    expected_results = [op.input.result for op in job.ops]
    self.assertEqual(job.GetInfo(["opresult"]), [expected_results])

    expected_status = len(job.ops) * [constants.OP_STATUS_SUCCESS]
    self.assertEqual(job.GetInfo(["opstatus"]), [expected_status])

    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    # Finished jobs can't be processed any further
    self.assertRaises(errors.ProgrammerError,
                      jqueue._JobProcessor(self.queue, executor, job))
1219
1220
# Entry point when this file is executed as a script. GanetiTestProgram is
# defined in the testutils module, outside this file (presumably the project's
# unittest runner -- TODO confirm).
if __name__ == "__main__":
  testutils.GanetiTestProgram()