4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.jqueue"""
32 from ganeti import constants
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import jqueue
36 from ganeti import opcodes
37 from ganeti import compat
38 from ganeti import mcpu
44 def __init__(self, job_id, status):
49 def SetStatus(self, status):
def AddLogEntry(self, msg):
    """Append C{msg} to the fake job's log.

    Each entry is stored as a C{(serial, msg)} tuple, where the serial is
    the number of entries already in the log when the message is added.

    """
    serial = len(self._log)
    entry = (serial, msg)
    self._log.append(entry)
58 def GetInfo(self, fields):
63 result.append(self._status)
65 raise Exception("Unknown field")
69 def GetLogEntries(self, newer_than):
70 assert newer_than is None or newer_than >= 0
72 if newer_than is None:
75 return self._log[newer_than:]
78 class TestJobChangesChecker(unittest.TestCase):
80 job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
81 checker = jqueue._JobChangesChecker(["status"], None, None)
82 self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
84 job.SetStatus(constants.JOB_STATUS_RUNNING)
85 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
87 job.SetStatus(constants.JOB_STATUS_SUCCESS)
88 self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
90 # job.id is used by checker
91 self.assertEqual(job.id, 9094)
def testStatusWithPrev(self):
    """Check that matching previous info suppresses the change report.

    While the job's status equals the previous job info handed to the
    checker, the checker must return C{None}. After the status changes it
    must return the new status together with an empty list of log entries.

    """
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"],
                                        [constants.JOB_STATUS_QUEUED], None)
    # Previous info already equals the current status, so nothing to report.
    # assertTrue is used instead of the deprecated assert_ alias.
    self.assertTrue(checker(job) is None)

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
def testFinalStatus(self):
    """Check that finalized jobs are reported even when nothing differs.

    For every status in C{constants.JOBS_FINALIZED} the checker is given
    previous info identical to the job's current status and must still
    return the job info instead of C{None}.

    """
    for final_status in constants.JOBS_FINALIZED:
        finished_job = _FakeJob(2178711, final_status)
        prev_info = [final_status]
        checker = jqueue._JobChangesChecker(["status"], prev_info, None)

        # A finalized job will never change again, so waiting would be
        # pointless; the checker has to signal a change right away
        expected = ([final_status], [])
        self.assertEqual(checker(finished_job), expected)
111 job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
112 checker = jqueue._JobChangesChecker(["status"], None, None)
113 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
115 job.AddLogEntry("Hello World")
116 (job_info, log_entries) = checker(job)
117 self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
118 self.assertEqual(log_entries, [[0, "Hello World"]])
120 checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
121 self.assert_(checker2(job) is None)
123 job.AddLogEntry("Foo Bar")
124 job.SetStatus(constants.JOB_STATUS_ERROR)
126 (job_info, log_entries) = checker2(job)
127 self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
128 self.assertEqual(log_entries, [[1, "Foo Bar"]])
130 checker3 = jqueue._JobChangesChecker(["status"], None, None)
131 (job_info, log_entries) = checker3(job)
132 self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
133 self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
136 class TestJobChangesWaiter(unittest.TestCase):
138 self.tmpdir = tempfile.mkdtemp()
139 self.filename = utils.PathJoin(self.tmpdir, "job-1")
140 utils.WriteFile(self.filename, data="")
143 shutil.rmtree(self.tmpdir)
145 def _EnsureNotifierClosed(self, notifier):
147 os.fstat(notifier._fd)
148 except EnvironmentError, err:
149 self.assertEqual(err.errno, errno.EBADF)
151 self.fail("File descriptor wasn't closed")
154 for wait in [False, True]:
155 waiter = jqueue._JobFileChangesWaiter(self.filename)
162 # Ensure file descriptor was closed
163 self._EnsureNotifierClosed(waiter._notifier)
165 def testChangingFile(self):
166 waiter = jqueue._JobFileChangesWaiter(self.filename)
168 self.assertFalse(waiter.Wait(0.1))
169 utils.WriteFile(self.filename, data="changed")
170 self.assert_(waiter.Wait(60))
174 self._EnsureNotifierClosed(waiter._notifier)
176 def testChangingFile2(self):
177 waiter = jqueue._JobChangesWaiter(self.filename)
179 self.assertFalse(waiter._filewaiter)
180 self.assert_(waiter.Wait(0.1))
181 self.assert_(waiter._filewaiter)
183 # File waiter is now used, but there have been no changes
184 self.assertFalse(waiter.Wait(0.1))
185 utils.WriteFile(self.filename, data="changed")
186 self.assert_(waiter.Wait(60))
190 self._EnsureNotifierClosed(waiter._filewaiter._notifier)
193 class TestWaitForJobChangesHelper(unittest.TestCase):
195 self.tmpdir = tempfile.mkdtemp()
196 self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
197 utils.WriteFile(self.filename, data="")
200 shutil.rmtree(self.tmpdir)
def _LoadWaitingJob(self):
    """Job loader callback returning a fake job in "waitlock" status.

    A new L{_FakeJob} is built on every call; its ID is the same number
    used in the job file name created for this test case.

    """
    job_id = 2614226563
    status = constants.JOB_STATUS_WAITLOCK
    return _FakeJob(job_id, status)
205 def _LoadLostJob(self):
def testNoChanges(self):
    """Check the helper's results for a job whose status never changes.

    With previous info equal to the job's status, the helper must return
    C{constants.JOB_NOTCHANGED}; without any previous info, it must return
    the current job info and an empty list of log entries.

    """
    wfjc = jqueue._WaitForJobChangesHelper()

    # Previous job info is identical to what the loader returns
    # (last argument is presumably the timeout in seconds -- TODO confirm)
    unchanged = wfjc(self.filename, self._LoadWaitingJob, ["status"],
                     [constants.JOB_STATUS_WAITLOCK], None, 0.1)
    self.assertEqual(unchanged, constants.JOB_NOTCHANGED)

    # No previous information
    first_look = wfjc(self.filename, self._LoadWaitingJob,
                      ["status"], None, None, 1.0)
    self.assertEqual(first_look, ([constants.JOB_STATUS_WAITLOCK], []))
def testLostJob(self):
    """Check that the helper returns C{None} when using L{_LoadLostJob}.

    The job is loaded through C{self._LoadLostJob} and no previous job
    info or log serial is supplied.

    """
    wfjc = jqueue._WaitForJobChangesHelper()
    # assertTrue is used instead of the deprecated assert_ alias
    self.assertTrue(wfjc(self.filename, self._LoadLostJob,
                         ["status"], None, None, 1.0) is None)
227 class TestEncodeOpError(unittest.TestCase):
229 encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
230 self.assert_(isinstance(encerr, tuple))
231 self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
233 encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
234 self.assert_(isinstance(encerr, tuple))
235 self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
237 encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
238 self.assert_(isinstance(encerr, tuple))
239 self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
241 encerr = jqueue._EncodeOpError("Hello World")
242 self.assert_(isinstance(encerr, tuple))
243 self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
246 class TestQueuedOpCode(unittest.TestCase):
247 def testDefaults(self):
249 self.assertFalse(hasattr(op.input, "dry_run"))
250 self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
251 self.assertFalse(op.log)
252 self.assert_(op.start_timestamp is None)
253 self.assert_(op.exec_timestamp is None)
254 self.assert_(op.end_timestamp is None)
255 self.assert_(op.result is None)
256 self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
258 op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
260 op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
262 self.assertEqual(op1.Serialize(), op2.Serialize())
264 def testPriority(self):
266 assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
267 "Default priority equals high priority; test can't work"
268 self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
269 self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
271 inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
272 op1 = jqueue._QueuedOpCode(inpop)
274 op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
276 self.assertEqual(op1.Serialize(), op2.Serialize())
279 class TestQueuedJob(unittest.TestCase):
281 self.assertRaises(errors.GenericError, jqueue._QueuedJob,
284 def testDefaults(self):
288 opcodes.OpTestDelay(),
292 self.assertEqual(job.id, job_id)
293 self.assertEqual(job.log_serial, 0)
294 self.assert_(job.received_timestamp)
295 self.assert_(job.start_timestamp is None)
296 self.assert_(job.end_timestamp is None)
297 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
298 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
299 self.assert_(repr(job).startswith("<"))
300 self.assertEqual(len(job.ops), len(ops))
301 self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
302 for (inp, op) in zip(ops, job.ops)))
303 self.assertRaises(errors.OpExecError, job.GetInfo,
305 self.assertEqual(job.GetInfo(["summary"]),
306 [[op.input.Summary() for op in job.ops]])
308 job1 = jqueue._QueuedJob(None, job_id, ops)
310 job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
312 self.assertEqual(job1.Serialize(), job2.Serialize())
314 def testPriority(self):
317 opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
318 opcodes.OpTestDelay(),
322 self.assertEqual(job.id, job_id)
323 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
324 self.assert_(repr(job).startswith("<"))
326 job = jqueue._QueuedJob(None, job_id, ops)
328 self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
330 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
333 job.ops[0].priority -= 1
335 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
337 # Mark opcode as finished
338 job.ops[0].status = constants.OP_STATUS_SUCCESS
340 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
343 job.ops[1].priority -= 10
344 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
346 # Test increasing first
347 job.ops[0].status = constants.OP_STATUS_RUNNING
348 job.ops[0].priority -= 19
349 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
351 def testCalcStatus(self):
353 # The default status is "queued"
354 self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
358 ops[0].status = constants.OP_STATUS_WAITLOCK
361 ops[0].status = constants.OP_STATUS_SUCCESS
362 ops[1].status = constants.OP_STATUS_SUCCESS
363 ops[2].status = constants.OP_STATUS_WAITLOCK
366 ops[0].status = constants.OP_STATUS_SUCCESS
367 ops[1].status = constants.OP_STATUS_RUNNING
369 op.status = constants.OP_STATUS_QUEUED
371 def _Canceling1(ops):
372 ops[0].status = constants.OP_STATUS_SUCCESS
373 ops[1].status = constants.OP_STATUS_SUCCESS
375 op.status = constants.OP_STATUS_CANCELING
377 def _Canceling2(ops):
379 op.status = constants.OP_STATUS_CANCELING
383 op.status = constants.OP_STATUS_CANCELED
386 for idx, op in enumerate(ops):
388 op.status = constants.OP_STATUS_ERROR
390 op.status = constants.OP_STATUS_SUCCESS
394 op.status = constants.OP_STATUS_ERROR
398 op.status = constants.OP_STATUS_SUCCESS
401 constants.JOB_STATUS_QUEUED: [_Queued],
402 constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
403 constants.JOB_STATUS_RUNNING: [_Running],
404 constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
405 constants.JOB_STATUS_CANCELED: [_Canceled],
406 constants.JOB_STATUS_ERROR: [_Error1, _Error2],
407 constants.JOB_STATUS_SUCCESS: [_Success],
411 job = jqueue._QueuedJob(None, 1,
412 [opcodes.OpTestDelay() for _ in range(10)])
413 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
414 self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
418 for status in constants.JOB_STATUS_ALL:
419 sttests = tests[status]
424 self.assertEqual(job.CalcStatus(), status)
427 class _FakeQueueForProc:
429 self._acquired = False
def IsAcquired(self):
    """Return the current value of the fake queue's "acquired" flag.

    """
    currently_held = self._acquired
    return currently_held
def GetNextUpdate(self):
    """Remove and return the oldest recorded job update.

    @raise IndexError: if no update is pending; the tests in this file
      rely on this to verify that no further updates were recorded

    """
    oldest_update = self._updates.pop(0)
    return oldest_update
438 def acquire(self, shared=0):
440 self._acquired = True
443 assert self._acquired
444 self._acquired = False
def UpdateJobUnlocked(self, job, replicate=True):
    """Record a job update for later inspection via L{GetNextUpdate}.

    The update is stored as a C{(job, replicate)} tuple with C{replicate}
    normalized to a boolean. The fake lock must be held while calling.

    @param job: job object being updated
    @param replicate: whether the update is to be replicated

    """
    assert self._acquired, "Lock not acquired while updating job"
    recorded = (job, bool(replicate))
    self._updates.append(recorded)
451 class _FakeExecOpCodeForProc:
452 def __init__(self, queue, before_start, after_start):
454 self._before_start = before_start
455 self._after_start = after_start
457 def __call__(self, op, cbs, timeout=None, priority=None):
458 assert isinstance(op, opcodes.OpTestDummy)
459 assert not self._queue.IsAcquired(), \
460 "Queue lock not released when executing opcode"
462 if self._before_start:
463 self._before_start(timeout, priority)
467 if self._after_start:
468 self._after_start(op, cbs)
470 # Check again after the callbacks
471 assert not self._queue.IsAcquired()
474 raise errors.OpExecError("Error requested (%s)" % op.result)
class _JobProcessorTestUtils:
    """Mixin with helpers shared by the job processor test cases.

    It is combined with C{unittest.TestCase} by the test classes, so the
    C{assert*} methods used below come from the test case.

    """
    def _CreateJob(self, queue, job_id, ops):
        """Create a queued job, verify its initial state and return it.

        @param queue: queue object handed to the job
        @param job_id: ID for the new job
        @param ops: list of input opcodes for the job
        @return: the newly created L{jqueue._QueuedJob}

        """
        job = jqueue._QueuedJob(queue, job_id, ops)

        # A freshly created job must not have been started or finished
        self.assertFalse(job.start_timestamp)
        self.assertFalse(job.end_timestamp)

        # Every input opcode must be wrapped, in order, by a queued opcode
        self.assertEqual(len(ops), len(job.ops))
        self.assertTrue(compat.all(op.input == inp
                                   for (op, inp) in zip(job.ops, ops)))
        self.assertEqual(job.GetInfo(["ops"]),
                         [[op.__getstate__() for op in ops]])

        # All callers assign the result ("job = self._CreateJob(...)") and
        # use it afterwards, hence the job has to be handed back
        return job
491 class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
492 def _GenericCheckJob(self, job):
493 assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
496 self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
497 [[op.start_timestamp for op in job.ops],
498 [op.exec_timestamp for op in job.ops],
499 [op.end_timestamp for op in job.ops]])
500 self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
501 [job.received_timestamp,
504 self.assert_(job.start_timestamp)
505 self.assert_(job.end_timestamp)
506 self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
508 def testSuccess(self):
509 queue = _FakeQueueForProc()
511 for (job_id, opcount) in [(25351, 1), (6637, 3),
512 (24644, 10), (32207, 100)]:
513 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
514 for i in range(opcount)]
517 job = self._CreateJob(queue, job_id, ops)
519 def _BeforeStart(timeout, priority):
520 self.assertEqual(queue.GetNextUpdate(), (job, True))
521 self.assertRaises(IndexError, queue.GetNextUpdate)
522 self.assertFalse(queue.IsAcquired())
523 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
524 self.assertFalse(job.cur_opctx)
526 def _AfterStart(op, cbs):
527 self.assertEqual(queue.GetNextUpdate(), (job, True))
528 self.assertRaises(IndexError, queue.GetNextUpdate)
530 self.assertFalse(queue.IsAcquired())
531 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
532 self.assertFalse(job.cur_opctx)
534 # Job is running, cancelling shouldn't be possible
535 (success, _) = job.Cancel()
536 self.assertFalse(success)
538 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
540 for idx in range(len(ops)):
541 self.assertRaises(IndexError, queue.GetNextUpdate)
542 result = jqueue._JobProcessor(queue, opexec, job)()
543 self.assertEqual(queue.GetNextUpdate(), (job, True))
544 self.assertRaises(IndexError, queue.GetNextUpdate)
545 if idx == len(ops) - 1:
549 self.assertFalse(result)
551 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
552 self.assert_(job.start_timestamp)
553 self.assertFalse(job.end_timestamp)
555 self.assertRaises(IndexError, queue.GetNextUpdate)
557 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
558 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
559 self.assertEqual(job.GetInfo(["opresult"]),
560 [[op.input.result for op in job.ops]])
561 self.assertEqual(job.GetInfo(["opstatus"]),
562 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
563 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
566 self._GenericCheckJob(job)
568 # Finished jobs can't be processed any further
569 self.assertRaises(errors.ProgrammerError,
570 jqueue._JobProcessor(queue, opexec, job))
572 def testOpcodeError(self):
573 queue = _FakeQueueForProc()
580 (23816, 100, 39, 45),
583 for (job_id, opcount, failfrom, failto) in testdata:
585 ops = [opcodes.OpTestDummy(result="Res%s" % i,
586 fail=(failfrom <= i and
588 for i in range(opcount)]
591 job = self._CreateJob(queue, job_id, ops)
593 opexec = _FakeExecOpCodeForProc(queue, None, None)
595 for idx in range(len(ops)):
596 self.assertRaises(IndexError, queue.GetNextUpdate)
597 result = jqueue._JobProcessor(queue, opexec, job)()
599 self.assertEqual(queue.GetNextUpdate(), (job, True))
600 # waitlock to running
601 self.assertEqual(queue.GetNextUpdate(), (job, True))
603 self.assertEqual(queue.GetNextUpdate(), (job, True))
604 self.assertRaises(IndexError, queue.GetNextUpdate)
606 if idx in (failfrom, len(ops) - 1):
611 self.assertFalse(result)
613 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
615 self.assertRaises(IndexError, queue.GetNextUpdate)
618 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
619 self.assertEqual(job.GetInfo(["id"]), [job_id])
620 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
622 # Check opcode status
624 job.GetInfo(["opstatus"])[0],
625 job.GetInfo(["opresult"])[0])
627 for idx, (op, opstatus, opresult) in enumerate(data):
629 assert not op.input.fail
630 self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
631 self.assertEqual(opresult, op.input.result)
634 self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
635 self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
637 assert not op.input.fail
638 self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
639 self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
641 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
642 for op in job.ops[:failfrom]))
644 self._GenericCheckJob(job)
646 # Finished jobs can't be processed any further
647 self.assertRaises(errors.ProgrammerError,
648 jqueue._JobProcessor(queue, opexec, job))
650 def testCancelWhileInQueue(self):
651 queue = _FakeQueueForProc()
653 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
658 job = self._CreateJob(queue, job_id, ops)
660 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
663 (success, _) = job.Cancel()
664 self.assert_(success)
666 self.assertRaises(IndexError, queue.GetNextUpdate)
668 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
671 opexec = _FakeExecOpCodeForProc(queue, None, None)
672 self.assert_(jqueue._JobProcessor(queue, opexec, job)())
675 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
676 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
677 self.assertFalse(job.start_timestamp)
678 self.assert_(job.end_timestamp)
679 self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
681 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
682 [[constants.OP_STATUS_CANCELED for _ in job.ops],
683 ["Job canceled by request" for _ in job.ops]])
685 def testCancelWhileWaitlockInQueue(self):
686 queue = _FakeQueueForProc()
688 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
693 job = self._CreateJob(queue, job_id, ops)
695 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
697 job.ops[0].status = constants.OP_STATUS_WAITLOCK
699 assert len(job.ops) == 5
701 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
704 (success, _) = job.Cancel()
705 self.assert_(success)
707 self.assertRaises(IndexError, queue.GetNextUpdate)
709 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
712 opexec = _FakeExecOpCodeForProc(queue, None, None)
713 self.assert_(jqueue._JobProcessor(queue, opexec, job)())
716 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
717 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
718 self.assertFalse(job.start_timestamp)
719 self.assert_(job.end_timestamp)
720 self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
722 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
723 [[constants.OP_STATUS_CANCELED for _ in job.ops],
724 ["Job canceled by request" for _ in job.ops]])
726 def testCancelWhileWaitlock(self):
727 queue = _FakeQueueForProc()
729 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
734 job = self._CreateJob(queue, job_id, ops)
736 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
738 def _BeforeStart(timeout, priority):
739 self.assertEqual(queue.GetNextUpdate(), (job, True))
740 self.assertRaises(IndexError, queue.GetNextUpdate)
741 self.assertFalse(queue.IsAcquired())
742 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
745 (success, _) = job.Cancel()
746 self.assert_(success)
748 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
750 self.assertRaises(IndexError, queue.GetNextUpdate)
752 def _AfterStart(op, cbs):
753 self.assertEqual(queue.GetNextUpdate(), (job, True))
754 self.assertRaises(IndexError, queue.GetNextUpdate)
755 self.assertFalse(queue.IsAcquired())
756 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
758 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
760 self.assertRaises(IndexError, queue.GetNextUpdate)
761 self.assert_(jqueue._JobProcessor(queue, opexec, job)())
762 self.assertEqual(queue.GetNextUpdate(), (job, True))
763 self.assertRaises(IndexError, queue.GetNextUpdate)
766 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
767 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
768 self.assert_(job.start_timestamp)
769 self.assert_(job.end_timestamp)
770 self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
772 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
773 [[constants.OP_STATUS_CANCELED for _ in job.ops],
774 ["Job canceled by request" for _ in job.ops]])
776 def testCancelWhileWaitlockWithTimeout(self):
777 queue = _FakeQueueForProc()
779 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
784 job = self._CreateJob(queue, job_id, ops)
786 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
788 def _BeforeStart(timeout, priority):
789 self.assertFalse(queue.IsAcquired())
790 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
793 (success, _) = job.Cancel()
794 self.assert_(success)
796 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
799 # Fake an acquire attempt timing out
800 raise mcpu.LockAcquireTimeout()
802 def _AfterStart(op, cbs):
803 self.fail("Should not reach this")
805 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
807 self.assert_(jqueue._JobProcessor(queue, opexec, job)())
810 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
811 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
812 self.assert_(job.start_timestamp)
813 self.assert_(job.end_timestamp)
814 self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
816 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
817 [[constants.OP_STATUS_CANCELED for _ in job.ops],
818 ["Job canceled by request" for _ in job.ops]])
820 def testCancelWhileRunning(self):
821 # Tests canceling a job with finished opcodes and more, unprocessed ones
822 queue = _FakeQueueForProc()
824 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
829 job = self._CreateJob(queue, job_id, ops)
831 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
833 opexec = _FakeExecOpCodeForProc(queue, None, None)
836 self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
838 # Job goes back to queued
839 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
840 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
841 [[constants.OP_STATUS_SUCCESS,
842 constants.OP_STATUS_QUEUED,
843 constants.OP_STATUS_QUEUED],
844 ["Res0", None, None]])
847 (success, _) = job.Cancel()
848 self.assert_(success)
850 # Try processing another opcode (this will actually cancel the job)
851 self.assert_(jqueue._JobProcessor(queue, opexec, job)())
854 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
855 self.assertEqual(job.GetInfo(["id"]), [job_id])
856 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
857 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
858 [[constants.OP_STATUS_SUCCESS,
859 constants.OP_STATUS_CANCELED,
860 constants.OP_STATUS_CANCELED],
861 ["Res0", "Job canceled by request",
862 "Job canceled by request"]])
864 def testPartiallyRun(self):
865 # Tests calling the processor on a job that's been partially run before the
866 # program was restarted
867 queue = _FakeQueueForProc()
869 opexec = _FakeExecOpCodeForProc(queue, None, None)
871 for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
872 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
876 job = self._CreateJob(queue, job_id, ops)
878 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
880 for _ in range(successcount):
881 self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
883 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
884 self.assertEqual(job.GetInfo(["opstatus"]),
885 [[constants.OP_STATUS_SUCCESS
886 for _ in range(successcount)] +
887 [constants.OP_STATUS_QUEUED
888 for _ in range(len(ops) - successcount)]])
890 self.assert_(job.ops_iter)
892 # Serialize and restore (simulates program restart)
893 newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
894 self.assertFalse(newjob.ops_iter)
895 self._TestPartial(newjob, successcount)
897 def _TestPartial(self, job, successcount):
898 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
899 self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
901 queue = _FakeQueueForProc()
902 opexec = _FakeExecOpCodeForProc(queue, None, None)
904 for remaining in reversed(range(len(job.ops) - successcount)):
905 result = jqueue._JobProcessor(queue, opexec, job)()
912 self.assertFalse(result)
914 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
916 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
917 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
918 self.assertEqual(job.GetInfo(["opresult"]),
919 [[op.input.result for op in job.ops]])
920 self.assertEqual(job.GetInfo(["opstatus"]),
921 [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
922 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
925 self._GenericCheckJob(job)
927 # Finished jobs can't be processed any further
928 self.assertRaises(errors.ProgrammerError,
929 jqueue._JobProcessor(queue, opexec, job))
931 # ... also after being restored
932 job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
933 self.assertRaises(errors.ProgrammerError,
934 jqueue._JobProcessor(queue, opexec, job2))
def testProcessorOnRunningJob(self):
    """Check that processing a job already marked as running is refused.

    The job's only opcode is switched to "running" by hand; invoking the
    job processor afterwards must raise L{errors.ProgrammerError}.

    """
    dummy_ops = [opcodes.OpTestDummy(result="result", fail=False)]

    fake_queue = _FakeQueueForProc()
    executor = _FakeExecOpCodeForProc(fake_queue, None, None)

    running_job = self._CreateJob(fake_queue, 9571, dummy_ops)
    self.assertEqual(running_job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Pretend the single opcode is being executed already
    running_job.ops[0].status = constants.OP_STATUS_RUNNING
    assert len(running_job.ops) == 1
    self.assertEqual(running_job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Calling on running job must fail
    processor = jqueue._JobProcessor(fake_queue, executor, running_job)
    self.assertRaises(errors.ProgrammerError, processor)
957 def testLogMessages(self):
958 # Tests the "Feedback" callback function
959 queue = _FakeQueueForProc()
965 (constants.ELOG_MESSAGE, "there"),
968 (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
969 (constants.ELOG_JQUEUE_TEST, ("other", "type")),
972 ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
973 messages=messages.get(i, []))
977 job = self._CreateJob(queue, 29386, ops)
979 def _BeforeStart(timeout, priority):
980 self.assertEqual(queue.GetNextUpdate(), (job, True))
981 self.assertRaises(IndexError, queue.GetNextUpdate)
982 self.assertFalse(queue.IsAcquired())
983 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
985 def _AfterStart(op, cbs):
986 self.assertEqual(queue.GetNextUpdate(), (job, True))
987 self.assertRaises(IndexError, queue.GetNextUpdate)
988 self.assertFalse(queue.IsAcquired())
989 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
991 self.assertRaises(AssertionError, cbs.Feedback,
992 "too", "many", "arguments")
994 for (log_type, msg) in op.messages:
995 self.assertRaises(IndexError, queue.GetNextUpdate)
997 cbs.Feedback(log_type, msg)
1000 # Check for job update without replication
1001 self.assertEqual(queue.GetNextUpdate(), (job, False))
1002 self.assertRaises(IndexError, queue.GetNextUpdate)
1004 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1006 for remaining in reversed(range(len(job.ops))):
1007 self.assertRaises(IndexError, queue.GetNextUpdate)
1008 result = jqueue._JobProcessor(queue, opexec, job)()
1009 self.assertEqual(queue.GetNextUpdate(), (job, True))
1010 self.assertRaises(IndexError, queue.GetNextUpdate)
1014 self.assert_(result)
1017 self.assertFalse(result)
1019 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1021 self.assertRaises(IndexError, queue.GetNextUpdate)
1023 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1024 self.assertEqual(job.GetInfo(["opresult"]),
1025 [[op.input.result for op in job.ops]])
1027 logmsgcount = sum(len(m) for m in messages.values())
1029 self._CheckLogMessages(job, logmsgcount)
1031 # Serialize and restore (simulates program restart)
1032 newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
1033 self._CheckLogMessages(newjob, logmsgcount)
1035 # Check each message
1037 for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1038 for (serial, timestamp, log_type, msg) in oplog:
1039 (exptype, expmsg) = messages.get(idx).pop(0)
1041 self.assertEqual(log_type, exptype)
1043 self.assertEqual(log_type, constants.ELOG_MESSAGE)
1044 self.assertEqual(expmsg, msg)
1045 self.assert_(serial > prevserial)
1048 def _CheckLogMessages(self, job, count):
1050 self.assertEqual(job.log_serial, count)
1053 self.assertEqual(job.GetLogEntries(None),
1054 [entry for entries in job.GetInfo(["oplog"])[0] if entries
1055 for entry in entries])
1057 # Filter with serial
1059 self.assert_(job.GetLogEntries(3))
1060 self.assertEqual(job.GetLogEntries(3),
1061 [entry for entries in job.GetInfo(["oplog"])[0] if entries
1062 for entry in entries][3:])
1064 # No log message after highest serial
1065 self.assertFalse(job.GetLogEntries(count))
1066 self.assertFalse(job.GetLogEntries(count + 3))
1069 class _FakeTimeoutStrategy:
1070 def __init__(self, timeouts):
1071 self.timeouts = timeouts
1073 self.last_timeout = None
1075 def NextAttempt(self):
1078 timeout = self.timeouts.pop(0)
1081 self.last_timeout = timeout
1085 class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
1087 self.queue = _FakeQueueForProc()
1090 self.opcounter = None
1091 self.timeout_strategy = None
1093 self.prev_tsop = None
1094 self.prev_prio = None
1095 self.prev_status = None
1096 self.lock_acq_prio = None
1097 self.gave_lock = None
1098 self.done_lock_before_blocking = False
# Callback handed to _FakeExecOpCodeForProc in testTimeout. Verifies the queue
# and job state seen at this point and, while self.retries is positive, raises
# mcpu.LockAcquireTimeout instead of returning.
# NOTE(review): the local name "job" is used below but its assignment is not
# among the visible lines, and further listing numbers are skipped (e.g. 1124,
# 1127, 1130), so updates to self.retries are not visible. Presumably job is
# self.job -- confirm against the full file.
1100 def _BeforeStart(self, timeout, priority):
1103 # If status has changed, job must've been written
1104 if self.prev_status != self.job.ops[self.curop].status:
1105 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1106 self.assertRaises(IndexError, self.queue.GetNextUpdate)
# The queue must not be acquired and the job must be waiting for locks
1108 self.assertFalse(self.queue.IsAcquired())
1109 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1111 ts = self.timeout_strategy
# The timeout passed in must be the one the current fake strategy produced
# last, and the priority must be the current opcode's priority
1113 self.assert_(timeout is None or isinstance(timeout, (int, float)))
1114 self.assertEqual(timeout, ts.last_timeout)
1115 self.assertEqual(priority, job.ops[self.curop].priority)
# Record the attempt; testTimeout inspects both values after proc() returns
1117 self.gave_lock = True
1118 self.lock_acq_prio = priority
1120 if (self.curop == 3 and
1121 job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
1122 # Give locks before running into blocking acquire
1123 assert self.retries == 7
1125 self.done_lock_before_blocking = True
# Retries left: this attempt fails with a lock timeout, which requires that a
# (non-None) timeout was in effect
1128 if self.retries > 0:
1129 self.assert_(timeout is not None)
1131 self.gave_lock = False
1132 raise mcpu.LockAcquireTimeout()
# At highest priority no retries and no strategy timeouts may be left over
1134 if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
1135 assert self.retries == 0, "Didn't exhaust all retries at highest priority"
1136 assert not ts.timeouts
1137 self.assert_(timeout is None)
# Callback checking the state once an opcode is running: exactly one queue
# update was recorded, the queue is not acquired, the job status is "running"
# and Cancel() reports failure. The op and cbs arguments are not used by the
# visible lines.
# NOTE(review): presumably passed to _FakeExecOpCodeForProc together with
# _BeforeStart (that continuation line is not visible) and "job" is presumably
# self.job (its assignment is not visible either) -- confirm.
1139 def _AfterStart(self, op, cbs):
1142 # Setting to "running" requires an update
1143 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1144 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1146 self.assertFalse(self.queue.IsAcquired())
1147 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1149 # Job is running, cancelling shouldn't be possible
1150 (success, _) = job.Cancel()
1151 self.assertFalse(success)
# Passed to the job processor as _nextop_fn (see testTimeout). Advances
# self.curop to the next index from self.opcounter and snapshots that opcode's
# priority and status; _NewTimeoutStrategy and _BeforeStart compare against
# these snapshots.
1153 def _NextOpcode(self):
# NOTE(review): the iterator method .next() exists in Python 2 only
1154 self.curop = self.opcounter.next()
1155 self.prev_prio = self.job.ops[self.curop].priority
1156 self.prev_status = self.job.ops[self.curop].status
# Factory passed to jqueue._JobProcessor as _timeout_strategy_factory (see
# testTimeout). Depending on the current opcode index it picks a list of
# timeouts and the number of simulated lock timeouts (self.retries), then
# installs a fresh _FakeTimeoutStrategy as self.timeout_strategy.
# NOTE(review): many listing numbers are skipped in this method. The
# assignment of "job", the header of the first branch of the if/elif chain,
# any final else branch and the return statement are not visible; comments
# describe the visible lines only.
1158 def _NewTimeoutStrategy(self):
# A new strategy may only be requested once all retries were used up
1161 self.assertEqual(self.retries, 0)
1163 if self.prev_tsop == self.curop:
1164 # Still on the same opcode, priority must've been increased
# (the recorded value is exactly one above the current one; numerically lower
# values apparently mean higher priority -- confirm)
1165 self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
# Timeouts 10, 20, 30 with one retry fewer than there are timeouts
1169 timeouts = range(10, 31, 10)
1170 self.retries = len(timeouts) - 1
1172 elif self.curop == 2:
1173 # Let this run into a blocking acquire
1174 timeouts = range(11, 61, 12)
1175 self.retries = len(timeouts)
1177 elif self.curop == 3:
1178 # Wait for priority to increase, but give lock before blocking acquire
# range(12, 100, 14) has seven entries, matching the "retries == 7" assertion
# in _BeforeStart
1179 timeouts = range(12, 100, 14)
1180 self.retries = len(timeouts)
1182 self.assertFalse(self.done_lock_before_blocking)
1184 elif self.curop == 4:
1185 self.assert_(self.done_lock_before_blocking)
1187 # Timeouts, but no need to retry
1188 timeouts = range(10, 31, 10)
1191 elif self.curop == 5:
1193 timeouts = range(19, 100, 11)
1194 self.retries = len(timeouts)
# Sanity checks on the test setup itself
1200 assert len(job.ops) == 10
1201 assert self.retries <= len(timeouts)
1203 ts = _FakeTimeoutStrategy(timeouts)
# Remember the strategy and which opcode/priority it was created for
1205 self.timeout_strategy = ts
1206 self.prev_tsop = self.curop
1207 self.prev_prio = job.ops[self.curop].priority
# Runs a job made of OpTestDummy opcodes through jqueue._JobProcessor, using
# the fake executor and the fake timeout-strategy factory of this class. It
# checks queue updates, priorities and timestamps after processor calls, then
# the final state of the successful job.
# NOTE(review): many listing numbers are skipped in this method. Continuation
# lines of several multi-line statements, the definition of job_id and the
# loop / if-else headers around the proc() call are not visible; comments
# describe the visible lines only.
1211 def testTimeout(self):
1212 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1217 job = self._CreateJob(self.queue, job_id, ops)
# _NextOpcode draws opcode indices from this counter, starting at 0
1220 self.opcounter = itertools.count(0)
1222 opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
1224 tsf = self._NewTimeoutStrategy
1226 self.assertFalse(self.done_lock_before_blocking)
# Processor under test, wired to the fake timeout-strategy factory
1229 proc = jqueue._JobProcessor(self.queue, opexec, job,
1230 _timeout_strategy_factory=tsf)
# No queue update may be pending at this point
1232 self.assertRaises(IndexError, self.queue.GetNextUpdate)
# Refresh the status snapshot of the current opcode and reset the recorded
# lock priority before calling the processor
1234 if self.curop is not None:
1235 self.prev_status = self.job.ops[self.curop].status
1237 self.lock_acq_prio = None
# One processor call; _NextOpcode sets self.curop during it
1239 result = proc(_nextop_fn=self._NextOpcode)
1240 assert self.curop is not None
1242 if result or self.gave_lock:
1243 # Got lock and/or job is done, result must've been written
1244 self.assertFalse(job.cur_opctx)
1245 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1246 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1247 self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
1248 self.assert_(job.ops[self.curop].exec_timestamp)
1251 self.assertFalse(job.cur_opctx)
1254 self.assertFalse(result)
1257 self.assertEqual(job.ops[self.curop].start_timestamp,
1258 job.start_timestamp)
1261 # Opcode finished, but job not yet done
1262 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
# Checks for the case where an opcode context is still set: its timeout
# strategy must wrap the fake strategy's NextAttempt, the opcode has no
# exec_timestamp and the job is waiting for locks
1265 self.assert_(job.cur_opctx)
1266 self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1267 self.timeout_strategy.NextAttempt)
1268 self.assertFalse(job.ops[self.curop].exec_timestamp)
1269 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1271 # If priority has changed since acquiring locks, the job must've been
1273 if self.lock_acq_prio != job.ops[self.curop].priority:
1274 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1276 self.assertRaises(IndexError, self.queue.GetNextUpdate)
# The job has a start timestamp but no end timestamp here
1278 self.assert_(job.start_timestamp)
1279 self.assertFalse(job.end_timestamp)
# Every opcode index was handed out: the last one is len(ops) - 1 and the
# counter's next value equals len(ops)
1281 self.assertEqual(self.curop, len(job.ops) - 1)
1282 self.assertEqual(self.job, job)
1283 self.assertEqual(self.opcounter.next(), len(job.ops))
1284 self.assert_(self.done_lock_before_blocking)
# Final state: no pending queue update, job and all opcodes successful, and
# opcode results equal to each opcode's input "result" value
1286 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1287 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1288 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1289 self.assertEqual(job.GetInfo(["opresult"]),
1290 [[op.input.result for op in job.ops]])
1291 self.assertEqual(job.GetInfo(["opstatus"]),
1292 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1293 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1296 # Finished jobs can't be processed any further
1297 self.assertRaises(errors.ProgrammerError,
1298 jqueue._JobProcessor(self.queue, opexec, job))
# Script entry point: when run directly rather than imported, call
# testutils.GanetiTestProgram() (presumably ganeti's wrapper around the
# unittest runner -- confirm in testutils).
1301 if __name__ == "__main__":
1302 testutils.GanetiTestProgram()