jqueue: Use priority for acquiring locks
[ganeti-local] / test / ganeti.jqueue_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.jqueue"""
23
24 import os
25 import sys
26 import unittest
27 import tempfile
28 import shutil
29 import errno
30 import itertools
31
32 from ganeti import constants
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import jqueue
36 from ganeti import opcodes
37 from ganeti import compat
38 from ganeti import mcpu
39
40 import testutils
41
42
43 class _FakeJob:
44   def __init__(self, job_id, status):
45     self.id = job_id
46     self._status = status
47     self._log = []
48
49   def SetStatus(self, status):
50     self._status = status
51
52   def AddLogEntry(self, msg):
53     self._log.append((len(self._log), msg))
54
55   def CalcStatus(self):
56     return self._status
57
58   def GetInfo(self, fields):
59     result = []
60
61     for name in fields:
62       if name == "status":
63         result.append(self._status)
64       else:
65         raise Exception("Unknown field")
66
67     return result
68
69   def GetLogEntries(self, newer_than):
70     assert newer_than is None or newer_than >= 0
71
72     if newer_than is None:
73       return self._log
74
75     return self._log[newer_than:]
76
77
class TestJobChangesChecker(unittest.TestCase):
  """Tests for L{jqueue._JobChangesChecker}."""

  def testStatus(self):
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"], None, None)

    # Without previous information every call reports the current status
    for status in [constants.JOB_STATUS_QUEUED,
                   constants.JOB_STATUS_RUNNING,
                   constants.JOB_STATUS_SUCCESS]:
      job.SetStatus(status)
      self.assertEqual(checker(job), ([status], []))

    # The checker accesses job.id; make sure the fake job still carries it
    self.assertEqual(job.id, 9094)

  def testStatusWithPrev(self):
    queued = constants.JOB_STATUS_QUEUED
    running = constants.JOB_STATUS_RUNNING

    job = _FakeJob(12807, queued)
    checker = jqueue._JobChangesChecker(["status"], [queued], None)

    # Status equals the previously seen one, hence no change is reported
    self.assert_(checker(job) is None)

    job.SetStatus(running)
    self.assertEqual(checker(job), ([running], []))

  def testFinalStatus(self):
    for final_status in constants.JOBS_FINALIZED:
      checker = jqueue._JobChangesChecker(["status"], [final_status], None)
      fake = _FakeJob(2178711, final_status)
      # A finalized job won't change anymore, so it is reported right away
      self.assertEqual(checker(fake), ([final_status], []))

  def testLog(self):
    running = constants.JOB_STATUS_RUNNING
    failed = constants.JOB_STATUS_ERROR

    job = _FakeJob(9094, running)

    initial = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(initial(job), ([running], []))

    job.AddLogEntry("Hello World")
    (info, entries) = initial(job)
    self.assertEqual(info, [running])
    self.assertEqual(entries, [[0, "Hello World"]])

    # A checker primed with the last result sees no change
    followup = jqueue._JobChangesChecker(["status"], info, len(entries))
    self.assert_(followup(job) is None)

    job.AddLogEntry("Foo Bar")
    job.SetStatus(failed)

    # Only the new log entry is returned
    (info, entries) = followup(job)
    self.assertEqual(info, [failed])
    self.assertEqual(entries, [[1, "Foo Bar"]])

    # A checker without previous information returns the complete log
    fresh = jqueue._JobChangesChecker(["status"], None, None)
    (info, entries) = fresh(job)
    self.assertEqual(info, [failed])
    self.assertEqual(entries, [[0, "Hello World"], [1, "Foo Bar"]])
134
135
class TestJobChangesWaiter(unittest.TestCase):
  """Tests for L{jqueue._JobFileChangesWaiter} and L{jqueue._JobChangesWaiter}.

  """
  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _EnsureNotifierClosed(self, notifier):
    """Asserts that the notifier's file descriptor is no longer open."""
    try:
      os.fstat(notifier._fd)
    except EnvironmentError:
      err = sys.exc_info()[1]
      self.assertEqual(err.errno, errno.EBADF)
    else:
      self.fail("File descriptor wasn't closed")

  def testClose(self):
    # The notifier must be closed whether or not Wait was ever called
    for call_wait in (False, True):
      waiter = jqueue._JobFileChangesWaiter(self.filename)
      try:
        if call_wait:
          waiter.Wait(0.001)
      finally:
        waiter.Close()

      self._EnsureNotifierClosed(waiter._notifier)

  def testChangingFile(self):
    waiter = jqueue._JobFileChangesWaiter(self.filename)
    try:
      # The file wasn't written yet, so no change is reported
      self.assertFalse(waiter.Wait(0.1))

      # A write must be noticed well within the generous timeout
      utils.WriteFile(self.filename, data="changed")
      self.assert_(waiter.Wait(60))
    finally:
      waiter.Close()

    self._EnsureNotifierClosed(waiter._notifier)

  def testChangingFile2(self):
    waiter = jqueue._JobChangesWaiter(self.filename)
    try:
      # The first wait returns True and only then sets up the file waiter
      self.assertFalse(waiter._filewaiter)
      self.assert_(waiter.Wait(0.1))
      self.assert_(waiter._filewaiter)

      # File waiter is now used, but there have been no changes
      self.assertFalse(waiter.Wait(0.1))

      utils.WriteFile(self.filename, data="changed")
      self.assert_(waiter.Wait(60))
    finally:
      waiter.Close()

    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
