4 # Copyright (C) 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.jqueue"""
32 from ganeti import constants
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import jqueue
36 from ganeti import opcodes
37 from ganeti import compat
38 from ganeti import mcpu
44 def __init__(self, job_id, status):
49 def SetStatus(self, status):
def AddLogEntry(self, msg):
  """Append C{msg} to the fake job's log.

  Each entry is stored as a C{(serial, msg)} tuple whose serial equals the
  entry's position in the log, so the first entry gets serial 0.

  """
  serial = len(self._log)
  self._log.append((serial, msg))
58 def GetInfo(self, fields):
63 result.append(self._status)
65 raise Exception("Unknown field")
69 def GetLogEntries(self, newer_than):
70 assert newer_than is None or newer_than >= 0
72 if newer_than is None:
75 return self._log[newer_than:]
78 class TestJobChangesChecker(unittest.TestCase):
80 job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
81 checker = jqueue._JobChangesChecker(["status"], None, None)
82 self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
84 job.SetStatus(constants.JOB_STATUS_RUNNING)
85 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
87 job.SetStatus(constants.JOB_STATUS_SUCCESS)
88 self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
90 # job.id is used by checker
91 self.assertEqual(job.id, 9094)
def testStatusWithPrev(self):
  """The checker stays silent until the status differs from the prev info."""
  queued = constants.JOB_STATUS_QUEUED
  running = constants.JOB_STATUS_RUNNING

  fake_job = _FakeJob(12807, queued)

  # Previous info equals the current status: no change must be reported
  checker = jqueue._JobChangesChecker(["status"], [queued], None)
  self.assertTrue(checker(fake_job) is None)

  # After a status change the new value is reported, with no log entries
  fake_job.SetStatus(running)
  self.assertEqual(checker(fake_job), ([running], []))
def testFinalStatus(self):
  """Finalized jobs are reported immediately, even with matching prev info."""
  for final_status in constants.JOBS_FINALIZED:
    fake_job = _FakeJob(2178711, final_status)
    checker = jqueue._JobChangesChecker(["status"], [final_status], None)

    # A finalized job will never change again, so instead of waiting the
    # checker has to hand back the (unchanged) status right away
    expected = ([final_status], [])
    self.assertEqual(checker(fake_job), expected)
111 job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
112 checker = jqueue._JobChangesChecker(["status"], None, None)
113 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
115 job.AddLogEntry("Hello World")
116 (job_info, log_entries) = checker(job)
117 self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
118 self.assertEqual(log_entries, [[0, "Hello World"]])
120 checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
121 self.assert_(checker2(job) is None)
123 job.AddLogEntry("Foo Bar")
124 job.SetStatus(constants.JOB_STATUS_ERROR)
126 (job_info, log_entries) = checker2(job)
127 self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
128 self.assertEqual(log_entries, [[1, "Foo Bar"]])
130 checker3 = jqueue._JobChangesChecker(["status"], None, None)
131 (job_info, log_entries) = checker3(job)
132 self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
133 self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
136 class TestJobChangesWaiter(unittest.TestCase):
138 self.tmpdir = tempfile.mkdtemp()
139 self.filename = utils.PathJoin(self.tmpdir, "job-1")
140 utils.WriteFile(self.filename, data="")
143 shutil.rmtree(self.tmpdir)
145 def _EnsureNotifierClosed(self, notifier):
147 os.fstat(notifier._fd)
148 except EnvironmentError, err:
149 self.assertEqual(err.errno, errno.EBADF)
151 self.fail("File descriptor wasn't closed")
154 for wait in [False, True]:
155 waiter = jqueue._JobFileChangesWaiter(self.filename)
162 # Ensure file descriptor was closed
163 self._EnsureNotifierClosed(waiter._notifier)
165 def testChangingFile(self):
166 waiter = jqueue._JobFileChangesWaiter(self.filename)
168 self.assertFalse(waiter.Wait(0.1))
169 utils.WriteFile(self.filename, data="changed")
170 self.assert_(waiter.Wait(60))
174 self._EnsureNotifierClosed(waiter._notifier)
176 def testChangingFile2(self):
177 waiter = jqueue._JobChangesWaiter(self.filename)
179 self.assertFalse(waiter._filewaiter)
180 self.assert_(waiter.Wait(0.1))
181 self.assert_(waiter._filewaiter)
183 # File waiter is now used, but there have been no changes
184 self.assertFalse(waiter.Wait(0.1))
185 utils.WriteFile(self.filename, data="changed")
186 self.assert_(waiter.Wait(60))
190 self._EnsureNotifierClosed(waiter._filewaiter._notifier)
193 class TestWaitForJobChangesHelper(unittest.TestCase):
195 self.tmpdir = tempfile.mkdtemp()
196 self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
197 utils.WriteFile(self.filename, data="")
200 shutil.rmtree(self.tmpdir)
202 def _LoadWaitingJob(self):
203 return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
205 def _LoadLostJob(self):
208 def testNoChanges(self):
209 wfjc = jqueue._WaitForJobChangesHelper()
212 self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
213 [constants.JOB_STATUS_WAITLOCK], None, 0.1),
214 constants.JOB_NOTCHANGED)
216 # No previous information
217 self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
218 ["status"], None, None, 1.0),
219 ([constants.JOB_STATUS_WAITLOCK], []))
def testLostJob(self):
  """With the C{_LoadLostJob} loader the helper must return None."""
  helper = jqueue._WaitForJobChangesHelper()
  outcome = helper(self.filename, self._LoadLostJob,
                   ["status"], None, None, 1.0)
  self.assertTrue(outcome is None)
