4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.jqueue"""
32 from ganeti import constants
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import jqueue
36 from ganeti import opcodes
37 from ganeti import compat
38 from ganeti import mcpu
44 def __init__(self, job_id, status):
49 def SetStatus(self, status):
52 def AddLogEntry(self, msg):
53 self._log.append((len(self._log), msg))
58 def GetInfo(self, fields):
63 result.append(self._status)
65 raise Exception("Unknown field")
69 def GetLogEntries(self, newer_than):
70 assert newer_than is None or newer_than >= 0
72 if newer_than is None:
75 return self._log[newer_than:]
78 class TestJobChangesChecker(unittest.TestCase):
80 job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
81 checker = jqueue._JobChangesChecker(["status"], None, None)
82 self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
84 job.SetStatus(constants.JOB_STATUS_RUNNING)
85 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
87 job.SetStatus(constants.JOB_STATUS_SUCCESS)
88 self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
90 # job.id is used by checker
91 self.assertEqual(job.id, 9094)
93 def testStatusWithPrev(self):
94 job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
95 checker = jqueue._JobChangesChecker(["status"],
96 [constants.JOB_STATUS_QUEUED], None)
97 self.assert_(checker(job) is None)
99 job.SetStatus(constants.JOB_STATUS_RUNNING)
100 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
102 def testFinalStatus(self):
103 for status in constants.JOBS_FINALIZED:
104 job = _FakeJob(2178711, status)
105 checker = jqueue._JobChangesChecker(["status"], [status], None)
106 # There won't be any changes in this status, hence it should signal
107 # a change immediately
108 self.assertEqual(checker(job), ([status], []))
111 job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
112 checker = jqueue._JobChangesChecker(["status"], None, None)
113 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
115 job.AddLogEntry("Hello World")
116 (job_info, log_entries) = checker(job)
117 self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
118 self.assertEqual(log_entries, [[0, "Hello World"]])
120 checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
121 self.assert_(checker2(job) is None)
123 job.AddLogEntry("Foo Bar")
124 job.SetStatus(constants.JOB_STATUS_ERROR)
126 (job_info, log_entries) = checker2(job)
127 self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
128 self.assertEqual(log_entries, [[1, "Foo Bar"]])
130 checker3 = jqueue._JobChangesChecker(["status"], None, None)
131 (job_info, log_entries) = checker3(job)
132 self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
133 self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
136 class TestJobChangesWaiter(unittest.TestCase):
138 self.tmpdir = tempfile.mkdtemp()
139 self.filename = utils.PathJoin(self.tmpdir, "job-1")
140 utils.WriteFile(self.filename, data="")
143 shutil.rmtree(self.tmpdir)
145 def _EnsureNotifierClosed(self, notifier):
147 os.fstat(notifier._fd)
148 except EnvironmentError, err:
149 self.assertEqual(err.errno, errno.EBADF)
151 self.fail("File descriptor wasn't closed")
154 for wait in [False, True]:
155 waiter = jqueue._JobFileChangesWaiter(self.filename)
162 # Ensure file descriptor was closed
163 self._EnsureNotifierClosed(waiter._notifier)
165 def testChangingFile(self):
166 waiter = jqueue._JobFileChangesWaiter(self.filename)
168 self.assertFalse(waiter.Wait(0.1))
169 utils.WriteFile(self.filename, data="changed")
170 self.assert_(waiter.Wait(60))
174 self._EnsureNotifierClosed(waiter._notifier)
176 def testChangingFile2(self):
177 waiter = jqueue._JobChangesWaiter(self.filename)
179 self.assertFalse(waiter._filewaiter)
180 self.assert_(waiter.Wait(0.1))
181 self.assert_(waiter._filewaiter)
183 # File waiter is now used, but there have been no changes
184 self.assertFalse(waiter.Wait(0.1))
185 utils.WriteFile(self.filename, data="changed")
186 self.assert_(waiter.Wait(60))
190 self._EnsureNotifierClosed(waiter._filewaiter._notifier)
193 class TestWaitForJobChangesHelper(unittest.TestCase):
195 self.tmpdir = tempfile.mkdtemp()
196 self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
197 utils.WriteFile(self.filename, data="")
200 shutil.rmtree(self.tmpdir)
# Job loader for the helper tests: returns a new fake job in WAITLOCK status
# whose id (2614226563) matches the "job-2614226563" file created in setUp.
202 def _LoadWaitingJob(self):
203 return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
205 def _LoadLostJob(self):
208 def testNoChanges(self):
209 wfjc = jqueue._WaitForJobChangesHelper()
212 self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
213 [constants.JOB_STATUS_WAITLOCK], None, 0.1),
214 constants.JOB_NOTCHANGED)
216 # No previous information
217 self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
218 ["status"], None, None, 1.0),
219 ([constants.JOB_STATUS_WAITLOCK], []))
# When the job loader yields no job (_LoadLostJob -- its body is not visible
# in this extract; presumably it returns None), the helper is expected to
# return None rather than a (job_info, log_entries) tuple.
221 def testLostJob(self):
222 wfjc = jqueue._WaitForJobChangesHelper()
223 self.assert_(wfjc(self.filename, self._LoadLostJob,
224 ["status"], None, None, 1.0) is None)
227 class TestEncodeOpError(unittest.TestCase):
229 encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
230 self.assert_(isinstance(encerr, tuple))
231 self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
233 encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
234 self.assert_(isinstance(encerr, tuple))
235 self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
237 encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
238 self.assert_(isinstance(encerr, tuple))
239 self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
241 encerr = jqueue._EncodeOpError("Hello World")
242 self.assert_(isinstance(encerr, tuple))
243 self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
246 class TestQueuedOpCode(unittest.TestCase):
247 def testDefaults(self):
249 self.assertFalse(hasattr(op.input, "dry_run"))
250 self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
251 self.assertFalse(op.log)
252 self.assert_(op.start_timestamp is None)
253 self.assert_(op.exec_timestamp is None)
254 self.assert_(op.end_timestamp is None)
255 self.assert_(op.result is None)
256 self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
258 op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
260 op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
262 self.assertEqual(op1.Serialize(), op2.Serialize())
264 def testPriority(self):
266 assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
267 "Default priority equals high priority; test can't work"
268 self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
269 self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
271 inpop = opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH)
272 op1 = jqueue._QueuedOpCode(inpop)
274 op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
276 self.assertEqual(op1.Serialize(), op2.Serialize())
279 class TestQueuedJob(unittest.TestCase):
281 self.assertRaises(errors.GenericError, jqueue._QueuedJob,
284 def testDefaults(self):
288 opcodes.OpTestDelay(),
292 self.assertEqual(job.id, job_id)
293 self.assertEqual(job.log_serial, 0)
294 self.assert_(job.received_timestamp)
295 self.assert_(job.start_timestamp is None)
296 self.assert_(job.end_timestamp is None)
297 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
298 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
299 self.assert_(repr(job).startswith("<"))
300 self.assertEqual(len(job.ops), len(ops))
301 self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
302 for (inp, op) in zip(ops, job.ops)))
303 self.assertRaises(errors.OpExecError, job.GetInfo,
305 self.assertEqual(job.GetInfo(["summary"]),
306 [[op.input.Summary() for op in job.ops]])
308 job1 = jqueue._QueuedJob(None, job_id, ops)
310 job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
312 self.assertEqual(job1.Serialize(), job2.Serialize())
314 def testPriority(self):
317 opcodes.OpGetTags(priority=constants.OP_PRIO_DEFAULT),
318 opcodes.OpTestDelay(),
322 self.assertEqual(job.id, job_id)
323 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
324 self.assert_(repr(job).startswith("<"))
326 job = jqueue._QueuedJob(None, job_id, ops)
328 self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
330 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
333 job.ops[0].priority -= 1
335 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
337 # Mark opcode as finished
338 job.ops[0].status = constants.OP_STATUS_SUCCESS
340 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
343 job.ops[1].priority -= 10
344 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
346 # Test increasing first
347 job.ops[0].status = constants.OP_STATUS_RUNNING
348 job.ops[0].priority -= 19
349 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
351 def testCalcStatus(self):
353 # The default status is "queued"
354 self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
358 ops[0].status = constants.OP_STATUS_WAITLOCK
361 ops[0].status = constants.OP_STATUS_SUCCESS
362 ops[1].status = constants.OP_STATUS_SUCCESS
363 ops[2].status = constants.OP_STATUS_WAITLOCK
366 ops[0].status = constants.OP_STATUS_SUCCESS
367 ops[1].status = constants.OP_STATUS_RUNNING
369 op.status = constants.OP_STATUS_QUEUED
371 def _Canceling1(ops):
372 ops[0].status = constants.OP_STATUS_SUCCESS
373 ops[1].status = constants.OP_STATUS_SUCCESS
375 op.status = constants.OP_STATUS_CANCELING
377 def _Canceling2(ops):
379 op.status = constants.OP_STATUS_CANCELING
383 op.status = constants.OP_STATUS_CANCELED
386 for idx, op in enumerate(ops):
388 op.status = constants.OP_STATUS_ERROR
390 op.status = constants.OP_STATUS_SUCCESS
394 op.status = constants.OP_STATUS_ERROR
398 op.status = constants.OP_STATUS_SUCCESS
401 constants.JOB_STATUS_QUEUED: [_Queued],
402 constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
403 constants.JOB_STATUS_RUNNING: [_Running],
404 constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
405 constants.JOB_STATUS_CANCELED: [_Canceled],
406 constants.JOB_STATUS_ERROR: [_Error1, _Error2],
407 constants.JOB_STATUS_SUCCESS: [_Success],
411 job = jqueue._QueuedJob(None, 1,
412 [opcodes.OpTestDelay() for _ in range(10)])
413 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
414 self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
418 for status in constants.JOB_STATUS_ALL:
419 sttests = tests[status]
424 self.assertEqual(job.CalcStatus(), status)
427 class _FakeQueueForProc:
429 self._acquired = False
# Report the fake queue lock's state: the _acquired flag, which acquire()
# sets to True. The executor fake asserts this is False while running opcodes.
432 def IsAcquired(self):
433 return self._acquired
# Pop and return the oldest (job, replicate) pair recorded by
# UpdateJobUnlocked. Raises IndexError when nothing is pending; the tests
# use assertRaises(IndexError, ...) to check that no extra update happened.
435 def GetNextUpdate(self):
436 return self._updates.pop(0)
438 def acquire(self, shared=0):
440 self._acquired = True
443 assert self._acquired
444 self._acquired = False
# Fake of the queue's job-update call. It requires the fake lock to be held,
# then records (job, replicate) with replicate normalized to a bool, so tests
# can compare against (job, True) / (job, False).
446 def UpdateJobUnlocked(self, job, replicate=True):
447 assert self._acquired, "Lock not acquired while updating job"
448 self._updates.append((job, bool(replicate)))
451 class _FakeExecOpCodeForProc:
452 def __init__(self, queue, before_start, after_start):
454 self._before_start = before_start
455 self._after_start = after_start
457 def __call__(self, op, cbs, timeout=None, priority=None):
458 assert isinstance(op, opcodes.OpTestDummy)
459 assert not self._queue.IsAcquired(), \
460 "Queue lock not released when executing opcode"
462 if self._before_start:
463 self._before_start(timeout, priority)
467 if self._after_start:
468 self._after_start(op, cbs)
470 # Check again after the callbacks
471 assert not self._queue.IsAcquired()
474 raise errors.OpExecError("Error requested (%s)" % op.result)
479 class _JobProcessorTestUtils:
480 def _CreateJob(self, queue, job_id, ops):
481 job = jqueue._QueuedJob(queue, job_id, ops)
482 self.assertFalse(job.start_timestamp)
483 self.assertFalse(job.end_timestamp)
484 self.assertEqual(len(ops), len(job.ops))
485 self.assert_(compat.all(op.input == inp
486 for (op, inp) in zip(job.ops, ops)))
487 self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
491 class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
492 def _GenericCheckJob(self, job):
493 assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
496 self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
497 [[op.start_timestamp for op in job.ops],
498 [op.exec_timestamp for op in job.ops],
499 [op.end_timestamp for op in job.ops]])
500 self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
501 [job.received_timestamp,
504 self.assert_(job.start_timestamp)
505 self.assert_(job.end_timestamp)
506 self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
508 def testSuccess(self):
509 queue = _FakeQueueForProc()
511 for (job_id, opcount) in [(25351, 1), (6637, 3),
512 (24644, 10), (32207, 100)]:
513 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
514 for i in range(opcount)]
517 job = self._CreateJob(queue, job_id, ops)
519 def _BeforeStart(timeout, priority):
520 self.assertEqual(queue.GetNextUpdate(), (job, True))
521 self.assertRaises(IndexError, queue.GetNextUpdate)
522 self.assertFalse(queue.IsAcquired())
523 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
525 def _AfterStart(op, cbs):
526 self.assertEqual(queue.GetNextUpdate(), (job, True))
527 self.assertRaises(IndexError, queue.GetNextUpdate)
529 self.assertFalse(queue.IsAcquired())
530 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
532 # Job is running, cancelling shouldn't be possible
533 (success, _) = job.Cancel()
534 self.assertFalse(success)
536 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
538 for idx in range(len(ops)):
539 self.assertRaises(IndexError, queue.GetNextUpdate)
540 result = jqueue._JobProcessor(queue, opexec, job)()
541 self.assertEqual(queue.GetNextUpdate(), (job, True))
542 self.assertRaises(IndexError, queue.GetNextUpdate)
543 if idx == len(ops) - 1:
547 self.assertFalse(result)
549 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
550 self.assert_(job.start_timestamp)
551 self.assertFalse(job.end_timestamp)
553 self.assertRaises(IndexError, queue.GetNextUpdate)
555 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
556 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
557 self.assertEqual(job.GetInfo(["opresult"]),
558 [[op.input.result for op in job.ops]])
559 self.assertEqual(job.GetInfo(["opstatus"]),
560 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
561 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
564 self._GenericCheckJob(job)
566 # Finished jobs can't be processed any further
567 self.assertRaises(errors.ProgrammerError,
568 jqueue._JobProcessor(queue, opexec, job))
570 def testOpcodeError(self):
571 queue = _FakeQueueForProc()
578 (23816, 100, 39, 45),
581 for (job_id, opcount, failfrom, failto) in testdata:
583 ops = [opcodes.OpTestDummy(result="Res%s" % i,
584 fail=(failfrom <= i and
586 for i in range(opcount)]
589 job = self._CreateJob(queue, job_id, ops)
591 opexec = _FakeExecOpCodeForProc(queue, None, None)
593 for idx in range(len(ops)):
594 self.assertRaises(IndexError, queue.GetNextUpdate)
595 result = jqueue._JobProcessor(queue, opexec, job)()
597 self.assertEqual(queue.GetNextUpdate(), (job, True))
598 # waitlock to running
599 self.assertEqual(queue.GetNextUpdate(), (job, True))
601 self.assertEqual(queue.GetNextUpdate(), (job, True))
602 self.assertRaises(IndexError, queue.GetNextUpdate)
604 if idx in (failfrom, len(ops) - 1):
609 self.assertFalse(result)
611 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
613 self.assertRaises(IndexError, queue.GetNextUpdate)
616 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
617 self.assertEqual(job.GetInfo(["id"]), [job_id])
618 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
620 # Check opcode status
622 job.GetInfo(["opstatus"])[0],
623 job.GetInfo(["opresult"])[0])
625 for idx, (op, opstatus, opresult) in enumerate(data):
627 assert not op.input.fail
628 self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
629 self.assertEqual(opresult, op.input.result)
632 self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
633 self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
635 assert not op.input.fail
636 self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
637 self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
639 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
640 for op in job.ops[:failfrom]))
642 self._GenericCheckJob(job)
644 # Finished jobs can't be processed any further
645 self.assertRaises(errors.ProgrammerError,
646 jqueue._JobProcessor(queue, opexec, job))
648 def testCancelWhileInQueue(self):
649 queue = _FakeQueueForProc()
651 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
656 job = self._CreateJob(queue, job_id, ops)
658 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
661 (success, _) = job.Cancel()
662 self.assert_(success)
664 self.assertRaises(IndexError, queue.GetNextUpdate)
666 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
669 opexec = _FakeExecOpCodeForProc(queue, None, None)
670 jqueue._JobProcessor(queue, opexec, job)()
673 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
674 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
675 self.assertFalse(job.start_timestamp)
676 self.assert_(job.end_timestamp)
677 self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
679 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
680 [[constants.OP_STATUS_CANCELED for _ in job.ops],
681 ["Job canceled by request" for _ in job.ops]])
683 def testCancelWhileWaitlock(self):
684 queue = _FakeQueueForProc()
686 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
691 job = self._CreateJob(queue, job_id, ops)
693 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
695 def _BeforeStart(timeout, priority):
696 self.assertEqual(queue.GetNextUpdate(), (job, True))
697 self.assertRaises(IndexError, queue.GetNextUpdate)
698 self.assertFalse(queue.IsAcquired())
699 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
702 (success, _) = job.Cancel()
703 self.assert_(success)
705 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
707 self.assertRaises(IndexError, queue.GetNextUpdate)
709 def _AfterStart(op, cbs):
710 self.assertEqual(queue.GetNextUpdate(), (job, True))
711 self.assertRaises(IndexError, queue.GetNextUpdate)
712 self.assertFalse(queue.IsAcquired())
713 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
715 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
717 self.assertRaises(IndexError, queue.GetNextUpdate)
718 self.assert_(jqueue._JobProcessor(queue, opexec, job)())
719 self.assertEqual(queue.GetNextUpdate(), (job, True))
720 self.assertRaises(IndexError, queue.GetNextUpdate)
723 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
724 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
725 self.assert_(job.start_timestamp)
726 self.assert_(job.end_timestamp)
727 self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
729 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
730 [[constants.OP_STATUS_CANCELED for _ in job.ops],
731 ["Job canceled by request" for _ in job.ops]])
733 def testCancelWhileWaitlockWithTimeout(self):
734 queue = _FakeQueueForProc()
736 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
741 job = self._CreateJob(queue, job_id, ops)
743 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
745 def _BeforeStart(timeout, priority):
746 self.assertFalse(queue.IsAcquired())
747 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
750 (success, _) = job.Cancel()
751 self.assert_(success)
753 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
756 # Fake an acquire attempt timing out
757 raise mcpu.LockAcquireTimeout()
759 def _AfterStart(op, cbs):
760 self.fail("Should not reach this")
762 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
764 self.assert_(jqueue._JobProcessor(queue, opexec, job)())
767 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
768 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
769 self.assert_(job.start_timestamp)
770 self.assert_(job.end_timestamp)
771 self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
773 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
774 [[constants.OP_STATUS_CANCELED for _ in job.ops],
775 ["Job canceled by request" for _ in job.ops]])
777 def testCancelWhileRunning(self):
778 # Tests canceling a job with finished opcodes and more, unprocessed ones
779 queue = _FakeQueueForProc()
781 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
786 job = self._CreateJob(queue, job_id, ops)
788 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
790 opexec = _FakeExecOpCodeForProc(queue, None, None)
793 self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
795 # Job goes back to queued
796 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
797 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
798 [[constants.OP_STATUS_SUCCESS,
799 constants.OP_STATUS_QUEUED,
800 constants.OP_STATUS_QUEUED],
801 ["Res0", None, None]])
804 (success, _) = job.Cancel()
805 self.assert_(success)
807 # Try processing another opcode (this will actually cancel the job)
808 self.assert_(jqueue._JobProcessor(queue, opexec, job)())
811 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
812 self.assertEqual(job.GetInfo(["id"]), [job_id])
813 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
814 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
815 [[constants.OP_STATUS_SUCCESS,
816 constants.OP_STATUS_CANCELED,
817 constants.OP_STATUS_CANCELED],
818 ["Res0", "Job canceled by request",
819 "Job canceled by request"]])
821 def testPartiallyRun(self):
822 # Tests calling the processor on a job that's been partially run before the
823 # program was restarted
824 queue = _FakeQueueForProc()
826 opexec = _FakeExecOpCodeForProc(queue, None, None)
828 for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
829 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
833 job = self._CreateJob(queue, job_id, ops)
835 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
837 for _ in range(successcount):
838 self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
840 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
841 self.assertEqual(job.GetInfo(["opstatus"]),
842 [[constants.OP_STATUS_SUCCESS
843 for _ in range(successcount)] +
844 [constants.OP_STATUS_QUEUED
845 for _ in range(len(ops) - successcount)]])
847 self.assert_(job.ops_iter)
849 # Serialize and restore (simulates program restart)
850 newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
851 self.assertFalse(newjob.ops_iter)
852 self._TestPartial(newjob, successcount)
854 def _TestPartial(self, job, successcount):
855 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
856 self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
858 queue = _FakeQueueForProc()
859 opexec = _FakeExecOpCodeForProc(queue, None, None)
861 for remaining in reversed(range(len(job.ops) - successcount)):
862 result = jqueue._JobProcessor(queue, opexec, job)()
869 self.assertFalse(result)
871 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
873 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
874 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
875 self.assertEqual(job.GetInfo(["opresult"]),
876 [[op.input.result for op in job.ops]])
877 self.assertEqual(job.GetInfo(["opstatus"]),
878 [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
879 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
882 self._GenericCheckJob(job)
884 # Finished jobs can't be processed any further
885 self.assertRaises(errors.ProgrammerError,
886 jqueue._JobProcessor(queue, opexec, job))
888 # ... also after being restored
889 job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
890 self.assertRaises(errors.ProgrammerError,
891 jqueue._JobProcessor(queue, opexec, job2))
893 def testProcessorOnRunningJob(self):
894 ops = [opcodes.OpTestDummy(result="result", fail=False)]
896 queue = _FakeQueueForProc()
897 opexec = _FakeExecOpCodeForProc(queue, None, None)
900 job = self._CreateJob(queue, 9571, ops)
902 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
904 job.ops[0].status = constants.OP_STATUS_RUNNING
906 assert len(job.ops) == 1
908 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
910 # Calling on running job must fail
911 self.assertRaises(errors.ProgrammerError,
912 jqueue._JobProcessor(queue, opexec, job))
914 def testLogMessages(self):
915 # Tests the "Feedback" callback function
916 queue = _FakeQueueForProc()
922 (constants.ELOG_MESSAGE, "there"),
925 (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
926 (constants.ELOG_JQUEUE_TEST, ("other", "type")),
929 ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
930 messages=messages.get(i, []))
934 job = self._CreateJob(queue, 29386, ops)
936 def _BeforeStart(timeout, priority):
937 self.assertEqual(queue.GetNextUpdate(), (job, True))
938 self.assertRaises(IndexError, queue.GetNextUpdate)
939 self.assertFalse(queue.IsAcquired())
940 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
942 def _AfterStart(op, cbs):
943 self.assertEqual(queue.GetNextUpdate(), (job, True))
944 self.assertRaises(IndexError, queue.GetNextUpdate)
945 self.assertFalse(queue.IsAcquired())
946 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
948 self.assertRaises(AssertionError, cbs.Feedback,
949 "too", "many", "arguments")
951 for (log_type, msg) in op.messages:
952 self.assertRaises(IndexError, queue.GetNextUpdate)
954 cbs.Feedback(log_type, msg)
957 # Check for job update without replication
958 self.assertEqual(queue.GetNextUpdate(), (job, False))
959 self.assertRaises(IndexError, queue.GetNextUpdate)
961 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
963 for remaining in reversed(range(len(job.ops))):
964 self.assertRaises(IndexError, queue.GetNextUpdate)
965 result = jqueue._JobProcessor(queue, opexec, job)()
966 self.assertEqual(queue.GetNextUpdate(), (job, True))
967 self.assertRaises(IndexError, queue.GetNextUpdate)
974 self.assertFalse(result)
976 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
978 self.assertRaises(IndexError, queue.GetNextUpdate)
980 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
981 self.assertEqual(job.GetInfo(["opresult"]),
982 [[op.input.result for op in job.ops]])
984 logmsgcount = sum(len(m) for m in messages.values())
986 self._CheckLogMessages(job, logmsgcount)
988 # Serialize and restore (simulates program restart)
989 newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
990 self._CheckLogMessages(newjob, logmsgcount)
994 for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
995 for (serial, timestamp, log_type, msg) in oplog:
996 (exptype, expmsg) = messages.get(idx).pop(0)
998 self.assertEqual(log_type, exptype)
1000 self.assertEqual(log_type, constants.ELOG_MESSAGE)
1001 self.assertEqual(expmsg, msg)
1002 self.assert_(serial > prevserial)
1005 def _CheckLogMessages(self, job, count):
1007 self.assertEqual(job.log_serial, count)
1010 self.assertEqual(job.GetLogEntries(None),
1011 [entry for entries in job.GetInfo(["oplog"])[0] if entries
1012 for entry in entries])
1014 # Filter with serial
1016 self.assert_(job.GetLogEntries(3))
1017 self.assertEqual(job.GetLogEntries(3),
1018 [entry for entries in job.GetInfo(["oplog"])[0] if entries
1019 for entry in entries][3:])
1021 # No log message after highest serial
1022 self.assertFalse(job.GetLogEntries(count))
1023 self.assertFalse(job.GetLogEntries(count + 3))
1026 class _FakeTimeoutStrategy:
1027 def __init__(self, timeouts):
1028 self.timeouts = timeouts
1030 self.last_timeout = None
1032 def NextAttempt(self):
1035 timeout = self.timeouts.pop(0)
1038 self.last_timeout = timeout
1042 class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
1044 self.queue = _FakeQueueForProc()
1047 self.opcounter = None
1048 self.timeout_strategy = None
1050 self.prev_tsop = None
1051 self.prev_prio = None
1052 self.gave_lock = None
1053 self.done_lock_before_blocking = False
1055 def _BeforeStart(self, timeout, priority):
1058 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1059 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1060 self.assertFalse(self.queue.IsAcquired())
1061 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1063 ts = self.timeout_strategy
1065 self.assert_(timeout is None or isinstance(timeout, (int, float)))
1066 self.assertEqual(timeout, ts.last_timeout)
1067 self.assertEqual(priority, job.ops[self.curop].priority)
1069 self.gave_lock = True
1071 if (self.curop == 3 and
1072 job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
1073 # Give locks before running into blocking acquire
1074 assert self.retries == 7
1076 self.done_lock_before_blocking = True
1079 if self.retries > 0:
1080 self.assert_(timeout is not None)
1082 self.gave_lock = False
1083 raise mcpu.LockAcquireTimeout()
1085 if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
1086 assert self.retries == 0, "Didn't exhaust all retries at highest priority"
1087 assert not ts.timeouts
1088 self.assert_(timeout is None)
1090 def _AfterStart(self, op, cbs):
1093 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1094 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1095 self.assertFalse(self.queue.IsAcquired())
1096 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1098 # Job is running, cancelling shouldn't be possible
1099 (success, _) = job.Cancel()
1100 self.assertFalse(success)
# Passed to the processor as _nextop_fn. It advances to the next opcode index
# from self.opcounter (an itertools.count(0) created in testTimeout) and
# records that opcode's current priority in prev_prio. ".next()" is the
# Python 2 iterator method, consistent with the rest of this file.
1102 def _NextOpcode(self):
1103 self.curop = self.opcounter.next()
1104 self.prev_prio = self.job.ops[self.curop].priority
1106 def _NewTimeoutStrategy(self):
1109 self.assertEqual(self.retries, 0)
1111 if self.prev_tsop == self.curop:
1112 # Still on the same opcode, priority must've been increased
1113 self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
1117 timeouts = range(10, 31, 10)
1118 self.retries = len(timeouts) - 1
1120 elif self.curop == 2:
1121 # Let this run into a blocking acquire
1122 timeouts = range(11, 61, 12)
1123 self.retries = len(timeouts)
1125 elif self.curop == 3:
1126 # Wait for priority to increase, but give lock before blocking acquire
1127 timeouts = range(12, 100, 14)
1128 self.retries = len(timeouts)
1130 self.assertFalse(self.done_lock_before_blocking)
1132 elif self.curop == 4:
1133 self.assert_(self.done_lock_before_blocking)
1135 # Timeouts, but no need to retry
1136 timeouts = range(10, 31, 10)
1139 elif self.curop == 5:
1141 timeouts = range(19, 100, 11)
1142 self.retries = len(timeouts)
1148 assert len(job.ops) == 10
1149 assert self.retries <= len(timeouts)
1151 ts = _FakeTimeoutStrategy(timeouts)
1153 self.timeout_strategy = ts
1154 self.prev_tsop = self.curop
1155 self.prev_prio = job.ops[self.curop].priority
1159 def testTimeout(self):
1160 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1165 job = self._CreateJob(self.queue, job_id, ops)
1168 self.opcounter = itertools.count(0)
1170 opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
1172 tsf = self._NewTimeoutStrategy
1174 self.assertFalse(self.done_lock_before_blocking)
1176 for i in itertools.count(0):
1177 proc = jqueue._JobProcessor(self.queue, opexec, job,
1178 _timeout_strategy_factory=tsf)
1180 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1181 result = proc(_nextop_fn=self._NextOpcode)
1182 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1183 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1185 self.assertFalse(job.cur_opctx)
1188 self.assertFalse(result)
1191 self.assertFalse(job.cur_opctx)
1193 self.assert_(job.cur_opctx)
1194 self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1195 self.timeout_strategy.NextAttempt)
1197 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1198 self.assert_(job.start_timestamp)
1199 self.assertFalse(job.end_timestamp)
1201 self.assertEqual(self.curop, len(job.ops) - 1)
1202 self.assertEqual(self.job, job)
1203 self.assertEqual(self.opcounter.next(), len(job.ops))
1204 self.assert_(self.done_lock_before_blocking)
1206 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1207 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1208 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1209 self.assertEqual(job.GetInfo(["opresult"]),
1210 [[op.input.result for op in job.ops]])
1211 self.assertEqual(job.GetInfo(["opstatus"]),
1212 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1213 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1216 # Finished jobs can't be processed any further
1217 self.assertRaises(errors.ProgrammerError,
1218 jqueue._JobProcessor(self.queue, opexec, job))
# Script entry point: run this module's tests via testutils.GanetiTestProgram.
1221 if __name__ == "__main__":
1222 testutils.GanetiTestProgram()