191
192
class TestWaitForJobChangesHelper(unittest.TestCase):
  """Tests for L{jqueue._WaitForJobChangesHelper}."""

  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _LoadWaitingJob(self):
    """Job loader returning a job in the "waiting for locks" state."""
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)

  def _LoadLostJob(self):
    """Job loader simulating a lost job by returning C{None}."""
    return None

  def testNoChanges(self):
    wfjc = jqueue._WaitForJobChangesHelper()
    waitlock = constants.JOB_STATUS_WAITLOCK

    # The previously seen status is still current, so nothing changed
    unchanged = wfjc(self.filename, self._LoadWaitingJob, ["status"],
                     [waitlock], None, 0.1)
    self.assertEqual(unchanged, constants.JOB_NOTCHANGED)

    # Without previous information the current state is returned
    current = wfjc(self.filename, self._LoadWaitingJob, ["status"],
                   None, None, 1.0)
    self.assertEqual(current, ([waitlock], []))

  def testLostJob(self):
    wfjc = jqueue._WaitForJobChangesHelper()

    # A job the loader can't provide results in None
    result = wfjc(self.filename, self._LoadLostJob, ["status"],
                  None, None, 1.0)
    self.assert_(result is None)
225
226
class TestEncodeOpError(unittest.TestCase):
  """Tests for L{jqueue._EncodeOpError}."""

  def test(self):
    # Pairs of (value to encode, exception raised when decoding it again);
    # Ganeti errors keep their class, anything else becomes OpExecError
    cases = [
      (errors.LockError("Test 1"), errors.LockError),
      (errors.GenericError("Test 2"), errors.GenericError),
      (NotImplementedError("Foo"), errors.OpExecError),
      ("Hello World", errors.OpExecError),
      ]

    for (value, expected_exc) in cases:
      encerr = jqueue._EncodeOpError(value)
      self.assert_(isinstance(encerr, tuple))
      self.assertRaises(expected_exc, errors.MaybeRaise, encerr)
244
245
class TestQueuedOpCode(unittest.TestCase):
  """Tests for L{jqueue._QueuedOpCode}."""

  def _CheckRoundTrip(self, inpop, check_fn):
    """Wraps an opcode, then checks it and its serialized/restored copy.

    @param inpop: Input opcode
    @param check_fn: Callable receiving a L{jqueue._QueuedOpCode} to check

    """
    original = jqueue._QueuedOpCode(inpop)
    check_fn(original)
    restored = jqueue._QueuedOpCode.Restore(original.Serialize())
    check_fn(restored)
    self.assertEqual(original.Serialize(), restored.Serialize())

  def testDefaults(self):
    def _Check(op):
      self.assertFalse(hasattr(op.input, "dry_run"))
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
      self.assertFalse(op.log)
      # None of the timestamps is set before execution
      for timestamp in (op.start_timestamp, op.exec_timestamp,
                        op.end_timestamp):
        self.assert_(timestamp is None)
      self.assert_(op.result is None)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    self._CheckRoundTrip(opcodes.OpTestDelay(), _Check)

  def testPriority(self):
    assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
           "Default priority equals high priority; test can't work"

    def _Check(op):
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    self._CheckRoundTrip(opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH),
                         _Check)