227 class TestEncodeOpError(unittest.TestCase):
229 encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
230 self.assert_(isinstance(encerr, tuple))
231 self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
233 encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
234 self.assert_(isinstance(encerr, tuple))
235 self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
237 encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
238 self.assert_(isinstance(encerr, tuple))
239 self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
241 encerr = jqueue._EncodeOpError("Hello World")
242 self.assert_(isinstance(encerr, tuple))
243 self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
246 class TestQueuedOpCode(unittest.TestCase):
247 def testDefaults(self):
249 self.assertFalse(hasattr(op.input, "dry_run"))
250 self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
251 self.assertFalse(op.log)
252 self.assert_(op.start_timestamp is None)
253 self.assert_(op.exec_timestamp is None)
254 self.assert_(op.end_timestamp is None)
255 self.assert_(op.result is None)
256 self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
258 op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
260 op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
262 self.assertEqual(op1.Serialize(), op2.Serialize())
264 def testPriority(self):
266 assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
267 "Default priority equals high priority; test can't work"
268 self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
269 self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
271 inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
272 op1 = jqueue._QueuedOpCode(inpop)
274 op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
276 self.assertEqual(op1.Serialize(), op2.Serialize())
279 class TestQueuedJob(unittest.TestCase):
281 self.assertRaises(errors.GenericError, jqueue._QueuedJob,
284 def testDefaults(self):
288 opcodes.OpTestDelay(),
292 self.assertEqual(job.id, job_id)
293 self.assertEqual(job.log_serial, 0)
294 self.assert_(job.received_timestamp)
295 self.assert_(job.start_timestamp is None)
296 self.assert_(job.end_timestamp is None)
297 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
298 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
299 self.assert_(repr(job).startswith("<"))
300 self.assertEqual(len(job.ops), len(ops))
301 self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
302 for (inp, op) in zip(ops, job.ops)))
303 self.assertRaises(errors.OpExecError, job.GetInfo,
305 self.assertEqual(job.GetInfo(["summary"]),
306 [[op.input.Summary() for op in job.ops]])
308 job1 = jqueue._QueuedJob(None, job_id, ops)
310 job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
312 self.assertEqual(job1.Serialize(), job2.Serialize())
314 def testPriority(self):
317 opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
318 opcodes.OpTestDelay(),
322 self.assertEqual(job.id, job_id)
323 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
324 self.assert_(repr(job).startswith("<"))
326 job = jqueue._QueuedJob(None, job_id, ops)
328 self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
330 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
333 job.ops[0].priority -= 1
335 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
337 # Mark opcode as finished
338 job.ops[0].status = constants.OP_STATUS_SUCCESS
340 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
343 job.ops[1].priority -= 10
344 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
346 # Test increasing first
347 job.ops[0].status = constants.OP_STATUS_RUNNING
348 job.ops[0].priority -= 19
349 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
351 def testCalcStatus(self):
353 # The default status is "queued"
354 self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
358 ops[0].status = constants.OP_STATUS_WAITLOCK
361 ops[0].status = constants.OP_STATUS_SUCCESS
362 ops[1].status = constants.OP_STATUS_SUCCESS
363 ops[2].status = constants.OP_STATUS_WAITLOCK
366 ops[0].status = constants.OP_STATUS_SUCCESS
367 ops[1].status = constants.OP_STATUS_RUNNING
369 op.status = constants.OP_STATUS_QUEUED
371 def _Canceling1(ops):
372 ops[0].status = constants.OP_STATUS_SUCCESS
373 ops[1].status = constants.OP_STATUS_SUCCESS
375 op.status = constants.OP_STATUS_CANCELING
377 def _Canceling2(ops):
379 op.status = constants.OP_STATUS_CANCELING
383 op.status = constants.OP_STATUS_CANCELED
386 for idx, op in enumerate(ops):
388 op.status = constants.OP_STATUS_ERROR
390 op.status = constants.OP_STATUS_SUCCESS
394 op.status = constants.OP_STATUS_ERROR
398 op.status = constants.OP_STATUS_SUCCESS
401 constants.JOB_STATUS_QUEUED: [_Queued],
402 constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
403 constants.JOB_STATUS_RUNNING: [_Running],
404 constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
405 constants.JOB_STATUS_CANCELED: [_Canceled],
406 constants.JOB_STATUS_ERROR: [_Error1, _Error2],
407 constants.JOB_STATUS_SUCCESS: [_Success],
411 job = jqueue._QueuedJob(None, 1,
412 [opcodes.OpTestDelay() for _ in range(10)])
413 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
414 self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
418 for status in constants.JOB_STATUS_ALL:
419 sttests = tests[status]
424 self.assertEqual(job.CalcStatus(), status)
427 class _FakeDependencyManager:
430 self._notifications = []
431 self._waiting = set()
def AddCheckResult(self, job, dep_job_id, dep_status, result):
  """Record an expected C{CheckAndRegister} call and its canned result.

  Expectations are consumed first-in, first-out.

  """
  expectation = (job, dep_job_id, dep_status, result)
  self._checks.append(expectation)
def CountPendingResults(self):
  """Return how many queued check expectations have not been consumed."""
  pending = self._checks
  return len(pending)
def CountWaitingJobs(self):
  """Return the number of jobs currently registered as waiting."""
  waiting = self._waiting
  return len(waiting)
def GetNextNotification(self):
  """Remove and return the oldest recorded notification.

  Raises C{IndexError} when no notification is pending.

  """
  notifications = self._notifications
  return notifications.pop(0)
def JobWaiting(self, job):
  """Tell whether C{job} is registered as waiting on a dependency."""
  waiting = self._waiting
  return job in waiting
448 def CheckAndRegister(self, job, dep_job_id, dep_status):
449 (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
451 assert exp_job == job
452 assert exp_dep_job_id == dep_job_id
453 assert exp_dep_status == dep_status
455 (result_status, _) = result
457 if result_status == jqueue._JobDependencyManager.WAIT:
458 self._waiting.add(job)
459 elif result_status == jqueue._JobDependencyManager.CONTINUE:
460 self._waiting.remove(job)
def NotifyWaiters(self, job_id):
  """Record a notification for C{job_id}.

  Notifications are returned in order by C{GetNextNotification}.

  """
  notifications = self._notifications
  notifications.append(job_id)
468 class _DisabledFakeDependencyManager:
469 def JobWaiting(self, _):
def CheckAndRegister(self, *args):
  """Fail the test: this fake stands in for a disabled dependency manager.

  @raise AssertionError: always, including when Python runs with C{-O}

  """
  # An explicit raise replaces "assert False, ..." because the interpreter
  # strips assert statements under -O. That would silently turn this guard
  # into a no-op returning None.
  raise AssertionError("Should not be called")
475 def NotifyWaiters(self, _):
479 class _FakeQueueForProc:
480 def __init__(self, depmgr=None):
481 self._acquired = False
485 self._submit_count = itertools.count(1000)
490 self.depmgr = _DisabledFakeDependencyManager()
def IsAcquired(self):
  """Report whether the fake queue lock is currently held."""
  held = self._acquired
  return held
def GetNextUpdate(self):
  """Remove and return the oldest recorded C{(job, replicate)} update.

  Raises C{IndexError} when no update is pending.

  """
  updates = self._updates
  return updates.pop(0)
def GetNextSubmittedJob(self):
  """Remove and return the oldest recorded C{(job_id, ops)} submission.

  Raises C{IndexError} when nothing was submitted.

  """
  submitted = self._submitted
  return submitted.pop(0)
501 def acquire(self, shared=0):
503 self._acquired = True
506 assert self._acquired
507 self._acquired = False
def UpdateJobUnlocked(self, job, replicate=True):
  """Record an update of C{job}; the fake queue lock must be held.

  The replicate flag is normalized to a boolean so that tests can compare
  against C{(job, True)} / C{(job, False)} directly.

  """
  assert self._acquired, "Lock not acquired while updating job"
  entry = (job, bool(replicate))
  self._updates.append(entry)
513 def SubmitManyJobs(self, jobs):
514 assert not self._acquired, "Lock acquired while submitting jobs"
515 job_ids = [self._submit_count.next() for _ in jobs]
516 self._submitted.extend(zip(job_ids, jobs))
520 class _FakeExecOpCodeForProc:
521 def __init__(self, queue, before_start, after_start):
523 self._before_start = before_start
524 self._after_start = after_start
526 def __call__(self, op, cbs, timeout=None, priority=None):
527 assert isinstance(op, opcodes.OpTestDummy)
528 assert not self._queue.IsAcquired(), \
529 "Queue lock not released when executing opcode"
531 if self._before_start:
532 self._before_start(timeout, priority)
536 if self._after_start:
537 self._after_start(op, cbs)
539 # Check again after the callbacks
540 assert not self._queue.IsAcquired()
543 raise errors.OpExecError("Error requested (%s)" % op.result)
545 if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
546 return cbs.SubmitManyJobs(op.submit_jobs)
551 class _JobProcessorTestUtils:
552 def _CreateJob(self, queue, job_id, ops):
553 job = jqueue._QueuedJob(queue, job_id, ops)
554 self.assertFalse(job.start_timestamp)
555 self.assertFalse(job.end_timestamp)
556 self.assertEqual(len(ops), len(job.ops))
557 self.assert_(compat.all(op.input == inp
558 for (op, inp) in zip(job.ops, ops)))
559 self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
563 class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
564 def _GenericCheckJob(self, job):
565 assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
568 self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
569 [[op.start_timestamp for op in job.ops],
570 [op.exec_timestamp for op in job.ops],
571 [op.end_timestamp for op in job.ops]])
572 self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
573 [job.received_timestamp,
576 self.assert_(job.start_timestamp)
577 self.assert_(job.end_timestamp)
578 self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
580 def testSuccess(self):
581 queue = _FakeQueueForProc()
583 for (job_id, opcount) in [(25351, 1), (6637, 3),
584 (24644, 10), (32207, 100)]:
585 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
586 for i in range(opcount)]
589 job = self._CreateJob(queue, job_id, ops)
591 def _BeforeStart(timeout, priority):
592 self.assertEqual(queue.GetNextUpdate(), (job, True))
593 self.assertRaises(IndexError, queue.GetNextUpdate)
594 self.assertFalse(queue.IsAcquired())
595 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
596 self.assertFalse(job.cur_opctx)
598 def _AfterStart(op, cbs):
599 self.assertEqual(queue.GetNextUpdate(), (job, True))
600 self.assertRaises(IndexError, queue.GetNextUpdate)
602 self.assertFalse(queue.IsAcquired())
603 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
604 self.assertFalse(job.cur_opctx)
606 # Job is running, cancelling shouldn't be possible
607 (success, _) = job.Cancel()
608 self.assertFalse(success)
610 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
612 for idx in range(len(ops)):
613 self.assertRaises(IndexError, queue.GetNextUpdate)
614 result = jqueue._JobProcessor(queue, opexec, job)()
615 self.assertEqual(queue.GetNextUpdate(), (job, True))
616 self.assertRaises(IndexError, queue.GetNextUpdate)
617 if idx == len(ops) - 1:
621 self.assertFalse(result)
623 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
624 self.assert_(job.start_timestamp)
625 self.assertFalse(job.end_timestamp)
627 self.assertRaises(IndexError, queue.GetNextUpdate)
629 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
630 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
631 self.assertEqual(job.GetInfo(["opresult"]),
632 [[op.input.result for op in job.ops]])
633 self.assertEqual(job.GetInfo(["opstatus"]),
634 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
635 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
638 self._GenericCheckJob(job)
640 # Calling the processor on a finished job should be a no-op
641 self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
642 self.assertRaises(IndexError, queue.GetNextUpdate)
644 def testOpcodeError(self):
645 queue = _FakeQueueForProc()
652 (23816, 100, 39, 45),
655 for (job_id, opcount, failfrom, failto) in testdata:
657 ops = [opcodes.OpTestDummy(result="Res%s" % i,
658 fail=(failfrom <= i and
660 for i in range(opcount)]
663 job = self._CreateJob(queue, job_id, ops)
665 opexec = _FakeExecOpCodeForProc(queue, None, None)
667 for idx in range(len(ops)):
668 self.assertRaises(IndexError, queue.GetNextUpdate)
669 result = jqueue._JobProcessor(queue, opexec, job)()
671 self.assertEqual(queue.GetNextUpdate(), (job, True))
672 # waitlock to running
673 self.assertEqual(queue.GetNextUpdate(), (job, True))
675 self.assertEqual(queue.GetNextUpdate(), (job, True))
676 self.assertRaises(IndexError, queue.GetNextUpdate)
678 if idx in (failfrom, len(ops) - 1):
683 self.assertFalse(result)
685 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
687 self.assertRaises(IndexError, queue.GetNextUpdate)
690 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
691 self.assertEqual(job.GetInfo(["id"]), [job_id])
692 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
694 # Check opcode status
696 job.GetInfo(["opstatus"])[0],
697 job.GetInfo(["opresult"])[0])
699 for idx, (op, opstatus, opresult) in enumerate(data):
701 assert not op.input.fail
702 self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
703 self.assertEqual(opresult, op.input.result)
706 self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
707 self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
709 assert not op.input.fail
710 self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
711 self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
713 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
714 for op in job.ops[:failfrom]))
716 self._GenericCheckJob(job)
718 # Calling the processor on a finished job should be a no-op
719 self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
720 self.assertRaises(IndexError, queue.GetNextUpdate)
722 def testCancelWhileInQueue(self):
723 queue = _FakeQueueForProc()
725 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
730 job = self._CreateJob(queue, job_id, ops)
732 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
735 (success, _) = job.Cancel()
736 self.assert_(success)
738 self.assertRaises(IndexError, queue.GetNextUpdate)
740 self.assertFalse(job.start_timestamp)
741 self.assertTrue(job.end_timestamp)
742 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
745 # Serialize to check for differences
746 before_proc = job.Serialize()
748 # Simulate processor called in workerpool
749 opexec = _FakeExecOpCodeForProc(queue, None, None)
750 self.assert_(jqueue._JobProcessor(queue, opexec, job)())
753 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
754 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
755 self.assertFalse(job.start_timestamp)
756 self.assertTrue(job.end_timestamp)
757 self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
759 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
760 [[constants.OP_STATUS_CANCELED for _ in job.ops],
761 ["Job canceled by request" for _ in job.ops]])
763 # Must not have changed or written
764 self.assertEqual(before_proc, job.Serialize())
765 self.assertRaises(IndexError, queue.GetNextUpdate)
767 def testCancelWhileWaitlockInQueue(self):
768 queue = _FakeQueueForProc()
770 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
775 job = self._CreateJob(queue, job_id, ops)
777 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
779 job.ops[0].status = constants.OP_STATUS_WAITLOCK
781 assert len(job.ops) == 5
783 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
786 (success, _) = job.Cancel()
787 self.assert_(success)
789 self.assertRaises(IndexError, queue.GetNextUpdate)
791 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
794 opexec = _FakeExecOpCodeForProc(queue, None, None)
795 self.assert_(jqueue._JobProcessor(queue, opexec, job)())
798 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
799 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
800 self.assertFalse(job.start_timestamp)
801 self.assert_(job.end_timestamp)
802 self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
804 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
805 [[constants.OP_STATUS_CANCELED for _ in job.ops],
806 ["Job canceled by request" for _ in job.ops]])
808 def testCancelWhileWaitlock(self):
809 queue = _FakeQueueForProc()
811 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
816 job = self._CreateJob(queue, job_id, ops)
818 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
820 def _BeforeStart(timeout, priority):
821 self.assertEqual(queue.GetNextUpdate(), (job, True))
822 self.assertRaises(IndexError, queue.GetNextUpdate)
823 self.assertFalse(queue.IsAcquired())
824 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
827 (success, _) = job.Cancel()
828 self.assert_(success)
830 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
832 self.assertRaises(IndexError, queue.GetNextUpdate)
834 def _AfterStart(op, cbs):
835 self.assertEqual(queue.GetNextUpdate(), (job, True))
836 self.assertRaises(IndexError, queue.GetNextUpdate)
837 self.assertFalse(queue.IsAcquired())
838 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
840 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
842 self.assertRaises(IndexError, queue.GetNextUpdate)
843 self.assert_(jqueue._JobProcessor(queue, opexec, job)())
844 self.assertEqual(queue.GetNextUpdate(), (job, True))
845 self.assertRaises(IndexError, queue.GetNextUpdate)
848 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
849 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
850 self.assert_(job.start_timestamp)
851 self.assert_(job.end_timestamp)
852 self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
854 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
855 [[constants.OP_STATUS_CANCELED for _ in job.ops],
856 ["Job canceled by request" for _ in job.ops]])
858 def testCancelWhileWaitlockWithTimeout(self):
859 queue = _FakeQueueForProc()
861 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
866 job = self._CreateJob(queue, job_id, ops)
868 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
870 def _BeforeStart(timeout, priority):
871 self.assertFalse(queue.IsAcquired())
872 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
875 (success, _) = job.Cancel()
876 self.assert_(success)
878 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
881 # Fake an acquire attempt timing out
882 raise mcpu.LockAcquireTimeout()
884 def _AfterStart(op, cbs):
885 self.fail("Should not reach this")
887 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
889 self.assert_(jqueue._JobProcessor(queue, opexec, job)())
892 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
893 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
894 self.assert_(job.start_timestamp)
895 self.assert_(job.end_timestamp)
896 self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
898 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
899 [[constants.OP_STATUS_CANCELED for _ in job.ops],
900 ["Job canceled by request" for _ in job.ops]])
902 def testCancelWhileRunning(self):
903 # Tests canceling a job with finished opcodes and more, unprocessed ones
904 queue = _FakeQueueForProc()
906 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
911 job = self._CreateJob(queue, job_id, ops)
913 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
915 opexec = _FakeExecOpCodeForProc(queue, None, None)
918 self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
920 # Job goes back to queued
921 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
922 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
923 [[constants.OP_STATUS_SUCCESS,
924 constants.OP_STATUS_QUEUED,
925 constants.OP_STATUS_QUEUED],
926 ["Res0", None, None]])
929 (success, _) = job.Cancel()
930 self.assert_(success)
932 # Try processing another opcode (this will actually cancel the job)
933 self.assert_(jqueue._JobProcessor(queue, opexec, job)())
936 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
937 self.assertEqual(job.GetInfo(["id"]), [job_id])
938 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
939 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
940 [[constants.OP_STATUS_SUCCESS,
941 constants.OP_STATUS_CANCELED,
942 constants.OP_STATUS_CANCELED],
943 ["Res0", "Job canceled by request",
944 "Job canceled by request"]])
946 def testPartiallyRun(self):
947 # Tests calling the processor on a job that's been partially run before the
948 # program was restarted
949 queue = _FakeQueueForProc()
951 opexec = _FakeExecOpCodeForProc(queue, None, None)
953 for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
954 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
958 job = self._CreateJob(queue, job_id, ops)
960 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
962 for _ in range(successcount):
963 self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
965 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
966 self.assertEqual(job.GetInfo(["opstatus"]),
967 [[constants.OP_STATUS_SUCCESS
968 for _ in range(successcount)] +
969 [constants.OP_STATUS_QUEUED
970 for _ in range(len(ops) - successcount)]])
972 self.assert_(job.ops_iter)
974 # Serialize and restore (simulates program restart)
975 newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
976 self.assertFalse(newjob.ops_iter)
977 self._TestPartial(newjob, successcount)
979 def _TestPartial(self, job, successcount):
980 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
981 self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
983 queue = _FakeQueueForProc()
984 opexec = _FakeExecOpCodeForProc(queue, None, None)
986 for remaining in reversed(range(len(job.ops) - successcount)):
987 result = jqueue._JobProcessor(queue, opexec, job)()
988 self.assertEqual(queue.GetNextUpdate(), (job, True))
989 self.assertEqual(queue.GetNextUpdate(), (job, True))
990 self.assertEqual(queue.GetNextUpdate(), (job, True))
991 self.assertRaises(IndexError, queue.GetNextUpdate)
998 self.assertFalse(result)
1000 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1002 self.assertRaises(IndexError, queue.GetNextUpdate)
1003 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1004 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1005 self.assertEqual(job.GetInfo(["opresult"]),
1006 [[op.input.result for op in job.ops]])
1007 self.assertEqual(job.GetInfo(["opstatus"]),
1008 [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
1009 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1012 self._GenericCheckJob(job)
1014 # Calling the processor on a finished job should be a no-op
1015 self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1016 self.assertRaises(IndexError, queue.GetNextUpdate)
1018 # ... also after being restored
1019 job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
1020 # Calling the processor on a finished job should be a no-op
1021 self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
1022 self.assertRaises(IndexError, queue.GetNextUpdate)
1024 def testProcessorOnRunningJob(self):
1025 ops = [opcodes.OpTestDummy(result="result", fail=False)]
1027 queue = _FakeQueueForProc()
1028 opexec = _FakeExecOpCodeForProc(queue, None, None)
1031 job = self._CreateJob(queue, 9571, ops)
1033 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1035 job.ops[0].status = constants.OP_STATUS_RUNNING
1037 assert len(job.ops) == 1
1039 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1041 # Calling on running job must fail
1042 self.assertRaises(errors.ProgrammerError,
1043 jqueue._JobProcessor(queue, opexec, job))
1045 def testLogMessages(self):
1046 # Tests the "Feedback" callback function
1047 queue = _FakeQueueForProc()
1053 (constants.ELOG_MESSAGE, "there"),
1056 (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
1057 (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1060 ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1061 messages=messages.get(i, []))
1065 job = self._CreateJob(queue, 29386, ops)
1067 def _BeforeStart(timeout, priority):
1068 self.assertEqual(queue.GetNextUpdate(), (job, True))
1069 self.assertRaises(IndexError, queue.GetNextUpdate)
1070 self.assertFalse(queue.IsAcquired())
1071 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1073 def _AfterStart(op, cbs):
1074 self.assertEqual(queue.GetNextUpdate(), (job, True))
1075 self.assertRaises(IndexError, queue.GetNextUpdate)
1076 self.assertFalse(queue.IsAcquired())
1077 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1079 self.assertRaises(AssertionError, cbs.Feedback,
1080 "too", "many", "arguments")
1082 for (log_type, msg) in op.messages:
1083 self.assertRaises(IndexError, queue.GetNextUpdate)
1085 cbs.Feedback(log_type, msg)
1088 # Check for job update without replication
1089 self.assertEqual(queue.GetNextUpdate(), (job, False))
1090 self.assertRaises(IndexError, queue.GetNextUpdate)
1092 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1094 for remaining in reversed(range(len(job.ops))):
1095 self.assertRaises(IndexError, queue.GetNextUpdate)
1096 result = jqueue._JobProcessor(queue, opexec, job)()
1097 self.assertEqual(queue.GetNextUpdate(), (job, True))
1098 self.assertRaises(IndexError, queue.GetNextUpdate)
1102 self.assert_(result)
1105 self.assertFalse(result)
1107 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1109 self.assertRaises(IndexError, queue.GetNextUpdate)
1111 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1112 self.assertEqual(job.GetInfo(["opresult"]),
1113 [[op.input.result for op in job.ops]])
1115 logmsgcount = sum(len(m) for m in messages.values())
1117 self._CheckLogMessages(job, logmsgcount)
1119 # Serialize and restore (simulates program restart)
1120 newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
1121 self._CheckLogMessages(newjob, logmsgcount)
1123 # Check each message
1125 for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1126 for (serial, timestamp, log_type, msg) in oplog:
1127 (exptype, expmsg) = messages.get(idx).pop(0)
1129 self.assertEqual(log_type, exptype)
1131 self.assertEqual(log_type, constants.ELOG_MESSAGE)
1132 self.assertEqual(expmsg, msg)
1133 self.assert_(serial > prevserial)
# Check the log bookkeeping of `job`: job.log_serial must equal `count`, and
# GetLogEntries() must agree with the per-opcode "oplog" job info.
# NOTE(review): self.assert_ is a deprecated alias of assertTrue (removed in
# Python 3.12).
1136 def _CheckLogMessages(self, job, count):
1138 self.assertEqual(job.log_serial, count)
# Unfiltered: all entries of all opcodes, flattened in opcode order
1141 self.assertEqual(job.GetLogEntries(None),
1142 [entry for entries in job.GetInfo(["oplog"])[0] if entries
1143 for entry in entries])
1145 # Filter with serial
# (expected: the same flattened list with its first three entries dropped)
1147 self.assert_(job.GetLogEntries(3))
1148 self.assertEqual(job.GetLogEntries(3),
1149 [entry for entries in job.GetInfo(["oplog"])[0] if entries
1150 for entry in entries][3:])
1152 # No log message after highest serial
1153 self.assertFalse(job.GetLogEntries(count))
1154 self.assertFalse(job.GetLogEntries(count + 3))
# Run a job whose opcodes submit further jobs (OpTestDummy submit_jobs), one
# processor call per opcode. Then check that the fake queue received the
# submitted jobs with sequential ids starting at 1000, and that each opcode's
# result is the list of ids it submitted.
# NOTE(review): this listing omits some lines of the method (e.g. the job_id
# assignment and parts of the ops list); confirm against the full source.
1156 def testSubmitManyJobs(self):
1157 queue = _FakeQueueForProc()
1161 opcodes.OpTestDummy(result="Res0", fail=False,
1163 opcodes.OpTestDummy(result="Res1", fail=False,
1165 [opcodes.OpTestDummy(result="r1j0", fail=False)],
1167 opcodes.OpTestDummy(result="Res2", fail=False,
1169 [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1170 opcodes.OpTestDummy(result="r2j0o1", fail=False),
1171 opcodes.OpTestDummy(result="r2j0o2", fail=False),
1172 opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1173 [opcodes.OpTestDummy(result="r2j1", fail=False)],
1174 [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1175 opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1180 job = self._CreateJob(queue, job_id, ops)
# Hook for _FakeExecOpCodeForProc: the job must have been written (with
# replication), the queue must not hold locks, and the job is in WAITLOCK
# without an opcode context.
1182 def _BeforeStart(timeout, priority):
1183 self.assertEqual(queue.GetNextUpdate(), (job, True))
1184 self.assertRaises(IndexError, queue.GetNextUpdate)
1185 self.assertFalse(queue.IsAcquired())
1186 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1187 self.assertFalse(job.cur_opctx)
# Hook for _FakeExecOpCodeForProc: the RUNNING transition must have been
# written, and a running job must refuse Cancel().
1189 def _AfterStart(op, cbs):
1190 self.assertEqual(queue.GetNextUpdate(), (job, True))
1191 self.assertRaises(IndexError, queue.GetNextUpdate)
1193 self.assertFalse(queue.IsAcquired())
1194 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1195 self.assertFalse(job.cur_opctx)
1197 # Job is running, cancelling shouldn't be possible
1198 (success, _) = job.Cancel()
1199 self.assertFalse(success)
1201 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
# Each processor call handles one opcode; only the call for the last opcode
# is expected to return a true value (job finished).
1203 for idx in range(len(ops)):
1204 self.assertRaises(IndexError, queue.GetNextUpdate)
1205 result = jqueue._JobProcessor(queue, opexec, job)()
1206 self.assertEqual(queue.GetNextUpdate(), (job, True))
1207 self.assertRaises(IndexError, queue.GetNextUpdate)
1208 if idx == len(ops) - 1:
1210 self.assert_(result)
1212 self.assertFalse(result)
1214 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1215 self.assert_(job.start_timestamp)
1216 self.assertFalse(job.end_timestamp)
1218 self.assertRaises(IndexError, queue.GetNextUpdate)
# Submitted jobs must reach the fake queue in submission order, numbered
# sequentially from 1000.
1220 for idx, submitted_ops in enumerate(job_ops
1222 for job_ops in op.submit_jobs):
1223 self.assertEqual(queue.GetNextSubmittedJob(),
1224 (1000 + idx, submitted_ops))
1225 self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1227 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1228 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
# Each opcode's result is the list of job ids it submitted
1229 self.assertEqual(job.GetInfo(["opresult"]),
1230 [[[], [1000], [1001, 1002, 1003]]])
1231 self.assertEqual(job.GetInfo(["opstatus"]),
1232 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1234 self._GenericCheckJob(job)
1236 # Calling the processor on a finished job should be a no-op
1237 self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1238 self.assertRaises(IndexError, queue.GetNextUpdate)
# The job's first opcode depends on two other jobs, listed as prev_job_id2
# then prev_job_id. On each attempt the fake dependency manager is primed
# with WAIT/CONTINUE answers. While the job is waiting, the processor must
# return a true value, keep the opcode context and leave the job in
# WAITLOCK. The job may only be written when something actually changed.
# NOTE(review): this listing lacks several lines of the method (e.g. the
# prev_job_id/job_id assignments, the rest of the ops list, and the
# loop/branch headers around the attempt counter); confirm against the full
# source.
1240 def testJobDependency(self):
1241 depmgr = _FakeDependencyManager()
1242 queue = _FakeQueueForProc(depmgr=depmgr)
1244 self.assertEqual(queue.depmgr, depmgr)
1247 prev_job_id2 = 28102
1250 opcodes.OpTestDummy(result="Res0", fail=False,
1252 [prev_job_id2, None],
1253 [prev_job_id, None],
1255 opcodes.OpTestDummy(result="Res1", fail=False),
1259 job = self._CreateJob(queue, job_id, ops)
# Hook for _FakeExecOpCodeForProc; reads `attempt` from the enclosing scope
# to decide whether a job update is expected.
1261 def _BeforeStart(timeout, priority):
1262 if attempt == 0 or attempt > 5:
1263 # Job should only be updated when it wasn't waiting for another job
1264 self.assertEqual(queue.GetNextUpdate(), (job, True))
1265 self.assertRaises(IndexError, queue.GetNextUpdate)
1266 self.assertFalse(queue.IsAcquired())
1267 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1268 self.assertFalse(job.cur_opctx)
# Hook for _FakeExecOpCodeForProc: the RUNNING transition must have been
# written, and a running job must refuse Cancel().
1270 def _AfterStart(op, cbs):
1271 self.assertEqual(queue.GetNextUpdate(), (job, True))
1272 self.assertRaises(IndexError, queue.GetNextUpdate)
1274 self.assertFalse(queue.IsAcquired())
1275 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1276 self.assertFalse(job.cur_opctx)
1278 # Job is running, cancelling shouldn't be possible
1279 (success, _) = job.Cancel()
1280 self.assertFalse(success)
1282 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1284 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
# NOTE(review): counter.next() below only works on Python 2; next(counter)
# works on Python 2.6+ and 3.
1286 counter = itertools.count()
1288 attempt = counter.next()
1290 self.assertRaises(IndexError, queue.GetNextUpdate)
1291 self.assertRaises(IndexError, depmgr.GetNextNotification)
# Script the dependency manager's answers for this attempt
1294 depmgr.AddCheckResult(job, prev_job_id2, None,
1295 (jqueue._JobDependencyManager.WAIT, "wait2"))
1297 depmgr.AddCheckResult(job, prev_job_id2, None,
1298 (jqueue._JobDependencyManager.CONTINUE, "cont"))
1299 # The processor will ask for the next dependency immediately
1300 depmgr.AddCheckResult(job, prev_job_id, None,
1301 (jqueue._JobDependencyManager.WAIT, "wait"))
1303 depmgr.AddCheckResult(job, prev_job_id, None,
1304 (jqueue._JobDependencyManager.WAIT, "wait"))
1306 depmgr.AddCheckResult(job, prev_job_id, None,
1307 (jqueue._JobDependencyManager.CONTINUE, "cont"))
1309 self.assertEqual(depmgr.CountPendingResults(), 2)
1311 self.assertEqual(depmgr.CountPendingResults(), 0)
1313 self.assertEqual(depmgr.CountPendingResults(), 1)
1315 result = jqueue._JobProcessor(queue, opexec, job)()
1316 if attempt == 0 or attempt >= 5:
1317 # Job should only be updated if there was an actual change
1318 self.assertEqual(queue.GetNextUpdate(), (job, True))
1319 self.assertRaises(IndexError, queue.GetNextUpdate)
# Every scripted answer must have been consumed by the processor
1320 self.assertFalse(depmgr.CountPendingResults())
1323 # Simulate waiting for other job
1324 self.assertTrue(result)
1325 self.assertTrue(job.cur_opctx)
1326 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1327 self.assertRaises(IndexError, depmgr.GetNextNotification)
1328 self.assert_(job.start_timestamp)
1329 self.assertFalse(job.end_timestamp)
1334 self.assertFalse(job.cur_opctx)
# The dependency manager must have been notified about this job's id
1335 self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
1338 self.assertRaises(IndexError, depmgr.GetNextNotification)
1340 self.assertFalse(result)
1341 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1342 self.assert_(job.start_timestamp)
1343 self.assertFalse(job.end_timestamp)
1345 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1346 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1347 self.assertEqual(job.GetInfo(["opresult"]),
1348 [[op.input.result for op in job.ops]])
1349 self.assertEqual(job.GetInfo(["opstatus"]),
1350 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1351 self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1354 self._GenericCheckJob(job)
1356 self.assertRaises(IndexError, queue.GetNextUpdate)
1357 self.assertRaises(IndexError, depmgr.GetNextNotification)
1358 self.assertFalse(depmgr.CountPendingResults())
1359 self.assertFalse(depmgr.CountWaitingJobs())
1361 # Calling the processor on a finished job should be a no-op
1362 self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1363 self.assertRaises(IndexError, queue.GetNextUpdate)
# The job's second opcode depends on prev_job_id. The fake dependency
# manager first answers WAIT and then CANCEL. The job must end CANCELED:
# the first opcode keeps its result, and the remaining opcodes are marked
# canceled with "Job canceled by request".
# NOTE(review): this listing lacks several lines of the method (e.g. the
# prev_job_id/job_id assignments and the loop/branch headers around the
# attempt counter); confirm against the full source.
1365 def testJobDependencyCancel(self):
1366 depmgr = _FakeDependencyManager()
1367 queue = _FakeQueueForProc(depmgr=depmgr)
1369 self.assertEqual(queue.depmgr, depmgr)
1374 opcodes.OpTestDummy(result="Res0", fail=False),
1375 opcodes.OpTestDummy(result="Res1", fail=False,
1377 [prev_job_id, None],
1379 opcodes.OpTestDummy(result="Res2", fail=False),
1383 job = self._CreateJob(queue, job_id, ops)
# Hook for _FakeExecOpCodeForProc; reads `attempt` from the enclosing scope
# to decide whether a job update is expected.
1385 def _BeforeStart(timeout, priority):
1386 if attempt == 0 or attempt > 5:
1387 # Job should only be updated when it wasn't waiting for another job
1388 self.assertEqual(queue.GetNextUpdate(), (job, True))
1389 self.assertRaises(IndexError, queue.GetNextUpdate)
1390 self.assertFalse(queue.IsAcquired())
1391 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1392 self.assertFalse(job.cur_opctx)
# Hook for _FakeExecOpCodeForProc: the RUNNING transition must have been
# written, and a running job must refuse Cancel().
1394 def _AfterStart(op, cbs):
1395 self.assertEqual(queue.GetNextUpdate(), (job, True))
1396 self.assertRaises(IndexError, queue.GetNextUpdate)
1398 self.assertFalse(queue.IsAcquired())
1399 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1400 self.assertFalse(job.cur_opctx)
1402 # Job is running, cancelling shouldn't be possible
1403 (success, _) = job.Cancel()
1404 self.assertFalse(success)
1406 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1408 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
# NOTE(review): counter.next() below only works on Python 2; next(counter)
# works on Python 2.6+ and 3.
1410 counter = itertools.count()
1412 attempt = counter.next()
1414 self.assertRaises(IndexError, queue.GetNextUpdate)
1415 self.assertRaises(IndexError, depmgr.GetNextNotification)
1418 # This will handle the first opcode
1421 depmgr.AddCheckResult(job, prev_job_id, None,
1422 (jqueue._JobDependencyManager.WAIT, "wait"))
1424 # Other job was cancelled
1425 depmgr.AddCheckResult(job, prev_job_id, None,
1426 (jqueue._JobDependencyManager.CANCEL, "cancel"))
1429 self.assertEqual(depmgr.CountPendingResults(), 0)
1431 self.assertEqual(depmgr.CountPendingResults(), 1)
1433 result = jqueue._JobProcessor(queue, opexec, job)()
1434 if attempt <= 1 or attempt >= 4:
1435 # Job should only be updated if there was an actual change
1436 self.assertEqual(queue.GetNextUpdate(), (job, True))
1437 self.assertRaises(IndexError, queue.GetNextUpdate)
# Every scripted answer must have been consumed by the processor
1438 self.assertFalse(depmgr.CountPendingResults())
1440 if attempt > 0 and attempt < 4:
1441 # Simulate waiting for other job
1442 self.assertTrue(result)
1443 self.assertTrue(job.cur_opctx)
1444 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1445 self.assertRaises(IndexError, depmgr.GetNextNotification)
1446 self.assert_(job.start_timestamp)
1447 self.assertFalse(job.end_timestamp)
1452 self.assertFalse(job.cur_opctx)
# The dependency manager must have been notified about this job's id
1453 self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
1456 self.assertRaises(IndexError, depmgr.GetNextNotification)
1458 self.assertFalse(result)
1459 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1460 self.assert_(job.start_timestamp)
1461 self.assertFalse(job.end_timestamp)
1463 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1464 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1465 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1466 [[constants.OP_STATUS_SUCCESS,
1467 constants.OP_STATUS_CANCELED,
1468 constants.OP_STATUS_CANCELED],
1469 ["Res0", "Job canceled by request",
1470 "Job canceled by request"]])
1472 self._GenericCheckJob(job)
1474 self.assertRaises(IndexError, queue.GetNextUpdate)
1475 self.assertRaises(IndexError, depmgr.GetNextNotification)
1476 self.assertFalse(depmgr.CountPendingResults())
1478 # Calling the processor on a finished job should be a no-op
1479 self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1480 self.assertRaises(IndexError, queue.GetNextUpdate)
# The job's second opcode depends on prev_job_id. The fake dependency
# manager first answers WAIT and then WRONGSTATUS, meaning the dependency
# finished in a status that was not accepted. The job must end in ERROR:
# the first opcode keeps its result, and the remaining opcodes are marked as
# errors with encoded error results.
# NOTE(review): this listing lacks several lines of the method (e.g. the
# prev_job_id/job_id assignments and the loop/branch headers around the
# attempt counter); confirm against the full source.
1482 def testJobDependencyWrongstatus(self):
1483 depmgr = _FakeDependencyManager()
1484 queue = _FakeQueueForProc(depmgr=depmgr)
1486 self.assertEqual(queue.depmgr, depmgr)
1491 opcodes.OpTestDummy(result="Res0", fail=False),
1492 opcodes.OpTestDummy(result="Res1", fail=False,
1494 [prev_job_id, None],
1496 opcodes.OpTestDummy(result="Res2", fail=False),
1500 job = self._CreateJob(queue, job_id, ops)
# Hook for _FakeExecOpCodeForProc; reads `attempt` from the enclosing scope
# to decide whether a job update is expected.
1502 def _BeforeStart(timeout, priority):
1503 if attempt == 0 or attempt > 5:
1504 # Job should only be updated when it wasn't waiting for another job
1505 self.assertEqual(queue.GetNextUpdate(), (job, True))
1506 self.assertRaises(IndexError, queue.GetNextUpdate)
1507 self.assertFalse(queue.IsAcquired())
1508 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1509 self.assertFalse(job.cur_opctx)
# Hook for _FakeExecOpCodeForProc: the RUNNING transition must have been
# written, and a running job must refuse Cancel().
1511 def _AfterStart(op, cbs):
1512 self.assertEqual(queue.GetNextUpdate(), (job, True))
1513 self.assertRaises(IndexError, queue.GetNextUpdate)
1515 self.assertFalse(queue.IsAcquired())
1516 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1517 self.assertFalse(job.cur_opctx)
1519 # Job is running, cancelling shouldn't be possible
1520 (success, _) = job.Cancel()
1521 self.assertFalse(success)
1523 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1525 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
# NOTE(review): counter.next() below only works on Python 2; next(counter)
# works on Python 2.6+ and 3.
1527 counter = itertools.count()
1529 attempt = counter.next()
1531 self.assertRaises(IndexError, queue.GetNextUpdate)
1532 self.assertRaises(IndexError, depmgr.GetNextNotification)
1535 # This will handle the first opcode
1538 depmgr.AddCheckResult(job, prev_job_id, None,
1539 (jqueue._JobDependencyManager.WAIT, "wait"))
# Other job finished in a status that was not accepted
1542 depmgr.AddCheckResult(job, prev_job_id, None,
1543 (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
1546 self.assertEqual(depmgr.CountPendingResults(), 0)
1548 self.assertEqual(depmgr.CountPendingResults(), 1)
1550 result = jqueue._JobProcessor(queue, opexec, job)()
1551 if attempt <= 1 or attempt >= 4:
1552 # Job should only be updated if there was an actual change
1553 self.assertEqual(queue.GetNextUpdate(), (job, True))
1554 self.assertRaises(IndexError, queue.GetNextUpdate)
# Every scripted answer must have been consumed by the processor
1555 self.assertFalse(depmgr.CountPendingResults())
1557 if attempt > 0 and attempt < 4:
1558 # Simulate waiting for other job
1559 self.assertTrue(result)
1560 self.assertTrue(job.cur_opctx)
1561 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1562 self.assertRaises(IndexError, depmgr.GetNextNotification)
1563 self.assert_(job.start_timestamp)
1564 self.assertFalse(job.end_timestamp)
1569 self.assertFalse(job.cur_opctx)
# The dependency manager must have been notified about this job's id
1570 self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
1573 self.assertRaises(IndexError, depmgr.GetNextNotification)
1575 self.assertFalse(result)
1576 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1577 self.assert_(job.start_timestamp)
1578 self.assertFalse(job.end_timestamp)
1580 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
1581 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
# NOTE(review): the trailing comma after this assertEqual(...) call makes the
# statement a discarded 1-tuple expression. The assertion still runs, but the
# comma should be removed.
1582 self.assertEqual(job.GetInfo(["opstatus"]),
1583 [[constants.OP_STATUS_SUCCESS,
1584 constants.OP_STATUS_ERROR,
1585 constants.OP_STATUS_ERROR]]),
# Failed opcodes carry encoded errors instead of plain results
1587 (opresult, ) = job.GetInfo(["opresult"])
1588 self.assertEqual(len(opresult), len(ops))
1589 self.assertEqual(opresult[0], "Res0")
1590 self.assertTrue(errors.GetEncodedError(opresult[1]))
1591 self.assertTrue(errors.GetEncodedError(opresult[2]))
1593 self._GenericCheckJob(job)
1595 self.assertRaises(IndexError, queue.GetNextUpdate)
1596 self.assertRaises(IndexError, depmgr.GetNextNotification)
1597 self.assertFalse(depmgr.CountPendingResults())
1599 # Calling the processor on a finished job should be a no-op
1600 self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1601 self.assertRaises(IndexError, queue.GetNextUpdate)
# Test double for the job processor's lock-timeout strategy. NextAttempt
# takes the given timeouts one by one from the front of the list and records
# each one in last_timeout.
# NOTE(review): some lines of this class are not in this listing (e.g. the
# rest of NextAttempt and its return). Callers appear to expect None once the
# list is exhausted (see TestJobProcessorTimeouts._BeforeStart); confirm.
# NOTE(review): `timeouts` must be a mutable list because of pop(0). The
# callers pass range(...), which is a list only on Python 2.
1604 class _FakeTimeoutStrategy:
1605 def __init__(self, timeouts):
1606 self.timeouts = timeouts
1608 self.last_timeout = None
1610 def NextAttempt(self):
1613 timeout = self.timeouts.pop(0)
1616 self.last_timeout = timeout
# Exercises _JobProcessor with a scripted timeout strategy
# (_FakeTimeoutStrategy, built by _NewTimeoutStrategy) and a fake executor.
# The executor's _BeforeStart hook raises mcpu.LockAcquireTimeout while
# retries remain. Opcode priorities are therefore raised until locks are
# granted, or until a blocking acquire (timeout None) happens at
# OP_PRIO_HIGHEST.
# NOTE(review): several lines of this class are absent from this listing,
# e.g. the setUp header before the attribute initialisation, the `job`
# bindings used by the hooks, presumably the decrement of self.retries, and
# parts of the loop structure in testTimeout. Verify against the full source.
# NOTE(review): self.opcounter.next() only works on Python 2; next(...) is
# portable.
1620 class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
1622 self.queue = _FakeQueueForProc()
1625 self.opcounter = None
1626 self.timeout_strategy = None
1628 self.prev_tsop = None
1629 self.prev_prio = None
1630 self.prev_status = None
1631 self.lock_acq_prio = None
1632 self.gave_lock = None
1633 self.done_lock_before_blocking = False
# Lock-acquisition hook. It checks that:
# - the job was written only if the opcode status changed;
# - the timeout passed in is the one last handed out by the current
#   strategy;
# - the priority matches the opcode's priority.
# While retries remain it raises mcpu.LockAcquireTimeout (lock not given).
# At OP_PRIO_HIGHEST it expects all retries and timeouts to be used up and
# the acquire to be blocking (timeout None).
1635 def _BeforeStart(self, timeout, priority):
1638 # If status has changed, job must've been written
1639 if self.prev_status != self.job.ops[self.curop].status:
1640 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1641 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1643 self.assertFalse(self.queue.IsAcquired())
1644 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1646 ts = self.timeout_strategy
1648 self.assert_(timeout is None or isinstance(timeout, (int, float)))
1649 self.assertEqual(timeout, ts.last_timeout)
1650 self.assertEqual(priority, job.ops[self.curop].priority)
1652 self.gave_lock = True
1653 self.lock_acq_prio = priority
1655 if (self.curop == 3 and
1656 job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
1657 # Give locks before running into blocking acquire
1658 assert self.retries == 7
1660 self.done_lock_before_blocking = True
1663 if self.retries > 0:
1664 self.assert_(timeout is not None)
1666 self.gave_lock = False
1667 raise mcpu.LockAcquireTimeout()
1669 if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
1670 assert self.retries == 0, "Didn't exhaust all retries at highest priority"
1671 assert not ts.timeouts
1672 self.assert_(timeout is None)
# Opcode-start hook. The RUNNING transition must have been written, the
# queue must not hold locks, and Cancel() must fail on a running job.
1674 def _AfterStart(self, op, cbs):
1677 # Setting to "running" requires an update
1678 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1679 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1681 self.assertFalse(self.queue.IsAcquired())
1682 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1684 # Job is running, cancelling shouldn't be possible
1685 (success, _) = job.Cancel()
1686 self.assertFalse(success)
# Passed to the processor as _nextop_fn. Advances to the next opcode index
# and snapshots its priority and status for later comparison.
1688 def _NextOpcode(self):
1689 self.curop = self.opcounter.next()
1690 self.prev_prio = self.job.ops[self.curop].priority
1691 self.prev_status = self.job.ops[self.curop].status
# Timeout-strategy factory handed to _JobProcessor. On each call it checks
# that the previous strategy's retries were used up. When still on the same
# opcode, it also checks that the priority value dropped by exactly one.
# It then builds a _FakeTimeoutStrategy with a per-opcode list of timeouts
# and a retry budget.
1693 def _NewTimeoutStrategy(self):
1696 self.assertEqual(self.retries, 0)
1698 if self.prev_tsop == self.curop:
1699 # Still on the same opcode, priority must've been increased
1700 self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
1704 timeouts = range(10, 31, 10)
1705 self.retries = len(timeouts) - 1
1707 elif self.curop == 2:
1708 # Let this run into a blocking acquire
1709 timeouts = range(11, 61, 12)
1710 self.retries = len(timeouts)
1712 elif self.curop == 3:
1713 # Wait for priority to increase, but give lock before blocking acquire
1714 timeouts = range(12, 100, 14)
1715 self.retries = len(timeouts)
1717 self.assertFalse(self.done_lock_before_blocking)
1719 elif self.curop == 4:
1720 self.assert_(self.done_lock_before_blocking)
1722 # Timeouts, but no need to retry
1723 timeouts = range(10, 31, 10)
1726 elif self.curop == 5:
1728 timeouts = range(19, 100, 11)
1729 self.retries = len(timeouts)
1735 assert len(job.ops) == 10
1736 assert self.retries <= len(timeouts)
1738 ts = _FakeTimeoutStrategy(timeouts)
1740 self.timeout_strategy = ts
1741 self.prev_tsop = self.curop
1742 self.prev_prio = job.ops[self.curop].priority
# Drive the job through repeated processor calls until it succeeds. After
# each call it checks whether locks were granted, whether the job was
# written, the opcode context and the timestamps. At the end it checks that
# all opcodes succeeded.
1746 def testTimeout(self):
1747 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1752 job = self._CreateJob(self.queue, job_id, ops)
1755 self.opcounter = itertools.count(0)
1757 opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
1759 tsf = self._NewTimeoutStrategy
1761 self.assertFalse(self.done_lock_before_blocking)
1764 proc = jqueue._JobProcessor(self.queue, opexec, job,
1765 _timeout_strategy_factory=tsf)
1767 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1769 if self.curop is not None:
1770 self.prev_status = self.job.ops[self.curop].status
1772 self.lock_acq_prio = None
1774 result = proc(_nextop_fn=self._NextOpcode)
1775 assert self.curop is not None
1777 if result or self.gave_lock:
1778 # Got lock and/or job is done, result must've been written
1779 self.assertFalse(job.cur_opctx)
1780 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1781 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1782 self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
1783 self.assert_(job.ops[self.curop].exec_timestamp)
1786 self.assertFalse(job.cur_opctx)
1789 self.assertFalse(result)
1792 self.assertEqual(job.ops[self.curop].start_timestamp,
1793 job.start_timestamp)
1796 # Opcode finished, but job not yet done
1797 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
# Lock not acquired: the opcode context must be kept and must use the
# current fake strategy's NextAttempt.
1800 self.assert_(job.cur_opctx)
1801 self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1802 self.timeout_strategy.NextAttempt)
1803 self.assertFalse(job.ops[self.curop].exec_timestamp)
1804 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1806 # If priority has changed since acquiring locks, the job must've been
1808 if self.lock_acq_prio != job.ops[self.curop].priority:
1809 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1811 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1813 self.assert_(job.start_timestamp)
1814 self.assertFalse(job.end_timestamp)
# All opcodes consumed: the counter must be one past the last index
1816 self.assertEqual(self.curop, len(job.ops) - 1)
1817 self.assertEqual(self.job, job)
1818 self.assertEqual(self.opcounter.next(), len(job.ops))
1819 self.assert_(self.done_lock_before_blocking)
1821 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1822 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1823 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1824 self.assertEqual(job.GetInfo(["opresult"]),
1825 [[op.input.result for op in job.ops]])
1826 self.assertEqual(job.GetInfo(["opstatus"]),
1827 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1828 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1831 # Calling the processor on a finished job should be a no-op
1832 self.assertTrue(jqueue._JobProcessor(self.queue, opexec, job)())
1833 self.assertRaises(IndexError, self.queue.GetNextUpdate)
# Unit tests for jqueue._JobDependencyManager. The manager is built with two
# test doubles:
# - _GetStatus pops a scripted (expected job id, status) pair from
#   self._status and checks that the requested id matches. It presumably
#   returns the popped status; that line is not in this listing.
# - _Enqueue records each batch of jobs it is given in self._queue.
# NOTE(review): several lines of this class are missing from this listing:
# the header of the nested fake-job class owning the __init__ below,
# presumably a setUp header with the self._status/self._queue initialisation,
# the job_id assignments, and the contents of the `_waiters` dict literals.
1836 class TestJobDependencyManager(unittest.TestCase):
1838 def __init__(self, job_id):
1839 self.id = str(job_id)
1844 self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
1846 def _GetStatus(self, job_id):
1847 (exp_job_id, result) = self._status.pop(0)
1848 self.assertEqual(exp_job_id, job_id)
1851 def _Enqueue(self, jobs):
1852 self._queue.append(jobs)
# A RUNNING dependency makes the job WAIT. Once the dependency is CANCELED
# and no status list was given, the result is CANCEL and the job no longer
# waits.
1854 def testNotFinalizedThenCancel(self):
1855 job = self._FakeJob(17697)
1858 self._status.append((job_id, constants.JOB_STATUS_RUNNING))
1859 (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1860 self.assertEqual(result, self.jdm.WAIT)
1861 self.assertFalse(self._status)
1862 self.assertFalse(self._queue)
1863 self.assertTrue(self.jdm.JobWaiting(job))
1864 self.assertEqual(self.jdm._waiters, {
1868 self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1869 (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1870 self.assertEqual(result, self.jdm.CANCEL)
1871 self.assertFalse(self._status)
1872 self.assertFalse(self._queue)
1873 self.assertFalse(self.jdm.JobWaiting(job))
# Requiring CANCELED: WAITLOCK gives WAIT, then CANCELED gives CONTINUE
1875 def testRequireCancel(self):
1876 job = self._FakeJob(5278)
1878 dep_status = [constants.JOB_STATUS_CANCELED]
1880 self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
1881 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1882 self.assertEqual(result, self.jdm.WAIT)
1883 self.assertFalse(self._status)
1884 self.assertFalse(self._queue)
1885 self.assertTrue(self.jdm.JobWaiting(job))
1886 self.assertEqual(self.jdm._waiters, {
1890 self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1891 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1892 self.assertEqual(result, self.jdm.CONTINUE)
1893 self.assertFalse(self._status)
1894 self.assertFalse(self._queue)
1895 self.assertFalse(self.jdm.JobWaiting(job))
# Requiring ERROR: WAITLOCK gives WAIT, then ERROR gives CONTINUE
1897 def testRequireError(self):
1898 job = self._FakeJob(21459)
1900 dep_status = [constants.JOB_STATUS_ERROR]
1902 self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
1903 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1904 self.assertEqual(result, self.jdm.WAIT)
1905 self.assertFalse(self._status)
1906 self.assertFalse(self._queue)
1907 self.assertTrue(self.jdm.JobWaiting(job))
1908 self.assertEqual(self.jdm._waiters, {
1912 self._status.append((job_id, constants.JOB_STATUS_ERROR))
1913 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1914 self.assertEqual(result, self.jdm.CONTINUE)
1915 self.assertFalse(self._status)
1916 self.assertFalse(self._queue)
1917 self.assertFalse(self.jdm.JobWaiting(job))
# With every finalized status accepted, each of them lets the job CONTINUE
1919 def testRequireMultiple(self):
1920 dep_status = list(constants.JOBS_FINALIZED)
1922 for end_status in dep_status:
1923 job = self._FakeJob(21343)
1926 self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
1927 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1928 self.assertEqual(result, self.jdm.WAIT)
1929 self.assertFalse(self._status)
1930 self.assertFalse(self._queue)
1931 self.assertTrue(self.jdm.JobWaiting(job))
1932 self.assertEqual(self.jdm._waiters, {
1936 self._status.append((job_id, end_status))
1937 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1938 self.assertEqual(result, self.jdm.CONTINUE)
1939 self.assertFalse(self._status)
1940 self.assertFalse(self._queue)
1941 self.assertFalse(self.jdm.JobWaiting(job))
# NotifyWaiters on the dependency clears the waiter table and hands the
# waiting job to _Enqueue as a set.
1943 def testNotify(self):
1944 job = self._FakeJob(8227)
1947 self._status.append((job_id, constants.JOB_STATUS_RUNNING))
1948 (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1949 self.assertEqual(result, self.jdm.WAIT)
1950 self.assertFalse(self._status)
1951 self.assertFalse(self._queue)
1952 self.assertTrue(self.jdm.JobWaiting(job))
1953 self.assertEqual(self.jdm._waiters, {
1957 self.jdm.NotifyWaiters(job_id)
1958 self.assertFalse(self._status)
1959 self.assertFalse(self.jdm._waiters)
1960 self.assertFalse(self.jdm.JobWaiting(job))
1961 self.assertEqual(self._queue, [set([job])])
# Dependency ended in ERROR while SUCCESS was required, giving WRONGSTATUS
1963 def testWrongStatus(self):
1964 job = self._FakeJob(10102)
1967 self._status.append((job_id, constants.JOB_STATUS_QUEUED))
1968 (result, _) = self.jdm.CheckAndRegister(job, job_id,
1969 [constants.JOB_STATUS_SUCCESS])
1970 self.assertEqual(result, self.jdm.WAIT)
1971 self.assertFalse(self._status)
1972 self.assertFalse(self._queue)
1973 self.assertTrue(self.jdm.JobWaiting(job))
1974 self.assertEqual(self.jdm._waiters, {
1978 self._status.append((job_id, constants.JOB_STATUS_ERROR))
1979 (result, _) = self.jdm.CheckAndRegister(job, job_id,
1980 [constants.JOB_STATUS_SUCCESS])
1981 self.assertEqual(result, self.jdm.WRONGSTATUS)
1982 self.assertFalse(self._status)
1983 self.assertFalse(self._queue)
1984 self.assertFalse(self.jdm.JobWaiting(job))
# Dependency ended in the required SUCCESS status, giving CONTINUE
1986 def testCorrectStatus(self):
1987 job = self._FakeJob(24273)
1990 self._status.append((job_id, constants.JOB_STATUS_QUEUED))
1991 (result, _) = self.jdm.CheckAndRegister(job, job_id,
1992 [constants.JOB_STATUS_SUCCESS])
1993 self.assertEqual(result, self.jdm.WAIT)
1994 self.assertFalse(self._status)
1995 self.assertFalse(self._queue)
1996 self.assertTrue(self.jdm.JobWaiting(job))
1997 self.assertEqual(self.jdm._waiters, {
2001 self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2002 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2003 [constants.JOB_STATUS_SUCCESS])
2004 self.assertEqual(result, self.jdm.CONTINUE)
2005 self.assertFalse(self._status)
2006 self.assertFalse(self._queue)
2007 self.assertFalse(self.jdm.JobWaiting(job))
# Dependency already finalized at the first check: CONTINUE, and the job is
# not left waiting. Notifying an unrelated id ("0") leaves the waiter table
# empty and enqueues nothing.
2009 def testFinalizedRightAway(self):
2010 job = self._FakeJob(224)
2013 self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2014 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2015 [constants.JOB_STATUS_SUCCESS])
2016 self.assertEqual(result, self.jdm.CONTINUE)
2017 self.assertFalse(self._status)
2018 self.assertFalse(self._queue)
2019 self.assertFalse(self.jdm.JobWaiting(job))
2020 self.assertEqual(self.jdm._waiters, {
2025 self.jdm.NotifyWaiters("0")
2026 self.assertFalse(self.jdm._waiters)
2027 self.assertFalse(self._status)
2028 self.assertFalse(self._queue)
# A job depending on its own id yields ERROR
2030 def testSelfDependency(self):
2031 job = self._FakeJob(18937)
2033 self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
2034 (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
2035 self.assertEqual(result, self.jdm.ERROR)
# A status callback that raises errors.JobLost makes CheckAndRegister report
# ERROR without registering a waiter. The header of the nested _FakeStatus
# function that raises is not present in this listing.
2037 def testJobDisappears(self):
2038 job = self._FakeJob(30540)
2042 raise errors.JobLost("#msg#")
2044 jdm = jqueue._JobDependencyManager(_FakeStatus, None)
2045 (result, _) = jdm.CheckAndRegister(job, job_id, [])
2046 self.assertEqual(result, self.jdm.ERROR)
2047 self.assertFalse(jdm.JobWaiting(job))
# When run as a script, hand control to Ganeti's test program wrapper
2050 if __name__ == "__main__":
2051 testutils.GanetiTestProgram()