277
278
class TestQueuedJob(unittest.TestCase):
  """Tests for L{jqueue._QueuedJob}."""

  def test(self):
    # A job without any opcodes is rejected
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
                      None, 1, [])

  def testDefaults(self):
    job_id = 4260
    ops = [
      opcodes.OpGetTags(),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      # Bookkeeping of a freshly created job
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.log_serial, 0)
      self.assert_(job.received_timestamp)
      self.assert_(job.start_timestamp is None)
      self.assert_(job.end_timestamp is None)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
      self.assert_(repr(job).startswith("<"))

      # Opcodes are wrapped in order and keep their input data
      self.assertEqual(len(job.ops), len(ops))
      for (inp, qop) in zip(ops, job.ops):
        self.assertEqual(inp.__getstate__(), qop.input.__getstate__())

      self.assertRaises(errors.OpExecError, job.GetInfo, ["unknown-field"])

      summaries = [qop.input.Summary() for qop in job.ops]
      self.assertEqual(job.GetInfo(["summary"]), [summaries])

    created = jqueue._QueuedJob(None, job_id, ops)
    _Check(created)
    restored = jqueue._QueuedJob.Restore(None, created.Serialize())
    _Check(restored)
    self.assertEqual(created.Serialize(), restored.Serialize())

  def testPriority(self):
    job_id = 4283
    default = constants.OP_PRIO_DEFAULT
    ops = [
      opcodes.OpGetTags(priority=default),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assert_(repr(job).startswith("<"))

    job = jqueue._QueuedJob(None, job_id, ops)
    (first, second) = job.ops

    _Check(job)
    self.assert_(compat.all(qop.priority == default for qop in job.ops))
    self.assertEqual(job.CalcPriority(), default)

    # Raise the first opcode's priority (a lower value)
    first.priority -= 1
    _Check(job)
    self.assertEqual(job.CalcPriority(), default - 1)

    # A finished opcode no longer influences the job's priority
    first.status = constants.OP_STATUS_SUCCESS
    _Check(job)
    self.assertEqual(job.CalcPriority(), default)

    # Raise the second opcode's priority
    second.priority -= 10
    self.assertEqual(job.CalcPriority(), default - 10)

    # Raise the first opcode's priority beyond the second's
    first.status = constants.OP_STATUS_RUNNING
    first.priority -= 19
    self.assertEqual(job.CalcPriority(), default - 20)

  def testCalcStatus(self):
    def _SetAll(ops, status):
      for op in ops:
        op.status = status

    def _Queued(ops):
      # The default status is "queued"
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
                              for op in ops))

    def _Waitlock1(ops):
      ops[0].status = constants.OP_STATUS_WAITLOCK

    def _Waitlock2(ops):
      _SetAll(ops[:2], constants.OP_STATUS_SUCCESS)
      ops[2].status = constants.OP_STATUS_WAITLOCK

    def _Running(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_RUNNING
      _SetAll(ops[2:], constants.OP_STATUS_QUEUED)

    def _Canceling1(ops):
      _SetAll(ops[:2], constants.OP_STATUS_SUCCESS)
      _SetAll(ops[2:], constants.OP_STATUS_CANCELING)

    def _Canceling2(ops):
      _SetAll(ops, constants.OP_STATUS_CANCELING)

    def _Canceled(ops):
      _SetAll(ops, constants.OP_STATUS_CANCELED)

    def _Error1(ops):
      _SetAll(ops[:4], constants.OP_STATUS_SUCCESS)
      _SetAll(ops[4:], constants.OP_STATUS_ERROR)

    def _Error2(ops):
      _SetAll(ops, constants.OP_STATUS_ERROR)

    def _Success(ops):
      _SetAll(ops, constants.OP_STATUS_SUCCESS)

    # Expected job status mapped to functions preparing the opcodes for it
    tests = {
      constants.JOB_STATUS_QUEUED: [_Queued],
      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
      constants.JOB_STATUS_RUNNING: [_Running],
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
      constants.JOB_STATUS_CANCELED: [_Canceled],
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
      constants.JOB_STATUS_SUCCESS: [_Success],
      }

    def _NewJob():
      job = jqueue._QueuedJob(None, 1,
                              [opcodes.OpTestDelay() for _ in range(10)])
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      _Queued(job.ops)
      return job

    for status in constants.JOB_STATUS_ALL:
      setup_fns = tests[status]
      assert setup_fns
      for setup_fn in setup_fns:
        job = _NewJob()
        setup_fn(job.ops)
        self.assertEqual(job.CalcStatus(), status)
425
426
427 class _FakeQueueForProc:
428   def __init__(self):
429     self._acquired = False
430
431   def IsAcquired(self):
432     return self._acquired
433
434   def acquire(self, shared=0):
435     assert shared == 1
436     self._acquired = True
437
438   def release(self):
439     assert self._acquired
440     self._acquired = False
441
442   def UpdateJobUnlocked(self, job, replicate=None):
443     # TODO: Ensure job is updated at the correct places
444     pass
445
446
class _FakeExecOpCodeForProc:
  """Fake opcode executor for L{jqueue._JobProcessor} tests.

  Optionally calls a callback before and after notifying the start of the
  opcode. It then either returns the opcode's C{result} or, if the opcode's
  C{fail} attribute is set, raises L{errors.OpExecError}.

  """
  def __init__(self, before_start, after_start):
    self._before_start = before_start
    self._after_start = after_start

  def __call__(self, op, cbs, timeout=None, priority=None):
    """"Executes" a L{opcodes.OpTestDummy}.

    @param op: Opcode to execute
    @param cbs: Callbacks object; its C{NotifyStart} method is called
    @param timeout: Passed on to the C{before_start} callback
    @param priority: Passed on to the C{before_start} callback
    @raise errors.OpExecError: If C{op.fail} is set

    """
    assert isinstance(op, opcodes.OpTestDummy)

    before = self._before_start
    if before:
      before(timeout, priority)

    cbs.NotifyStart()

    after = self._after_start
    if after:
      after(op, cbs)

    if not op.fail:
      return op.result

    raise errors.OpExecError("Error requested (%s)" % op.result)
467
468
class _JobProcessorTestUtils:
  """Helpers shared by job processor tests.

  Meant to be mixed into a L{unittest.TestCase}, whose assertion methods are
  used.

  """
  def _CreateJob(self, queue, job_id, ops):
    """Creates a L{jqueue._QueuedJob} and checks its initial state.

    @return: The newly created job

    """
    job = jqueue._QueuedJob(queue, job_id, ops)

    # A new job has neither been started nor finished
    self.assertFalse(job.start_timestamp)
    self.assertFalse(job.end_timestamp)

    # Every input opcode is wrapped, in the original order
    self.assertEqual(len(ops), len(job.ops))
    for (queued_op, input_op) in zip(job.ops, ops):
      self.assert_(queued_op.input == input_op)

    expected_states = [input_op.__getstate__() for input_op in ops]
    self.assertEqual(job.GetInfo(["ops"]), [expected_states])

    return job
479
480
481 class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
482   def _GenericCheckJob(self, job):
483     assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
484                       for op in job.ops)
485
486     self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
487                      [[op.start_timestamp for op in job.ops],
488                       [op.exec_timestamp for op in job.ops],
489                       [op.end_timestamp for op in job.ops]])
490     self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
491                      [job.received_timestamp,
492                       job.start_timestamp,
493                       job.end_timestamp])
494     self.assert_(job.start_timestamp)
495     self.assert_(job.end_timestamp)
496     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
497
498   def testSuccess(self):
499     queue = _FakeQueueForProc()
500
501     for (job_id, opcount) in [(25351, 1), (6637, 3),
502                               (24644, 10), (32207, 100)]:
503       ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
504              for i in range(opcount)]
505
506       # Create job
507       job = self._CreateJob(queue, job_id, ops)
508
509       def _BeforeStart(timeout, priority):
510         self.assertFalse(queue.IsAcquired())
511         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
512
513       def _AfterStart(op, cbs):
514         self.assertFalse(queue.IsAcquired())
515         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
516
517         # Job is running, cancelling shouldn't be possible
518         (success, _) = job.Cancel()
519         self.assertFalse(success)
520
521       opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
522
523       for idx in range(len(ops)):
524         result = jqueue._JobProcessor(queue, opexec, job)()
525         if idx == len(ops) - 1:
526           # Last opcode
527           self.assert_(result)
528         else:
529           self.assertFalse(result)
530
531           self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
532           self.assert_(job.start_timestamp)
533           self.assertFalse(job.end_timestamp)
534
535       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
536       self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
537       self.assertEqual(job.GetInfo(["opresult"]),
538                        [[op.input.result for op in job.ops]])
539       self.assertEqual(job.GetInfo(["opstatus"]),
540                        [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
541       self.assert_(compat.all(op.start_timestamp and op.end_timestamp
542                               for op in job.ops))
543
544       self._GenericCheckJob(job)
545
546       # Finished jobs can't be processed any further
547       self.assertRaises(errors.ProgrammerError,
548                         jqueue._JobProcessor(queue, opexec, job))
549
550   def testOpcodeError(self):
551     queue = _FakeQueueForProc()
552
553     testdata = [
554       (17077, 1, 0, 0),
555       (1782, 5, 2, 2),
556       (18179, 10, 9, 9),
557       (4744, 10, 3, 8),
558       (23816, 100, 39, 45),
559       ]
560
561     for (job_id, opcount, failfrom, failto) in testdata:
562       # Prepare opcodes
563       ops = [opcodes.OpTestDummy(result="Res%s" % i,
564                                  fail=(failfrom <= i and
565                                        i <= failto))
566              for i in range(opcount)]
567
568       # Create job
569       job = self._CreateJob(queue, job_id, ops)
570
571       opexec = _FakeExecOpCodeForProc(None, None)
572
573       for idx in range(len(ops)):
574         result = jqueue._JobProcessor(queue, opexec, job)()
575
576         if idx in (failfrom, len(ops) - 1):
577           # Last opcode
578           self.assert_(result)
579           break
580
581         self.assertFalse(result)
582
583         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
584
585       # Check job status
586       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
587       self.assertEqual(job.GetInfo(["id"]), [job_id])
588       self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
589
590       # Check opcode status
591       data = zip(job.ops,
592                  job.GetInfo(["opstatus"])[0],
593                  job.GetInfo(["opresult"])[0])
594
595       for idx, (op, opstatus, opresult) in enumerate(data):
596         if idx < failfrom:
597           assert not op.input.fail
598           self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
599           self.assertEqual(opresult, op.input.result)
600         elif idx <= failto:
601           assert op.input.fail
602           self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
603           self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
604         else:
605           assert not op.input.fail
606           self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
607           self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
608
609       self.assert_(compat.all(op.start_timestamp and op.end_timestamp
610                               for op in job.ops[:failfrom]))
611
612       self._GenericCheckJob(job)
613
614       # Finished jobs can't be processed any further
615       self.assertRaises(errors.ProgrammerError,
616                         jqueue._JobProcessor(queue, opexec, job))
617
618   def testCancelWhileInQueue(self):
619     queue = _FakeQueueForProc()
620
621     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
622            for i in range(5)]
623
624     # Create job
625     job_id = 17045
626     job = self._CreateJob(queue, job_id, ops)
627
628     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
629
630     # Mark as cancelled
631     (success, _) = job.Cancel()
632     self.assert_(success)
633
634     self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
635                             for op in job.ops))
636
637     opexec = _FakeExecOpCodeForProc(None, None)
638     jqueue._JobProcessor(queue, opexec, job)()
639
640     # Check result
641     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
642     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
643     self.assertFalse(job.start_timestamp)
644     self.assert_(job.end_timestamp)
645     self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
646                                 for op in job.ops))
647     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
648                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
649                       ["Job canceled by request" for _ in job.ops]])
650
651   def testCancelWhileWaitlock(self):
652     queue = _FakeQueueForProc()
653
654     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
655            for i in range(5)]
656
657     # Create job
658     job_id = 11009
659     job = self._CreateJob(queue, job_id, ops)
660
661     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
662
663     def _BeforeStart(timeout, priority):
664       self.assertFalse(queue.IsAcquired())
665       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
666
667       # Mark as cancelled
668       (success, _) = job.Cancel()
669       self.assert_(success)
670
671       self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
672                               for op in job.ops))
673
674     def _AfterStart(op, cbs):
675       self.assertFalse(queue.IsAcquired())
676       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
677
678     opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
679
680     jqueue._JobProcessor(queue, opexec, job)()
681
682     # Check result
683     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
684     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
685     self.assert_(job.start_timestamp)
686     self.assert_(job.end_timestamp)
687     self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
688                                 for op in job.ops))
689     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
690                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
691                       ["Job canceled by request" for _ in job.ops]])
692
693   def testCancelWhileRunning(self):
694     # Tests canceling a job with finished opcodes and more, unprocessed ones
695     queue = _FakeQueueForProc()
696
697     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
698            for i in range(3)]
699
700     # Create job
701     job_id = 28492
702     job = self._CreateJob(queue, job_id, ops)
703
704     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
705
706     opexec = _FakeExecOpCodeForProc(None, None)
707
708     # Run one opcode
709     self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
710
711     # Job goes back to queued
712     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
713     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
714                      [[constants.OP_STATUS_SUCCESS,
715                        constants.OP_STATUS_QUEUED,
716                        constants.OP_STATUS_QUEUED],
717                       ["Res0", None, None]])
718
719     # Mark as cancelled
720     (success, _) = job.Cancel()
721     self.assert_(success)
722
723     # Try processing another opcode (this will actually cancel the job)
724     self.assert_(jqueue._JobProcessor(queue, opexec, job)())
725
726     # Check result
727     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
728     self.assertEqual(job.GetInfo(["id"]), [job_id])
729     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
730     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
731                      [[constants.OP_STATUS_SUCCESS,
732                        constants.OP_STATUS_CANCELED,
733                        constants.OP_STATUS_CANCELED],
734                       ["Res0", "Job canceled by request",
735                        "Job canceled by request"]])
736
737   def testPartiallyRun(self):
738     # Tests calling the processor on a job that's been partially run before the
739     # program was restarted
740     queue = _FakeQueueForProc()
741
742     opexec = _FakeExecOpCodeForProc(None, None)
743
744     for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
745       ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
746              for i in range(10)]
747
748       # Create job
749       job = self._CreateJob(queue, job_id, ops)
750
751       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
752
753       for _ in range(successcount):
754         self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
755
756       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
757       self.assertEqual(job.GetInfo(["opstatus"]),
758                        [[constants.OP_STATUS_SUCCESS
759                          for _ in range(successcount)] +
760                         [constants.OP_STATUS_QUEUED
761                          for _ in range(len(ops) - successcount)]])
762
763       self.assert_(job.ops_iter)
764
765       # Serialize and restore (simulates program restart)
766       newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
767       self.assertFalse(newjob.ops_iter)
768       self._TestPartial(newjob, successcount)
769
770   def _TestPartial(self, job, successcount):
771     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
772     self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
773
774     queue = _FakeQueueForProc()
775     opexec = _FakeExecOpCodeForProc(None, None)
776
777     for remaining in reversed(range(len(job.ops) - successcount)):
778       result = jqueue._JobProcessor(queue, opexec, job)()
779
780       if remaining == 0:
781         # Last opcode
782         self.assert_(result)
783         break
784
785       self.assertFalse(result)
786
787       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
788
789     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
790     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
791     self.assertEqual(job.GetInfo(["opresult"]),
792                      [[op.input.result for op in job.ops]])
793     self.assertEqual(job.GetInfo(["opstatus"]),
794                      [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
795     self.assert_(compat.all(op.start_timestamp and op.end_timestamp
796                             for op in job.ops))
797
798     self._GenericCheckJob(job)
799
800     # Finished jobs can't be processed any further
801     self.assertRaises(errors.ProgrammerError,
802                       jqueue._JobProcessor(queue, opexec, job))
803
804     # ... also after being restored
805     job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
806     self.assertRaises(errors.ProgrammerError,
807                       jqueue._JobProcessor(queue, opexec, job2))
808
809   def testProcessorOnRunningJob(self):
810     ops = [opcodes.OpTestDummy(result="result", fail=False)]
811
812     queue = _FakeQueueForProc()
813     opexec = _FakeExecOpCodeForProc(None, None)
814
815     # Create job
816     job = self._CreateJob(queue, 9571, ops)
817
818     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
819
820     job.ops[0].status = constants.OP_STATUS_RUNNING
821
822     assert len(job.ops) == 1
823
824     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
825
826     # Calling on running job must fail
827     self.assertRaises(errors.ProgrammerError,
828                       jqueue._JobProcessor(queue, opexec, job))
829
830   def testLogMessages(self):
831     # Tests the "Feedback" callback function
832     queue = _FakeQueueForProc()
833
834     messages = {
835       1: [
836         (None, "Hello"),
837         (None, "World"),
838         (constants.ELOG_MESSAGE, "there"),
839         ],
840       4: [
841         (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
842         (constants.ELOG_JQUEUE_TEST, ("other", "type")),
843         ],
844       }
845     ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
846                                messages=messages.get(i, []))
847            for i in range(5)]
848
849     # Create job
850     job = self._CreateJob(queue, 29386, ops)
851
852     def _BeforeStart(timeout, priority):
853       self.assertFalse(queue.IsAcquired())
854       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
855
856     def _AfterStart(op, cbs):
857       self.assertFalse(queue.IsAcquired())
858       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
859
860       self.assertRaises(AssertionError, cbs.Feedback,
861                         "too", "many", "arguments")
862
863       for (log_type, msg) in op.messages:
864         if log_type:
865           cbs.Feedback(log_type, msg)
866         else:
867           cbs.Feedback(msg)
868
869     opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
870
871     for remaining in reversed(range(len(job.ops))):
872       result = jqueue._JobProcessor(queue, opexec, job)()
873
874       if remaining == 0:
875         # Last opcode
876         self.assert_(result)
877         break
878
879       self.assertFalse(result)
880
881       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
882
883     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
884     self.assertEqual(job.GetInfo(["opresult"]),
885                      [[op.input.result for op in job.ops]])
886
887     logmsgcount = sum(len(m) for m in messages.values())
888
889     self._CheckLogMessages(job, logmsgcount)
890
891     # Serialize and restore (simulates program restart)
892     newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
893     self._CheckLogMessages(newjob, logmsgcount)
894
895     # Check each message
896     prevserial = -1
897     for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
898       for (serial, timestamp, log_type, msg) in oplog:
899         (exptype, expmsg) = messages.get(idx).pop(0)
900         if exptype:
901           self.assertEqual(log_type, exptype)
902         else:
903           self.assertEqual(log_type, constants.ELOG_MESSAGE)
904         self.assertEqual(expmsg, msg)
905         self.assert_(serial > prevserial)
906         prevserial = serial
907
908   def _CheckLogMessages(self, job, count):
909     # Check serial
910     self.assertEqual(job.log_serial, count)
911
912     # No filter
913     self.assertEqual(job.GetLogEntries(None),
914                      [entry for entries in job.GetInfo(["oplog"])[0] if entries
915                       for entry in entries])
916
917     # Filter with serial
918     assert count > 3
919     self.assert_(job.GetLogEntries(3))
920     self.assertEqual(job.GetLogEntries(3),
921                      [entry for entries in job.GetInfo(["oplog"])[0] if entries
922                       for entry in entries][3:])
923
924     # No log message after highest serial
925     self.assertFalse(job.GetLogEntries(count))
926     self.assertFalse(job.GetLogEntries(count + 3))
927
928
929 class _FakeTimeoutStrategy:
930   def __init__(self, timeouts):
931     self.timeouts = timeouts
932     self.attempts = 0
933     self.last_timeout = None
934
935   def NextAttempt(self):
936     self.attempts += 1
937     if self.timeouts:
938       timeout = self.timeouts.pop(0)
939     else:
940       timeout = None
941     self.last_timeout = timeout
942     return timeout
943
944
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
  """Tests lock acquisition timeouts and priority changes in _JobProcessor.

  A L{_FakeTimeoutStrategy} is injected through C{_timeout_strategy_factory}.
  The fake opcode executor's C{_BeforeStart} callback raises
  L{mcpu.LockAcquireTimeout} a configured number of times per opcode, so the
  processor has to retry opcodes, sometimes at increased priority.

  """
  def setUp(self):
    self.queue = _FakeQueueForProc()
    self.job = None
    # Index of the opcode currently being processed (set by _NextOpcode)
    self.curop = None
    # Iterator yielding opcode indices (created in testTimeout)
    self.opcounter = None
    # Strategy most recently created by _NewTimeoutStrategy
    self.timeout_strategy = None
    # Number of lock acquisitions still to be failed with a timeout
    self.retries = 0
    # Opcode index for which the last timeout strategy was created
    self.prev_tsop = None
    # Last recorded priority of the current opcode; used to check that a new
    # strategy for the same opcode comes with an increased priority
    self.prev_prio = None
    # Whether the last _BeforeStart call let the opcode get its locks
    self.gave_lock = None
    # Set once opcode 3 got its locks before reaching a blocking acquire
    self.done_lock_before_blocking = False

  def _BeforeStart(self, timeout, priority):
    """Fake lock acquisition callback, invoked before an opcode starts.

    Verifies the timeout and priority passed by the processor. Then it either
    "gives" the locks by returning normally, or simulates a lock acquisition
    timeout by raising L{mcpu.LockAcquireTimeout} while C{self.retries} is
    positive.

    """
    job = self.job

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

    ts = self.timeout_strategy

    self.assert_(timeout is None or isinstance(timeout, (int, float)))
    # The processor must pass on the timeout handed out by the fake strategy
    self.assertEqual(timeout, ts.last_timeout)
    self.assertEqual(priority, job.ops[self.curop].priority)

    self.gave_lock = True

    if (self.curop == 3 and
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
      # Give locks before running into blocking acquire
      # (7 == len(range(12, 100, 14)), i.e. no attempt has failed yet since
      # the strategy for this priority was created)
      assert self.retries == 7
      self.retries = 0
      self.done_lock_before_blocking = True
      return

    if self.retries > 0:
      # Simulate a lock acquisition timeout; only possible with a timeout
      self.assert_(timeout is not None)
      self.retries -= 1
      self.gave_lock = False
      raise mcpu.LockAcquireTimeout()

    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
      # At the highest priority all timeouts must have been used up and the
      # acquire is done without a timeout
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
      assert not ts.timeouts
      self.assert_(timeout is None)

  def _AfterStart(self, op, cbs):
    """Callback invoked once an opcode is running; checks the job's state."""
    job = self.job

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Job is running, cancelling shouldn't be possible
    (success, _) = job.Cancel()
    self.assertFalse(success)

  def _NextOpcode(self):
    """Advances to the next opcode; passed to the processor as C{_nextop_fn}."""
    self.curop = self.opcounter.next()
    self.prev_prio = self.job.ops[self.curop].priority

  def _NewTimeoutStrategy(self):
    """Creates the fake timeout strategy for the current opcode.

    Used as C{_timeout_strategy_factory}. The timeouts and the number of
    simulated lock acquisition failures depend on the opcode index
    (C{self.curop}).

    @rtype: L{_FakeTimeoutStrategy}

    """
    job = self.job

    # All simulated failures of the previous strategy must have been consumed
    self.assertEqual(self.retries, 0)

    if self.prev_tsop == self.curop:
      # Still on the same opcode, priority must've been increased
      # (priority values decrease as the priority increases)
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)

    if self.curop == 1:
      # Normal retry
      # (all timed attempts but the last one fail)
      timeouts = range(10, 31, 10)
      self.retries = len(timeouts) - 1

    elif self.curop == 2:
      # Let this run into a blocking acquire
      # (every timed attempt fails)
      timeouts = range(11, 61, 12)
      self.retries = len(timeouts)

    elif self.curop == 3:
      # Wait for priority to increase, but give lock before blocking acquire
      timeouts = range(12, 100, 14)
      self.retries = len(timeouts)

      self.assertFalse(self.done_lock_before_blocking)

    elif self.curop == 4:
      self.assert_(self.done_lock_before_blocking)

      # Timeouts, but no need to retry
      timeouts = range(10, 31, 10)
      self.retries = 0

    elif self.curop == 5:
      # Normal retry
      timeouts = range(19, 100, 11)
      self.retries = len(timeouts)

    else:
      # Remaining opcodes: no timeouts, locks are given immediately
      timeouts = []
      self.retries = 0

    assert len(job.ops) == 10
    assert self.retries <= len(timeouts)

    ts = _FakeTimeoutStrategy(timeouts)

    self.timeout_strategy = ts
    self.prev_tsop = self.curop
    self.prev_prio = job.ops[self.curop].priority

    return ts

  def testTimeout(self):
    """Runs a ten-opcode job through the processor until it finishes.

    The processor is called repeatedly until it reports the job as finished.
    After every unfinished call the job must be queued again.

    """
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(10)]

    # Create job
    job_id = 15801
    job = self._CreateJob(self.queue, job_id, ops)
    self.job = job

    self.opcounter = itertools.count(0)

    opexec = _FakeExecOpCodeForProc(self._BeforeStart, self._AfterStart)
    tsf = self._NewTimeoutStrategy

    self.assertFalse(self.done_lock_before_blocking)

    # Loop until the processor reports the job as finished
    for i in itertools.count(0):
      proc = jqueue._JobProcessor(self.queue, opexec, job,
                                  _timeout_strategy_factory=tsf)

      result = proc(_nextop_fn=self._NextOpcode)
      if result:
        self.assertFalse(job.cur_opctx)
        break

      self.assertFalse(result)

      if self.gave_lock:
        self.assertFalse(job.cur_opctx)
      else:
        # The lock wasn't obtained: the opcode context is kept for the next
        # attempt and still uses the strategy created by _NewTimeoutStrategy
        self.assert_(job.cur_opctx)
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
                         self.timeout_strategy.NextAttempt)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assert_(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    # All opcodes must have been visited exactly once via _NextOpcode
    self.assertEqual(self.curop, len(job.ops) - 1)
    self.assertEqual(self.job, job)
    self.assertEqual(self.opcounter.next(), len(job.ops))
    self.assert_(self.done_lock_before_blocking)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
                            for op in job.ops))

    # Finished jobs can't be processed any further
    self.assertRaises(errors.ProgrammerError,
                      jqueue._JobProcessor(self.queue, opexec, job))
1113
1114
if __name__ == "__main__":
  # Run the tests in this module through testutils.GanetiTestProgram
  testutils.GanetiTestProgram()