4 # Copyright (C) 2010, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.jqueue"""
33 from ganeti import constants
34 from ganeti import utils
35 from ganeti import errors
36 from ganeti import jqueue
37 from ganeti import opcodes
38 from ganeti import compat
39 from ganeti import mcpu
40 from ganeti import query
41 from ganeti import workerpool
47 def __init__(self, job_id, status):
53 def SetStatus(self, status):
56 def AddLogEntry(self, msg):
57 self._log.append((len(self._log), msg))
62 def GetInfo(self, fields):
67 result.append(self._status)
69 raise Exception("Unknown field")
73 def GetLogEntries(self, newer_than):
74 assert newer_than is None or newer_than >= 0
76 if newer_than is None:
79 return self._log[newer_than:]
82 class TestJobChangesChecker(unittest.TestCase):
84 job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
85 checker = jqueue._JobChangesChecker(["status"], None, None)
86 self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
88 job.SetStatus(constants.JOB_STATUS_RUNNING)
89 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
91 job.SetStatus(constants.JOB_STATUS_SUCCESS)
92 self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
94 # job.id is used by checker
95 self.assertEqual(job.id, 9094)
97 def testStatusWithPrev(self):
98 job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
99 checker = jqueue._JobChangesChecker(["status"],
100 [constants.JOB_STATUS_QUEUED], None)
101 self.assert_(checker(job) is None)
103 job.SetStatus(constants.JOB_STATUS_RUNNING)
104 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
106 def testFinalStatus(self):
107 for status in constants.JOBS_FINALIZED:
108 job = _FakeJob(2178711, status)
109 checker = jqueue._JobChangesChecker(["status"], [status], None)
110 # There won't be any changes in this status, hence it should signal
111 # a change immediately
112 self.assertEqual(checker(job), ([status], []))
115 job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
116 checker = jqueue._JobChangesChecker(["status"], None, None)
117 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
119 job.AddLogEntry("Hello World")
120 (job_info, log_entries) = checker(job)
121 self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
122 self.assertEqual(log_entries, [[0, "Hello World"]])
124 checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
125 self.assert_(checker2(job) is None)
127 job.AddLogEntry("Foo Bar")
128 job.SetStatus(constants.JOB_STATUS_ERROR)
130 (job_info, log_entries) = checker2(job)
131 self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
132 self.assertEqual(log_entries, [[1, "Foo Bar"]])
134 checker3 = jqueue._JobChangesChecker(["status"], None, None)
135 (job_info, log_entries) = checker3(job)
136 self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
137 self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
140 class TestJobChangesWaiter(unittest.TestCase):
142 self.tmpdir = tempfile.mkdtemp()
143 self.filename = utils.PathJoin(self.tmpdir, "job-1")
144 utils.WriteFile(self.filename, data="")
147 shutil.rmtree(self.tmpdir)
149 def _EnsureNotifierClosed(self, notifier):
151 os.fstat(notifier._fd)
152 except EnvironmentError, err:
153 self.assertEqual(err.errno, errno.EBADF)
155 self.fail("File descriptor wasn't closed")
158 for wait in [False, True]:
159 waiter = jqueue._JobFileChangesWaiter(self.filename)
166 # Ensure file descriptor was closed
167 self._EnsureNotifierClosed(waiter._notifier)
169 def testChangingFile(self):
170 waiter = jqueue._JobFileChangesWaiter(self.filename)
172 self.assertFalse(waiter.Wait(0.1))
173 utils.WriteFile(self.filename, data="changed")
174 self.assert_(waiter.Wait(60))
178 self._EnsureNotifierClosed(waiter._notifier)
180 def testChangingFile2(self):
181 waiter = jqueue._JobChangesWaiter(self.filename)
183 self.assertFalse(waiter._filewaiter)
184 self.assert_(waiter.Wait(0.1))
185 self.assert_(waiter._filewaiter)
187 # File waiter is now used, but there have been no changes
188 self.assertFalse(waiter.Wait(0.1))
189 utils.WriteFile(self.filename, data="changed")
190 self.assert_(waiter.Wait(60))
194 self._EnsureNotifierClosed(waiter._filewaiter._notifier)
197 class TestWaitForJobChangesHelper(unittest.TestCase):
199 self.tmpdir = tempfile.mkdtemp()
200 self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
201 utils.WriteFile(self.filename, data="")
204 shutil.rmtree(self.tmpdir)
206 def _LoadWaitingJob(self):
207 return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)
209 def _LoadLostJob(self):
212 def testNoChanges(self):
213 wfjc = jqueue._WaitForJobChangesHelper()
216 self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
217 [constants.JOB_STATUS_WAITING], None, 0.1),
218 constants.JOB_NOTCHANGED)
220 # No previous information
221 self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
222 ["status"], None, None, 1.0),
223 ([constants.JOB_STATUS_WAITING], []))
225 def testLostJob(self):
226 wfjc = jqueue._WaitForJobChangesHelper()
227 self.assert_(wfjc(self.filename, self._LoadLostJob,
228 ["status"], None, None, 1.0) is None)
231 class TestEncodeOpError(unittest.TestCase):
233 encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
234 self.assert_(isinstance(encerr, tuple))
235 self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
237 encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
238 self.assert_(isinstance(encerr, tuple))
239 self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
241 encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
242 self.assert_(isinstance(encerr, tuple))
243 self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
245 encerr = jqueue._EncodeOpError("Hello World")
246 self.assert_(isinstance(encerr, tuple))
247 self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
250 class TestQueuedOpCode(unittest.TestCase):
251 def testDefaults(self):
253 self.assertFalse(hasattr(op.input, "dry_run"))
254 self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
255 self.assertFalse(op.log)
256 self.assert_(op.start_timestamp is None)
257 self.assert_(op.exec_timestamp is None)
258 self.assert_(op.end_timestamp is None)
259 self.assert_(op.result is None)
260 self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
262 op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
264 op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
266 self.assertEqual(op1.Serialize(), op2.Serialize())
268 def testPriority(self):
270 assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
271 "Default priority equals high priority; test can't work"
272 self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
273 self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
275 inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
276 op1 = jqueue._QueuedOpCode(inpop)
278 op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
280 self.assertEqual(op1.Serialize(), op2.Serialize())
283 class TestQueuedJob(unittest.TestCase):
285 self.assertRaises(errors.GenericError, jqueue._QueuedJob,
288 def testDefaults(self):
292 opcodes.OpTestDelay(),
296 self.assertTrue(job.writable)
297 self.assertEqual(job.id, job_id)
298 self.assertEqual(job.log_serial, 0)
299 self.assert_(job.received_timestamp)
300 self.assert_(job.start_timestamp is None)
301 self.assert_(job.end_timestamp is None)
302 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
303 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
304 self.assert_(repr(job).startswith("<"))
305 self.assertEqual(len(job.ops), len(ops))
306 self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
307 for (inp, op) in zip(ops, job.ops)))
308 self.assertRaises(errors.OpPrereqError, job.GetInfo,
310 self.assertEqual(job.GetInfo(["summary"]),
311 [[op.input.Summary() for op in job.ops]])
313 job1 = jqueue._QueuedJob(None, job_id, ops, True)
315 job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True)
317 self.assertEqual(job1.Serialize(), job2.Serialize())
319 def testWritable(self):
320 job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
321 self.assertFalse(job.writable)
323 job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
324 self.assertTrue(job.writable)
326 def testPriority(self):
329 opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
330 opcodes.OpTestDelay(),
334 self.assertEqual(job.id, job_id)
335 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
336 self.assert_(repr(job).startswith("<"))
338 job = jqueue._QueuedJob(None, job_id, ops, True)
340 self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
342 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
345 job.ops[0].priority -= 1
347 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
349 # Mark opcode as finished
350 job.ops[0].status = constants.OP_STATUS_SUCCESS
352 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
355 job.ops[1].priority -= 10
356 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
358 # Test increasing first
359 job.ops[0].status = constants.OP_STATUS_RUNNING
360 job.ops[0].priority -= 19
361 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
363 def testCalcStatus(self):
365 # The default status is "queued"
366 self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
370 ops[0].status = constants.OP_STATUS_WAITING
373 ops[0].status = constants.OP_STATUS_SUCCESS
374 ops[1].status = constants.OP_STATUS_SUCCESS
375 ops[2].status = constants.OP_STATUS_WAITING
378 ops[0].status = constants.OP_STATUS_SUCCESS
379 ops[1].status = constants.OP_STATUS_RUNNING
381 op.status = constants.OP_STATUS_QUEUED
383 def _Canceling1(ops):
384 ops[0].status = constants.OP_STATUS_SUCCESS
385 ops[1].status = constants.OP_STATUS_SUCCESS
387 op.status = constants.OP_STATUS_CANCELING
389 def _Canceling2(ops):
391 op.status = constants.OP_STATUS_CANCELING
395 op.status = constants.OP_STATUS_CANCELED
398 for idx, op in enumerate(ops):
400 op.status = constants.OP_STATUS_ERROR
402 op.status = constants.OP_STATUS_SUCCESS
406 op.status = constants.OP_STATUS_ERROR
410 op.status = constants.OP_STATUS_SUCCESS
413 constants.JOB_STATUS_QUEUED: [_Queued],
414 constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
415 constants.JOB_STATUS_RUNNING: [_Running],
416 constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
417 constants.JOB_STATUS_CANCELED: [_Canceled],
418 constants.JOB_STATUS_ERROR: [_Error1, _Error2],
419 constants.JOB_STATUS_SUCCESS: [_Success],
423 job = jqueue._QueuedJob(None, 1,
424 [opcodes.OpTestDelay() for _ in range(10)],
426 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
427 self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
431 for status in constants.JOB_STATUS_ALL:
432 sttests = tests[status]
437 self.assertEqual(job.CalcStatus(), status)
440 class _FakeDependencyManager:
443 self._notifications = []
444 self._waiting = set()
446 def AddCheckResult(self, job, dep_job_id, dep_status, result):
447 self._checks.append((job, dep_job_id, dep_status, result))
449 def CountPendingResults(self):
450 return len(self._checks)
452 def CountWaitingJobs(self):
453 return len(self._waiting)
455 def GetNextNotification(self):
456 return self._notifications.pop(0)
458 def JobWaiting(self, job):
459 return job in self._waiting
461 def CheckAndRegister(self, job, dep_job_id, dep_status):
462 (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
464 assert exp_job == job
465 assert exp_dep_job_id == dep_job_id
466 assert exp_dep_status == dep_status
468 (result_status, _) = result
470 if result_status == jqueue._JobDependencyManager.WAIT:
471 self._waiting.add(job)
472 elif result_status == jqueue._JobDependencyManager.CONTINUE:
473 self._waiting.remove(job)
477 def NotifyWaiters(self, job_id):
478 self._notifications.append(job_id)
481 class _DisabledFakeDependencyManager:
482 def JobWaiting(self, _):
485 def CheckAndRegister(self, *args):
486 assert False, "Should not be called"
488 def NotifyWaiters(self, _):
492 class _FakeQueueForProc:
493 def __init__(self, depmgr=None):
494 self._acquired = False
498 self._submit_count = itertools.count(1000)
503 self.depmgr = _DisabledFakeDependencyManager()
505 def IsAcquired(self):
506 return self._acquired
508 def GetNextUpdate(self):
509 return self._updates.pop(0)
511 def GetNextSubmittedJob(self):
512 return self._submitted.pop(0)
514 def acquire(self, shared=0):
516 self._acquired = True
519 assert self._acquired
520 self._acquired = False
522 def UpdateJobUnlocked(self, job, replicate=True):
523 assert self._acquired, "Lock not acquired while updating job"
524 self._updates.append((job, bool(replicate)))
526 def SubmitManyJobs(self, jobs):
527 assert not self._acquired, "Lock acquired while submitting jobs"
528 job_ids = [self._submit_count.next() for _ in jobs]
529 self._submitted.extend(zip(job_ids, jobs))
533 class _FakeExecOpCodeForProc:
534 def __init__(self, queue, before_start, after_start):
536 self._before_start = before_start
537 self._after_start = after_start
539 def __call__(self, op, cbs, timeout=None, priority=None):
540 assert isinstance(op, opcodes.OpTestDummy)
541 assert not self._queue.IsAcquired(), \
542 "Queue lock not released when executing opcode"
544 if self._before_start:
545 self._before_start(timeout, priority)
549 if self._after_start:
550 self._after_start(op, cbs)
552 # Check again after the callbacks
553 assert not self._queue.IsAcquired()
556 raise errors.OpExecError("Error requested (%s)" % op.result)
558 if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
559 return cbs.SubmitManyJobs(op.submit_jobs)
564 class _JobProcessorTestUtils:
565 def _CreateJob(self, queue, job_id, ops):
566 job = jqueue._QueuedJob(queue, job_id, ops, True)
567 self.assertFalse(job.start_timestamp)
568 self.assertFalse(job.end_timestamp)
569 self.assertEqual(len(ops), len(job.ops))
570 self.assert_(compat.all(op.input == inp
571 for (op, inp) in zip(job.ops, ops)))
572 self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
576 class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
577 def _GenericCheckJob(self, job):
578 assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
581 self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
582 [[op.start_timestamp for op in job.ops],
583 [op.exec_timestamp for op in job.ops],
584 [op.end_timestamp for op in job.ops]])
585 self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
586 [job.received_timestamp,
589 self.assert_(job.start_timestamp)
590 self.assert_(job.end_timestamp)
591 self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
593 def testSuccess(self):
594 queue = _FakeQueueForProc()
596 for (job_id, opcount) in [(25351, 1), (6637, 3),
597 (24644, 10), (32207, 100)]:
598 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
599 for i in range(opcount)]
602 job = self._CreateJob(queue, job_id, ops)
604 def _BeforeStart(timeout, priority):
605 self.assertEqual(queue.GetNextUpdate(), (job, True))
606 self.assertRaises(IndexError, queue.GetNextUpdate)
607 self.assertFalse(queue.IsAcquired())
608 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
609 self.assertFalse(job.cur_opctx)
611 def _AfterStart(op, cbs):
612 self.assertEqual(queue.GetNextUpdate(), (job, True))
613 self.assertRaises(IndexError, queue.GetNextUpdate)
615 self.assertFalse(queue.IsAcquired())
616 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
617 self.assertFalse(job.cur_opctx)
619 # Job is running, cancelling shouldn't be possible
620 (success, _) = job.Cancel()
621 self.assertFalse(success)
623 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
625 for idx in range(len(ops)):
626 self.assertRaises(IndexError, queue.GetNextUpdate)
627 result = jqueue._JobProcessor(queue, opexec, job)()
628 self.assertEqual(queue.GetNextUpdate(), (job, True))
629 self.assertRaises(IndexError, queue.GetNextUpdate)
630 if idx == len(ops) - 1:
632 self.assertEqual(result, jqueue._JobProcessor.FINISHED)
634 self.assertEqual(result, jqueue._JobProcessor.DEFER)
636 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
637 self.assert_(job.start_timestamp)
638 self.assertFalse(job.end_timestamp)
640 self.assertRaises(IndexError, queue.GetNextUpdate)
642 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
643 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
644 self.assertEqual(job.GetInfo(["opresult"]),
645 [[op.input.result for op in job.ops]])
646 self.assertEqual(job.GetInfo(["opstatus"]),
647 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
648 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
651 self._GenericCheckJob(job)
653 # Calling the processor on a finished job should be a no-op
654 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
655 jqueue._JobProcessor.FINISHED)
656 self.assertRaises(IndexError, queue.GetNextUpdate)
658 def testOpcodeError(self):
659 queue = _FakeQueueForProc()
666 (23816, 100, 39, 45),
669 for (job_id, opcount, failfrom, failto) in testdata:
671 ops = [opcodes.OpTestDummy(result="Res%s" % i,
672 fail=(failfrom <= i and
674 for i in range(opcount)]
677 job = self._CreateJob(queue, str(job_id), ops)
679 opexec = _FakeExecOpCodeForProc(queue, None, None)
681 for idx in range(len(ops)):
682 self.assertRaises(IndexError, queue.GetNextUpdate)
683 result = jqueue._JobProcessor(queue, opexec, job)()
685 self.assertEqual(queue.GetNextUpdate(), (job, True))
686 # waitlock to running
687 self.assertEqual(queue.GetNextUpdate(), (job, True))
689 self.assertEqual(queue.GetNextUpdate(), (job, True))
690 self.assertRaises(IndexError, queue.GetNextUpdate)
692 if idx in (failfrom, len(ops) - 1):
694 self.assertEqual(result, jqueue._JobProcessor.FINISHED)
697 self.assertEqual(result, jqueue._JobProcessor.DEFER)
699 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
701 self.assertRaises(IndexError, queue.GetNextUpdate)
704 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
705 self.assertEqual(job.GetInfo(["id"]), [job_id])
706 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
708 # Check opcode status
710 job.GetInfo(["opstatus"])[0],
711 job.GetInfo(["opresult"])[0])
713 for idx, (op, opstatus, opresult) in enumerate(data):
715 assert not op.input.fail
716 self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
717 self.assertEqual(opresult, op.input.result)
720 self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
721 self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
723 assert not op.input.fail
724 self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
725 self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
727 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
728 for op in job.ops[:failfrom]))
730 self._GenericCheckJob(job)
732 # Calling the processor on a finished job should be a no-op
733 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
734 jqueue._JobProcessor.FINISHED)
735 self.assertRaises(IndexError, queue.GetNextUpdate)
737 def testCancelWhileInQueue(self):
738 queue = _FakeQueueForProc()
740 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
745 job = self._CreateJob(queue, job_id, ops)
747 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
750 (success, _) = job.Cancel()
751 self.assert_(success)
753 self.assertRaises(IndexError, queue.GetNextUpdate)
755 self.assertFalse(job.start_timestamp)
756 self.assertTrue(job.end_timestamp)
757 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
760 # Serialize to check for differences
761 before_proc = job.Serialize()
763 # Simulate processor called in workerpool
764 opexec = _FakeExecOpCodeForProc(queue, None, None)
765 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
766 jqueue._JobProcessor.FINISHED)
769 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
770 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
771 self.assertFalse(job.start_timestamp)
772 self.assertTrue(job.end_timestamp)
773 self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
775 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
776 [[constants.OP_STATUS_CANCELED for _ in job.ops],
777 ["Job canceled by request" for _ in job.ops]])
779 # Must not have changed or written
780 self.assertEqual(before_proc, job.Serialize())
781 self.assertRaises(IndexError, queue.GetNextUpdate)
783 def testCancelWhileWaitlockInQueue(self):
784 queue = _FakeQueueForProc()
786 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
791 job = self._CreateJob(queue, job_id, ops)
793 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
795 job.ops[0].status = constants.OP_STATUS_WAITING
797 assert len(job.ops) == 5
799 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
802 (success, _) = job.Cancel()
803 self.assert_(success)
805 self.assertRaises(IndexError, queue.GetNextUpdate)
807 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
810 opexec = _FakeExecOpCodeForProc(queue, None, None)
811 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
812 jqueue._JobProcessor.FINISHED)
815 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
816 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
817 self.assertFalse(job.start_timestamp)
818 self.assert_(job.end_timestamp)
819 self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
821 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
822 [[constants.OP_STATUS_CANCELED for _ in job.ops],
823 ["Job canceled by request" for _ in job.ops]])
825 def testCancelWhileWaitlock(self):
826 queue = _FakeQueueForProc()
828 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
833 job = self._CreateJob(queue, job_id, ops)
835 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
837 def _BeforeStart(timeout, priority):
838 self.assertEqual(queue.GetNextUpdate(), (job, True))
839 self.assertRaises(IndexError, queue.GetNextUpdate)
840 self.assertFalse(queue.IsAcquired())
841 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
844 (success, _) = job.Cancel()
845 self.assert_(success)
847 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
849 self.assertRaises(IndexError, queue.GetNextUpdate)
851 def _AfterStart(op, cbs):
852 self.assertEqual(queue.GetNextUpdate(), (job, True))
853 self.assertRaises(IndexError, queue.GetNextUpdate)
854 self.assertFalse(queue.IsAcquired())
855 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
857 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
859 self.assertRaises(IndexError, queue.GetNextUpdate)
860 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
861 jqueue._JobProcessor.FINISHED)
862 self.assertEqual(queue.GetNextUpdate(), (job, True))
863 self.assertRaises(IndexError, queue.GetNextUpdate)
866 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
867 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
868 self.assert_(job.start_timestamp)
869 self.assert_(job.end_timestamp)
870 self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
872 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
873 [[constants.OP_STATUS_CANCELED for _ in job.ops],
874 ["Job canceled by request" for _ in job.ops]])
876 def testCancelWhileWaitlockWithTimeout(self):
877 queue = _FakeQueueForProc()
879 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
884 job = self._CreateJob(queue, job_id, ops)
886 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
888 def _BeforeStart(timeout, priority):
889 self.assertFalse(queue.IsAcquired())
890 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
893 (success, _) = job.Cancel()
894 self.assert_(success)
896 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
899 # Fake an acquire attempt timing out
900 raise mcpu.LockAcquireTimeout()
902 def _AfterStart(op, cbs):
903 self.fail("Should not reach this")
905 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
907 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
908 jqueue._JobProcessor.FINISHED)
911 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
912 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
913 self.assert_(job.start_timestamp)
914 self.assert_(job.end_timestamp)
915 self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
917 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
918 [[constants.OP_STATUS_CANCELED for _ in job.ops],
919 ["Job canceled by request" for _ in job.ops]])
921 def testCancelWhileRunning(self):
922 # Tests canceling a job with finished opcodes and more, unprocessed ones
923 queue = _FakeQueueForProc()
925 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
930 job = self._CreateJob(queue, job_id, ops)
932 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
934 opexec = _FakeExecOpCodeForProc(queue, None, None)
937 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
938 jqueue._JobProcessor.DEFER)
940 # Job goes back to queued
941 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
942 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
943 [[constants.OP_STATUS_SUCCESS,
944 constants.OP_STATUS_QUEUED,
945 constants.OP_STATUS_QUEUED],
946 ["Res0", None, None]])
949 (success, _) = job.Cancel()
950 self.assert_(success)
952 # Try processing another opcode (this will actually cancel the job)
953 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
954 jqueue._JobProcessor.FINISHED)
957 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
958 self.assertEqual(job.GetInfo(["id"]), [job_id])
959 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
960 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
961 [[constants.OP_STATUS_SUCCESS,
962 constants.OP_STATUS_CANCELED,
963 constants.OP_STATUS_CANCELED],
964 ["Res0", "Job canceled by request",
965 "Job canceled by request"]])
967 def testPartiallyRun(self):
968 # Tests calling the processor on a job that's been partially run before the
969 # program was restarted
970 queue = _FakeQueueForProc()
972 opexec = _FakeExecOpCodeForProc(queue, None, None)
974 for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
975 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
979 job = self._CreateJob(queue, job_id, ops)
981 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
983 for _ in range(successcount):
984 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
985 jqueue._JobProcessor.DEFER)
987 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
988 self.assertEqual(job.GetInfo(["opstatus"]),
989 [[constants.OP_STATUS_SUCCESS
990 for _ in range(successcount)] +
991 [constants.OP_STATUS_QUEUED
992 for _ in range(len(ops) - successcount)]])
994 self.assert_(job.ops_iter)
996 # Serialize and restore (simulates program restart)
997 newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
998 self.assertFalse(newjob.ops_iter)
999 self._TestPartial(newjob, successcount)
1001 def _TestPartial(self, job, successcount):
1002 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1003 self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1005 queue = _FakeQueueForProc()
1006 opexec = _FakeExecOpCodeForProc(queue, None, None)
1008 for remaining in reversed(range(len(job.ops) - successcount)):
1009 result = jqueue._JobProcessor(queue, opexec, job)()
1010 self.assertEqual(queue.GetNextUpdate(), (job, True))
1011 self.assertEqual(queue.GetNextUpdate(), (job, True))
1012 self.assertEqual(queue.GetNextUpdate(), (job, True))
1013 self.assertRaises(IndexError, queue.GetNextUpdate)
1017 self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1020 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1022 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1024 self.assertRaises(IndexError, queue.GetNextUpdate)
1025 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1026 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1027 self.assertEqual(job.GetInfo(["opresult"]),
1028 [[op.input.result for op in job.ops]])
1029 self.assertEqual(job.GetInfo(["opstatus"]),
1030 [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
1031 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1034 self._GenericCheckJob(job)
1036 # Calling the processor on a finished job should be a no-op
1037 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1038 jqueue._JobProcessor.FINISHED)
1039 self.assertRaises(IndexError, queue.GetNextUpdate)
1041 # ... also after being restored
1042 job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
1043 # Calling the processor on a finished job should be a no-op
1044 self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
1045 jqueue._JobProcessor.FINISHED)
1046 self.assertRaises(IndexError, queue.GetNextUpdate)
1048 def testProcessorOnRunningJob(self):
1049 ops = [opcodes.OpTestDummy(result="result", fail=False)]
1051 queue = _FakeQueueForProc()
1052 opexec = _FakeExecOpCodeForProc(queue, None, None)
1055 job = self._CreateJob(queue, 9571, ops)
1057 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1059 job.ops[0].status = constants.OP_STATUS_RUNNING
1061 assert len(job.ops) == 1
1063 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1065 # Calling on running job must fail
1066 self.assertRaises(errors.ProgrammerError,
1067 jqueue._JobProcessor(queue, opexec, job))
1069 def testLogMessages(self):
1070 # Tests the "Feedback" callback function
# While running, each opcode sends its configured messages through
# cbs.Feedback. Afterwards the test checks the job's log serial,
# GetLogEntries filtering, persistence across Serialize/Restore, and the
# (type, message) of every log entry.
1071 queue = _FakeQueueForProc()
# NOTE(review): only part of the `messages` dict is visible in this chunk
# (its opening lines and some entries are missing). It maps opcode index ->
# list of (log type, message) pairs.
1077 (constants.ELOG_MESSAGE, "there"),
1080 (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
1081 (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1084 ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1085 messages=messages.get(i, []))
1089 job = self._CreateJob(queue, 29386, ops)
# Hook run before the opcode starts. Expects: the job was written exactly
# once with replication (flag True), the queue lock is not held, and the
# job is WAITING.
1091 def _BeforeStart(timeout, priority):
1092 self.assertEqual(queue.GetNextUpdate(), (job, True))
1093 self.assertRaises(IndexError, queue.GetNextUpdate)
1094 self.assertFalse(queue.IsAcquired())
1095 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
# Hook run once the opcode runs. Expects the job to be RUNNING and Feedback
# to reject a call with three arguments. Each of the opcode's messages is
# then sent through Feedback, and the job is expected to be written
# without replication (flag False).
1097 def _AfterStart(op, cbs):
1098 self.assertEqual(queue.GetNextUpdate(), (job, True))
1099 self.assertRaises(IndexError, queue.GetNextUpdate)
1100 self.assertFalse(queue.IsAcquired())
1101 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1103 self.assertRaises(AssertionError, cbs.Feedback,
1104 "too", "many", "arguments")
1106 for (log_type, msg) in op.messages:
1107 self.assertRaises(IndexError, queue.GetNextUpdate)
1109 cbs.Feedback(log_type, msg)
1112 # Check for job update without replication
1113 self.assertEqual(queue.GetNextUpdate(), (job, False))
1114 self.assertRaises(IndexError, queue.GetNextUpdate)
1116 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
# One processor call per opcode. `remaining` is the number of opcodes left
# after the current one.
1118 for remaining in reversed(range(len(job.ops))):
1119 self.assertRaises(IndexError, queue.GetNextUpdate)
1120 result = jqueue._JobProcessor(queue, opexec, job)()
1121 self.assertEqual(queue.GetNextUpdate(), (job, True))
1122 self.assertRaises(IndexError, queue.GetNextUpdate)
# NOTE(review): the lines that choose between the two expected results
# below are missing from this chunk (presumably FINISHED for the last
# opcode, DEFER otherwise).
1126 self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1129 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1131 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1133 self.assertRaises(IndexError, queue.GetNextUpdate)
1135 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1136 self.assertEqual(job.GetInfo(["opresult"]),
1137 [[op.input.result for op in job.ops]])
# Total number of configured log messages across all opcodes
1139 logmsgcount = sum(len(m) for m in messages.values())
1141 self._CheckLogMessages(job, logmsgcount)
1143 # Serialize and restore (simulates program restart)
1144 newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
1145 self._CheckLogMessages(newjob, logmsgcount)
1147 # Check each message
# pop(0) consumes the expected lists in `messages`, so logmsgcount has to
# be computed before this loop.
# NOTE(review): this chunk is missing the initialisation and update of
# `prevserial` and the branch that picks one of the two log_type
# assertions.
1149 for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1150 for (serial, timestamp, log_type, msg) in oplog:
1151 (exptype, expmsg) = messages.get(idx).pop(0)
1153 self.assertEqual(log_type, exptype)
1155 self.assertEqual(log_type, constants.ELOG_MESSAGE)
1156 self.assertEqual(expmsg, msg)
# Serials must exceed the previously seen one (assert_ is a deprecated
# alias of assertTrue)
1157 self.assert_(serial > prevserial)
1160 def _CheckLogMessages(self, job, count):
# Check that `job` holds exactly `count` log messages and that
# GetLogEntries(newer_than) filters them by serial.
1162 self.assertEqual(job.log_serial, count)
# No filter: every entry of every opcode, flattened in opcode order
1165 self.assertEqual(job.GetLogEntries(None),
1166 [entry for entries in job.GetInfo(["oplog"])[0] if entries
1167 for entry in entries])
1169 # Filter with serial
# Entries newer than serial 3 are all entries except the first three. The
# non-empty check below only holds when count > 3.
# NOTE(review): one line at this point is missing from this chunk.
1171 self.assert_(job.GetLogEntries(3))
1172 self.assertEqual(job.GetLogEntries(3),
1173 [entry for entries in job.GetInfo(["oplog"])[0] if entries
1174 for entry in entries][3:])
1176 # No log message after highest serial
1177 self.assertFalse(job.GetLogEntries(count))
1178 self.assertFalse(job.GetLogEntries(count + 3))
1180 def testSubmitManyJobs(self):
# Opcodes can hand back jobs to submit (the nested opcode lists below).
# Each such job must reach the fake queue in order, with consecutive IDs
# starting at 1000. Each opcode's result must list the IDs of the jobs it
# submitted, e.g. [1001, 1002, 1003] for the three jobs of "Res2".
1181 queue = _FakeQueueForProc()
# NOTE(review): this chunk is missing the `ops = [` opener, the arguments
# that attach the nested job lists to each opcode, the closing brackets,
# and the `job_id` assignment.
1185 opcodes.OpTestDummy(result="Res0", fail=False,
1187 opcodes.OpTestDummy(result="Res1", fail=False,
1189 [opcodes.OpTestDummy(result="r1j0", fail=False)],
1191 opcodes.OpTestDummy(result="Res2", fail=False,
1193 [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1194 opcodes.OpTestDummy(result="r2j0o1", fail=False),
1195 opcodes.OpTestDummy(result="r2j0o2", fail=False),
1196 opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1197 [opcodes.OpTestDummy(result="r2j1", fail=False)],
1198 [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1199 opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1204 job = self._CreateJob(queue, job_id, ops)
# Before the opcode starts: the job was written with replication, the
# queue lock is free, the job is WAITING and no opcode context is pending.
1206 def _BeforeStart(timeout, priority):
1207 self.assertEqual(queue.GetNextUpdate(), (job, True))
1208 self.assertRaises(IndexError, queue.GetNextUpdate)
1209 self.assertFalse(queue.IsAcquired())
1210 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1211 self.assertFalse(job.cur_opctx)
1213 def _AfterStart(op, cbs):
1214 self.assertEqual(queue.GetNextUpdate(), (job, True))
1215 self.assertRaises(IndexError, queue.GetNextUpdate)
1217 self.assertFalse(queue.IsAcquired())
1218 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1219 self.assertFalse(job.cur_opctx)
1221 # Job is running, cancelling shouldn't be possible
1222 (success, _) = job.Cancel()
1223 self.assertFalse(success)
1225 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
# One processor call per opcode; only the call for the last opcode is
# expected to return FINISHED.
1227 for idx in range(len(ops)):
1228 self.assertRaises(IndexError, queue.GetNextUpdate)
1229 result = jqueue._JobProcessor(queue, opexec, job)()
1230 self.assertEqual(queue.GetNextUpdate(), (job, True))
1231 self.assertRaises(IndexError, queue.GetNextUpdate)
1232 if idx == len(ops) - 1:
# NOTE(review): the loop exit after FINISHED and this branch's `else:` are
# missing from this chunk.
1234 self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1236 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1238 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1239 self.assert_(job.start_timestamp)
1240 self.assertFalse(job.end_timestamp)
1242 self.assertRaises(IndexError, queue.GetNextUpdate)
# The fake queue must have received every submitted job in opcode order,
# with consecutive IDs starting at 1000.
# NOTE(review): the generator's `for op in ...` clause is missing from this
# chunk.
1244 for idx, submitted_ops in enumerate(job_ops
1246 for job_ops in op.submit_jobs):
1247 self.assertEqual(queue.GetNextSubmittedJob(),
1248 (1000 + idx, submitted_ops))
1249 self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1251 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1252 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1253 self.assertEqual(job.GetInfo(["opresult"]),
1254 [[[], [1000], [1001, 1002, 1003]]])
1255 self.assertEqual(job.GetInfo(["opstatus"]),
1256 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1258 self._GenericCheckJob(job)
1260 # Calling the processor on a finished job should be a no-op
1261 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1262 jqueue._JobProcessor.FINISHED)
1263 self.assertRaises(IndexError, queue.GetNextUpdate)
1265 def testJobDependency(self):
# The first opcode carries two dependency entries ([job_id, None] pairs,
# prev_job_id2 first). The fake dependency manager answers WAIT for some
# attempts and then CONTINUE for each dependency. While waiting, the
# processor must return WAITDEP and keep the opcode context. After that the
# job must run to success.
# NOTE(review): this chunk is missing the `prev_job_id` and `job_id`
# assignments, the `ops` list brackets and the argument holding the
# dependency entries, the `while True:` loop header, the `if attempt ...`
# selectors, and the `else:`/`break` lines of the result handling.
# `counter.next()` is the Python 2 iterator API.
1266 depmgr = _FakeDependencyManager()
1267 queue = _FakeQueueForProc(depmgr=depmgr)
1269 self.assertEqual(queue.depmgr, depmgr)
1272 prev_job_id2 = 28102
1275 opcodes.OpTestDummy(result="Res0", fail=False,
1277 [prev_job_id2, None],
1278 [prev_job_id, None],
1280 opcodes.OpTestDummy(result="Res1", fail=False),
1284 job = self._CreateJob(queue, job_id, ops)
1286 def _BeforeStart(timeout, priority):
1287 if attempt == 0 or attempt > 5:
1288 # Job should only be updated when it wasn't waiting for another job
1289 self.assertEqual(queue.GetNextUpdate(), (job, True))
1290 self.assertRaises(IndexError, queue.GetNextUpdate)
1291 self.assertFalse(queue.IsAcquired())
1292 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1293 self.assertFalse(job.cur_opctx)
1295 def _AfterStart(op, cbs):
1296 self.assertEqual(queue.GetNextUpdate(), (job, True))
1297 self.assertRaises(IndexError, queue.GetNextUpdate)
1299 self.assertFalse(queue.IsAcquired())
1300 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1301 self.assertFalse(job.cur_opctx)
1303 # Job is running, cancelling shouldn't be possible
1304 (success, _) = job.Cancel()
1305 self.assertFalse(success)
1307 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1309 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1311 counter = itertools.count()
1313 attempt = counter.next()
1315 self.assertRaises(IndexError, queue.GetNextUpdate)
1316 self.assertRaises(IndexError, depmgr.GetNextNotification)
# Queue the dependency manager's answers for this attempt. prev_job_id2
# waits and then continues; after that, prev_job_id waits and then
# continues.
1319 depmgr.AddCheckResult(job, prev_job_id2, None,
1320 (jqueue._JobDependencyManager.WAIT, "wait2"))
1322 depmgr.AddCheckResult(job, prev_job_id2, None,
1323 (jqueue._JobDependencyManager.CONTINUE, "cont"))
1324 # The processor will ask for the next dependency immediately
1325 depmgr.AddCheckResult(job, prev_job_id, None,
1326 (jqueue._JobDependencyManager.WAIT, "wait"))
1328 depmgr.AddCheckResult(job, prev_job_id, None,
1329 (jqueue._JobDependencyManager.WAIT, "wait"))
1331 depmgr.AddCheckResult(job, prev_job_id, None,
1332 (jqueue._JobDependencyManager.CONTINUE, "cont"))
1334 self.assertEqual(depmgr.CountPendingResults(), 2)
1336 self.assertEqual(depmgr.CountPendingResults(), 0)
1338 self.assertEqual(depmgr.CountPendingResults(), 1)
# One processor pass; every queued answer must have been consumed
1340 result = jqueue._JobProcessor(queue, opexec, job)()
1341 if attempt == 0 or attempt >= 5:
1342 # Job should only be updated if there was an actual change
1343 self.assertEqual(queue.GetNextUpdate(), (job, True))
1344 self.assertRaises(IndexError, queue.GetNextUpdate)
1345 self.assertFalse(depmgr.CountPendingResults())
1348 # Simulate waiting for other job
1349 self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1350 self.assertTrue(job.cur_opctx)
1351 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1352 self.assertRaises(IndexError, depmgr.GetNextNotification)
1353 self.assert_(job.start_timestamp)
1354 self.assertFalse(job.end_timestamp)
1357 if result == jqueue._JobProcessor.FINISHED:
1359 self.assertFalse(job.cur_opctx)
1362 self.assertRaises(IndexError, depmgr.GetNextNotification)
1364 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1365 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1366 self.assert_(job.start_timestamp)
1367 self.assertFalse(job.end_timestamp)
1369 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1370 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1371 self.assertEqual(job.GetInfo(["opresult"]),
1372 [[op.input.result for op in job.ops]])
1373 self.assertEqual(job.GetInfo(["opstatus"]),
1374 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1375 self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1378 self._GenericCheckJob(job)
1380 self.assertRaises(IndexError, queue.GetNextUpdate)
1381 self.assertRaises(IndexError, depmgr.GetNextNotification)
1382 self.assertFalse(depmgr.CountPendingResults())
1383 self.assertFalse(depmgr.CountWaitingJobs())
1385 # Calling the processor on a finished job should be a no-op
1386 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1387 jqueue._JobProcessor.FINISHED)
1388 self.assertRaises(IndexError, queue.GetNextUpdate)
1390 def testJobDependencyCancel(self):
# The second opcode depends on prev_job_id. The fake dependency manager
# first answers WAIT, then CANCEL. The first opcode must keep its "Res0"
# success result. The dependent opcode and every following one must end
# CANCELED with "Job canceled by request".
# NOTE(review): this chunk is missing the `prev_job_id`/`job_id`
# assignments, the `ops` brackets and dependency argument, the loop
# header, the `if attempt ...` selectors, and the `else:`/`break` lines.
1391 depmgr = _FakeDependencyManager()
1392 queue = _FakeQueueForProc(depmgr=depmgr)
1394 self.assertEqual(queue.depmgr, depmgr)
1399 opcodes.OpTestDummy(result="Res0", fail=False),
1400 opcodes.OpTestDummy(result="Res1", fail=False,
1402 [prev_job_id, None],
1404 opcodes.OpTestDummy(result="Res2", fail=False),
1408 job = self._CreateJob(queue, job_id, ops)
1410 def _BeforeStart(timeout, priority):
1411 if attempt == 0 or attempt > 5:
1412 # Job should only be updated when it wasn't waiting for another job
1413 self.assertEqual(queue.GetNextUpdate(), (job, True))
1414 self.assertRaises(IndexError, queue.GetNextUpdate)
1415 self.assertFalse(queue.IsAcquired())
1416 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1417 self.assertFalse(job.cur_opctx)
1419 def _AfterStart(op, cbs):
1420 self.assertEqual(queue.GetNextUpdate(), (job, True))
1421 self.assertRaises(IndexError, queue.GetNextUpdate)
1423 self.assertFalse(queue.IsAcquired())
1424 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1425 self.assertFalse(job.cur_opctx)
1427 # Job is running, cancelling shouldn't be possible
1428 (success, _) = job.Cancel()
1429 self.assertFalse(success)
1431 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1433 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1435 counter = itertools.count()
1437 attempt = counter.next()
1439 self.assertRaises(IndexError, queue.GetNextUpdate)
1440 self.assertRaises(IndexError, depmgr.GetNextNotification)
1443 # This will handle the first opcode
1446 depmgr.AddCheckResult(job, prev_job_id, None,
1447 (jqueue._JobDependencyManager.WAIT, "wait"))
1449 # Other job was cancelled
1450 depmgr.AddCheckResult(job, prev_job_id, None,
1451 (jqueue._JobDependencyManager.CANCEL, "cancel"))
1454 self.assertEqual(depmgr.CountPendingResults(), 0)
1456 self.assertEqual(depmgr.CountPendingResults(), 1)
# One processor pass; every queued answer must have been consumed
1458 result = jqueue._JobProcessor(queue, opexec, job)()
1459 if attempt <= 1 or attempt >= 4:
1460 # Job should only be updated if there was an actual change
1461 self.assertEqual(queue.GetNextUpdate(), (job, True))
1462 self.assertRaises(IndexError, queue.GetNextUpdate)
1463 self.assertFalse(depmgr.CountPendingResults())
1465 if attempt > 0 and attempt < 4:
1466 # Simulate waiting for other job
1467 self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1468 self.assertTrue(job.cur_opctx)
1469 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1470 self.assertRaises(IndexError, depmgr.GetNextNotification)
1471 self.assert_(job.start_timestamp)
1472 self.assertFalse(job.end_timestamp)
1475 if result == jqueue._JobProcessor.FINISHED:
1477 self.assertFalse(job.cur_opctx)
1480 self.assertRaises(IndexError, depmgr.GetNextNotification)
1482 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1483 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1484 self.assert_(job.start_timestamp)
1485 self.assertFalse(job.end_timestamp)
1487 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1488 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1489 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1490 [[constants.OP_STATUS_SUCCESS,
1491 constants.OP_STATUS_CANCELED,
1492 constants.OP_STATUS_CANCELED],
1493 ["Res0", "Job canceled by request",
1494 "Job canceled by request"]])
1496 self._GenericCheckJob(job)
1498 self.assertRaises(IndexError, queue.GetNextUpdate)
1499 self.assertRaises(IndexError, depmgr.GetNextNotification)
1500 self.assertFalse(depmgr.CountPendingResults())
1502 # Calling the processor on a finished job should be a no-op
1503 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1504 jqueue._JobProcessor.FINISHED)
1505 self.assertRaises(IndexError, queue.GetNextUpdate)
1507 def testJobDependencyWrongstatus(self):
# The second opcode depends on prev_job_id. The fake dependency manager
# first answers WAIT, then WRONGSTATUS. The first opcode must keep its
# "Res0" result. The dependent opcode and every following one must end in
# ERROR with an encoded error as their result.
# NOTE(review): this chunk is missing the `prev_job_id`/`job_id`
# assignments, the `ops` brackets and dependency argument, the loop
# header, the `if attempt ...` selectors, and the `else:`/`break` lines.
1508 depmgr = _FakeDependencyManager()
1509 queue = _FakeQueueForProc(depmgr=depmgr)
1511 self.assertEqual(queue.depmgr, depmgr)
1516 opcodes.OpTestDummy(result="Res0", fail=False),
1517 opcodes.OpTestDummy(result="Res1", fail=False,
1519 [prev_job_id, None],
1521 opcodes.OpTestDummy(result="Res2", fail=False),
1525 job = self._CreateJob(queue, job_id, ops)
1527 def _BeforeStart(timeout, priority):
1528 if attempt == 0 or attempt > 5:
1529 # Job should only be updated when it wasn't waiting for another job
1530 self.assertEqual(queue.GetNextUpdate(), (job, True))
1531 self.assertRaises(IndexError, queue.GetNextUpdate)
1532 self.assertFalse(queue.IsAcquired())
1533 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1534 self.assertFalse(job.cur_opctx)
1536 def _AfterStart(op, cbs):
1537 self.assertEqual(queue.GetNextUpdate(), (job, True))
1538 self.assertRaises(IndexError, queue.GetNextUpdate)
1540 self.assertFalse(queue.IsAcquired())
1541 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1542 self.assertFalse(job.cur_opctx)
1544 # Job is running, cancelling shouldn't be possible
1545 (success, _) = job.Cancel()
1546 self.assertFalse(success)
1548 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1550 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1552 counter = itertools.count()
1554 attempt = counter.next()
1556 self.assertRaises(IndexError, queue.GetNextUpdate)
1557 self.assertRaises(IndexError, depmgr.GetNextNotification)
1560 # This will handle the first opcode
1563 depmgr.AddCheckResult(job, prev_job_id, None,
1564 (jqueue._JobDependencyManager.WAIT, "wait"))
1567 depmgr.AddCheckResult(job, prev_job_id, None,
1568 (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
1571 self.assertEqual(depmgr.CountPendingResults(), 0)
1573 self.assertEqual(depmgr.CountPendingResults(), 1)
# One processor pass; every queued answer must have been consumed
1575 result = jqueue._JobProcessor(queue, opexec, job)()
1576 if attempt <= 1 or attempt >= 4:
1577 # Job should only be updated if there was an actual change
1578 self.assertEqual(queue.GetNextUpdate(), (job, True))
1579 self.assertRaises(IndexError, queue.GetNextUpdate)
1580 self.assertFalse(depmgr.CountPendingResults())
1582 if attempt > 0 and attempt < 4:
1583 # Simulate waiting for other job
1584 self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1585 self.assertTrue(job.cur_opctx)
1586 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1587 self.assertRaises(IndexError, depmgr.GetNextNotification)
1588 self.assert_(job.start_timestamp)
1589 self.assertFalse(job.end_timestamp)
1592 if result == jqueue._JobProcessor.FINISHED:
1594 self.assertFalse(job.cur_opctx)
1597 self.assertRaises(IndexError, depmgr.GetNextNotification)
1599 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1600 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1601 self.assert_(job.start_timestamp)
1602 self.assertFalse(job.end_timestamp)
1604 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
1605 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
# NOTE(review): the trailing comma after the closing parenthesis below turns
# this statement into a 1-tuple expression. It is harmless but looks like a
# typo and should be removed.
1606 self.assertEqual(job.GetInfo(["opstatus"]),
1607 [[constants.OP_STATUS_SUCCESS,
1608 constants.OP_STATUS_ERROR,
1609 constants.OP_STATUS_ERROR]]),
# Failed opcodes carry an encoded error (errors.GetEncodedError must
# decode it) instead of a plain result
1611 (opresult, ) = job.GetInfo(["opresult"])
1612 self.assertEqual(len(opresult), len(ops))
1613 self.assertEqual(opresult[0], "Res0")
1614 self.assertTrue(errors.GetEncodedError(opresult[1]))
1615 self.assertTrue(errors.GetEncodedError(opresult[2]))
1617 self._GenericCheckJob(job)
1619 self.assertRaises(IndexError, queue.GetNextUpdate)
1620 self.assertRaises(IndexError, depmgr.GetNextNotification)
1621 self.assertFalse(depmgr.CountPendingResults())
1623 # Calling the processor on a finished job should be a no-op
1624 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1625 jqueue._JobProcessor.FINISHED)
1626 self.assertRaises(IndexError, queue.GetNextUpdate)
1629 class TestEvaluateJobProcessorResult(unittest.TestCase):
1630 def testFinished(self):
1631 depmgr = _FakeDependencyManager()
1632 job = _IdOnlyFakeJob(30953)
1633 jqueue._EvaluateJobProcessorResult(depmgr, job,
1634 jqueue._JobProcessor.FINISHED)
1635 self.assertEqual(depmgr.GetNextNotification(), job.id)
1636 self.assertRaises(IndexError, depmgr.GetNextNotification)
1638 def testDefer(self):
1639 depmgr = _FakeDependencyManager()
1640 job = _IdOnlyFakeJob(11326, priority=5463)
1642 jqueue._EvaluateJobProcessorResult(depmgr, job,
1643 jqueue._JobProcessor.DEFER)
1644 except workerpool.DeferTask, err:
1645 self.assertEqual(err.priority, 5463)
1647 self.fail("Didn't raise exception")
1648 self.assertRaises(IndexError, depmgr.GetNextNotification)
1650 def testWaitdep(self):
1651 depmgr = _FakeDependencyManager()
1652 job = _IdOnlyFakeJob(21317)
1653 jqueue._EvaluateJobProcessorResult(depmgr, job,
1654 jqueue._JobProcessor.WAITDEP)
1655 self.assertRaises(IndexError, depmgr.GetNextNotification)
1657 def testOther(self):
1658 depmgr = _FakeDependencyManager()
1659 job = _IdOnlyFakeJob(5813)
1660 self.assertRaises(errors.ProgrammerError,
1661 jqueue._EvaluateJobProcessorResult,
1662 depmgr, job, "Other result")
1663 self.assertRaises(IndexError, depmgr.GetNextNotification)
1666 class _FakeTimeoutStrategy:
# Fake lock-timeout strategy that hands out a predefined list of timeouts.
1667 def __init__(self, timeouts):
# NextAttempt consumes `timeouts` from the front via pop(0), so this must
# be a list
1668 self.timeouts = timeouts
# Last value handed out by NextAttempt. Tests compare it with the timeout
# passed to their lock-acquisition hook.
1670 self.last_timeout = None
1672 def NextAttempt(self):
# Pop the next timeout and remember it in last_timeout.
# NOTE(review): several lines of this class are missing from this chunk,
# e.g. the handling of an exhausted list and the return statement. As
# shown, an empty list would raise IndexError and nothing is returned.
1675 timeout = self.timeouts.pop(0)
1678 self.last_timeout = timeout
1682 class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
# Drives _JobProcessor with two fakes:
# - a fake timeout strategy (_NewTimeoutStrategy), and
# - a fake lock-acquisition hook (_BeforeStart) that raises
#   mcpu.LockAcquireTimeout while retries remain.
# After every processor call the test checks job writes, opcode priorities
# and the pending opcode context.
# NOTE(review): this chunk is missing the setUp() definition line and
# several short lines, e.g. the local `job` bindings used by the methods
# below.
1684 self.queue = _FakeQueueForProc()
1687 self.opcounter = None
1688 self.timeout_strategy = None
1690 self.prev_tsop = None
1691 self.prev_prio = None
1692 self.prev_status = None
1693 self.lock_acq_prio = None
1694 self.gave_lock = None
1695 self.done_lock_before_blocking = False
# Fake lock acquisition. It checks that:
# - the job was written if the opcode status changed,
# - the timeout equals the strategy's last handed-out value,
# - the priority equals the opcode's priority.
# It then either grants the locks or raises mcpu.LockAcquireTimeout.
# NOTE(review): the retry-counter bookkeeping between the `retries > 0`
# check and the raise is missing from this chunk.
1697 def _BeforeStart(self, timeout, priority):
1700 # If status has changed, job must've been written
1701 if self.prev_status != self.job.ops[self.curop].status:
1702 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1703 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1705 self.assertFalse(self.queue.IsAcquired())
1706 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1708 ts = self.timeout_strategy
1710 self.assert_(timeout is None or isinstance(timeout, (int, float)))
1711 self.assertEqual(timeout, ts.last_timeout)
1712 self.assertEqual(priority, job.ops[self.curop].priority)
1714 self.gave_lock = True
1715 self.lock_acq_prio = priority
1717 if (self.curop == 3 and
1718 job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
1719 # Give locks before running into blocking acquire
1720 assert self.retries == 7
1722 self.done_lock_before_blocking = True
1725 if self.retries > 0:
1726 self.assert_(timeout is not None)
1728 self.gave_lock = False
1729 raise mcpu.LockAcquireTimeout()
# At the highest priority every retry and timeout must be used up, and the
# acquire is blocking (timeout None)
1731 if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
1732 assert self.retries == 0, "Didn't exhaust all retries at highest priority"
1733 assert not ts.timeouts
1734 self.assert_(timeout is None)
1736 def _AfterStart(self, op, cbs):
1739 # Setting to "running" requires an update
1740 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1741 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1743 self.assertFalse(self.queue.IsAcquired())
1744 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1746 # Job is running, cancelling shouldn't be possible
1747 (success, _) = job.Cancel()
1748 self.assertFalse(success)
# Passed to the processor as _nextop_fn. Advances to the next opcode index
# and records that opcode's current priority and status.
# opcounter.next() is the Python 2 iterator API.
1750 def _NextOpcode(self):
1751 self.curop = self.opcounter.next()
1752 self.prev_prio = self.job.ops[self.curop].priority
1753 self.prev_status = self.job.ops[self.curop].status
# Passed to the processor as _timeout_strategy_factory. Builds a
# _FakeTimeoutStrategy whose timeout list and expected retry count depend
# on the current opcode. range() must return a list here (Python 2)
# because the fake pops from it.
# NOTE(review): the return statement is missing from this chunk. The check
# on cur_opctx._timeout_strategy._fn in testTimeout suggests it returns
# ts.NextAttempt.
1755 def _NewTimeoutStrategy(self):
1758 self.assertEqual(self.retries, 0)
1760 if self.prev_tsop == self.curop:
1761 # Still on the same opcode, priority must've been increased
1762 self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
1766 timeouts = range(10, 31, 10)
1767 self.retries = len(timeouts) - 1
1769 elif self.curop == 2:
1770 # Let this run into a blocking acquire
1771 timeouts = range(11, 61, 12)
1772 self.retries = len(timeouts)
1774 elif self.curop == 3:
1775 # Wait for priority to increase, but give lock before blocking acquire
1776 timeouts = range(12, 100, 14)
1777 self.retries = len(timeouts)
1779 self.assertFalse(self.done_lock_before_blocking)
1781 elif self.curop == 4:
1782 self.assert_(self.done_lock_before_blocking)
1784 # Timeouts, but no need to retry
1785 timeouts = range(10, 31, 10)
1788 elif self.curop == 5:
1790 timeouts = range(19, 100, 11)
1791 self.retries = len(timeouts)
1797 assert len(job.ops) == 10
1798 assert self.retries <= len(timeouts)
1800 ts = _FakeTimeoutStrategy(timeouts)
1802 self.timeout_strategy = ts
1803 self.prev_tsop = self.curop
1804 self.prev_prio = job.ops[self.curop].priority
# Runs the processor repeatedly until the job finishes. After each call:
# - if locks were granted (or the job is done), the result must have been
#   written and no opcode context may remain;
# - otherwise the context is kept and the job stays WAITING.
1808 def testTimeout(self):
1809 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1814 job = self._CreateJob(self.queue, job_id, ops)
1817 self.opcounter = itertools.count(0)
1819 opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
1821 tsf = self._NewTimeoutStrategy
1823 self.assertFalse(self.done_lock_before_blocking)
1826 proc = jqueue._JobProcessor(self.queue, opexec, job,
1827 _timeout_strategy_factory=tsf)
1829 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1831 if self.curop is not None:
1832 self.prev_status = self.job.ops[self.curop].status
1834 self.lock_acq_prio = None
1836 result = proc(_nextop_fn=self._NextOpcode)
1837 assert self.curop is not None
1839 if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
1840 # Got lock and/or job is done, result must've been written
1841 self.assertFalse(job.cur_opctx)
1842 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1843 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1844 self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
1845 self.assert_(job.ops[self.curop].exec_timestamp)
1847 if result == jqueue._JobProcessor.FINISHED:
1848 self.assertFalse(job.cur_opctx)
1851 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1854 self.assertEqual(job.ops[self.curop].start_timestamp,
1855 job.start_timestamp)
1858 # Opcode finished, but job not yet done
1859 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
# Locks not granted: the opcode context, with its timeout strategy, must be
# kept for the next attempt
1862 self.assert_(job.cur_opctx)
1863 self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1864 self.timeout_strategy.NextAttempt)
1865 self.assertFalse(job.ops[self.curop].exec_timestamp)
1866 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1868 # If priority has changed since acquiring locks, the job must've been
# (continued) written, i.e. an update must be pending
1870 if self.lock_acq_prio != job.ops[self.curop].priority:
1871 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1873 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1875 self.assert_(job.start_timestamp)
1876 self.assertFalse(job.end_timestamp)
1878 self.assertEqual(self.curop, len(job.ops) - 1)
1879 self.assertEqual(self.job, job)
1880 self.assertEqual(self.opcounter.next(), len(job.ops))
1881 self.assert_(self.done_lock_before_blocking)
1883 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1884 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1885 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1886 self.assertEqual(job.GetInfo(["opresult"]),
1887 [[op.input.result for op in job.ops]])
1888 self.assertEqual(job.GetInfo(["opstatus"]),
1889 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1890 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1893 # Calling the processor on a finished job should be a no-op
1894 self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
1895 jqueue._JobProcessor.FINISHED)
1896 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1899 class _IdOnlyFakeJob:
1900 def __init__(self, job_id, priority=NotImplemented):
1901 self.id = str(job_id)
1902 self._priority = priority
1904 def CalcPriority(self):
1905 return self._priority
1908 class TestJobDependencyManager(unittest.TestCase):
# Tests for jqueue._JobDependencyManager, wired to the fake _GetStatus and
# _Enqueue callbacks defined in this class.
# NOTE(review): this chunk is missing the setUp() definition and the
# initialisation of self._status and self._queue, which the tests use as
# lists.
1912 self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
1914 def _GetStatus(self, job_id):
# Fake status lookup. Pops the next (expected job ID, status) pair queued
# by the test and checks that the dependency manager asked about that job.
1915 (exp_job_id, result) = self._status.pop(0)
1916 self.assertEqual(exp_job_id, job_id)
# NOTE(review): the statement returning `result` is missing from this
# chunk. As shown, the looked-up status would be discarded.
1919 def _Enqueue(self, jobs):
1920 self.assertFalse(self.jdm._lock.is_owned(),
1921 msg=("Must not own manager lock while re-adding jobs"
1922 " (potential deadlock)"))
1923 self._queue.append(jobs)
1925 def testNotFinalizedThenCancel(self):
# A dependency that is still RUNNING yields WAIT and registers the job as
# a waiter, which shows up as a pending "job/<id>" lock. Once the
# dependency is CANCELED, CheckAndRegister returns CANCEL and the
# registration is dropped.
1926 job = _IdOnlyFakeJob(17697)
# NOTE(review): this chunk is missing the `job_id` assignment and the
# contents/closing lines of the expected `_waiters` dict and lock-info
# list.
1929 self._status.append((job_id, constants.JOB_STATUS_RUNNING))
1930 (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1931 self.assertEqual(result, self.jdm.WAIT)
1932 self.assertFalse(self._status)
1933 self.assertFalse(self._queue)
1934 self.assertTrue(self.jdm.JobWaiting(job))
1935 self.assertEqual(self.jdm._waiters, {
1938 self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
1939 ("job/28625", None, None, [("job", [job.id])])
1942 self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1943 (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1944 self.assertEqual(result, self.jdm.CANCEL)
1945 self.assertFalse(self._status)
1946 self.assertFalse(self._queue)
1947 self.assertFalse(self.jdm.JobWaiting(job))
1948 self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
1950 def testRequireCancel(self):
# When CANCELED is an accepted dependency status, a WAITING dependency
# yields WAIT. Once the dependency is canceled, the result is CONTINUE and
# the waiter registration is dropped.
1951 job = _IdOnlyFakeJob(5278)
# NOTE(review): this chunk is missing the `job_id` assignment and the
# contents/closing lines of the expected literals.
1953 dep_status = [constants.JOB_STATUS_CANCELED]
1955 self._status.append((job_id, constants.JOB_STATUS_WAITING))
1956 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1957 self.assertEqual(result, self.jdm.WAIT)
1958 self.assertFalse(self._status)
1959 self.assertFalse(self._queue)
1960 self.assertTrue(self.jdm.JobWaiting(job))
1961 self.assertEqual(self.jdm._waiters, {
1964 self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
1965 ("job/9610", None, None, [("job", [job.id])])
1968 self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1969 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1970 self.assertEqual(result, self.jdm.CONTINUE)
1971 self.assertFalse(self._status)
1972 self.assertFalse(self._queue)
1973 self.assertFalse(self.jdm.JobWaiting(job))
1974 self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
1976 def testRequireError(self):
# When ERROR is an accepted dependency status, a WAITING dependency yields
# WAIT. Once the dependency ends in error, the result is CONTINUE and the
# waiter registration is dropped.
1977 job = _IdOnlyFakeJob(21459)
# NOTE(review): this chunk is missing the `job_id` assignment and the
# contents/closing lines of the expected `_waiters` dict.
1979 dep_status = [constants.JOB_STATUS_ERROR]
1981 self._status.append((job_id, constants.JOB_STATUS_WAITING))
1982 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1983 self.assertEqual(result, self.jdm.WAIT)
1984 self.assertFalse(self._status)
1985 self.assertFalse(self._queue)
1986 self.assertTrue(self.jdm.JobWaiting(job))
1987 self.assertEqual(self.jdm._waiters, {
1991 self._status.append((job_id, constants.JOB_STATUS_ERROR))
1992 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1993 self.assertEqual(result, self.jdm.CONTINUE)
1994 self.assertFalse(self._status)
1995 self.assertFalse(self._queue)
1996 self.assertFalse(self.jdm.JobWaiting(job))
1997 self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
1999 def testRequireMultiple(self):
# With every finalized status accepted, each possible end status of the
# dependency leads first to WAIT (while WAITING) and then to CONTINUE.
# A fresh fake job is used for every end status.
2000 dep_status = list(constants.JOBS_FINALIZED)
2002 for end_status in dep_status:
2003 job = _IdOnlyFakeJob(21343)
# NOTE(review): this chunk is missing the `job_id` assignment and the
# contents/closing lines of the expected literals.
2006 self._status.append((job_id, constants.JOB_STATUS_WAITING))
2007 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2008 self.assertEqual(result, self.jdm.WAIT)
2009 self.assertFalse(self._status)
2010 self.assertFalse(self._queue)
2011 self.assertTrue(self.jdm.JobWaiting(job))
2012 self.assertEqual(self.jdm._waiters, {
2015 self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2016 ("job/14609", None, None, [("job", [job.id])])
2019 self._status.append((job_id, end_status))
2020 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2021 self.assertEqual(result, self.jdm.CONTINUE)
2022 self.assertFalse(self._status)
2023 self.assertFalse(self._queue)
2024 self.assertFalse(self.jdm.JobWaiting(job))
2025 self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2027 def testNotify(self):
# NotifyWaiters(job_id) must drop every waiter on that job and hand the
# waiting jobs, as a set, to the enqueue callback. It must do so without
# querying any status (self._status stays empty).
2028 job = _IdOnlyFakeJob(8227)
# NOTE(review): this chunk is missing the `job_id` assignment and the
# contents/closing lines of the expected `_waiters` dict.
2031 self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2032 (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2033 self.assertEqual(result, self.jdm.WAIT)
2034 self.assertFalse(self._status)
2035 self.assertFalse(self._queue)
2036 self.assertTrue(self.jdm.JobWaiting(job))
2037 self.assertEqual(self.jdm._waiters, {
2041 self.jdm.NotifyWaiters(job_id)
2042 self.assertFalse(self._status)
2043 self.assertFalse(self.jdm._waiters)
2044 self.assertFalse(self.jdm.JobWaiting(job))
2045 self.assertEqual(self._queue, [set([job])])
2047 def testWrongStatus(self):
# With SUCCESS required, a QUEUED dependency yields WAIT. Once the
# dependency ends in ERROR, CheckAndRegister returns WRONGSTATUS and the
# job stops waiting.
2048 job = _IdOnlyFakeJob(10102)
# NOTE(review): this chunk is missing the `job_id` assignment and the
# contents/closing lines of the expected `_waiters` dict.
2051 self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2052 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2053 [constants.JOB_STATUS_SUCCESS])
2054 self.assertEqual(result, self.jdm.WAIT)
2055 self.assertFalse(self._status)
2056 self.assertFalse(self._queue)
2057 self.assertTrue(self.jdm.JobWaiting(job))
2058 self.assertEqual(self.jdm._waiters, {
2062 self._status.append((job_id, constants.JOB_STATUS_ERROR))
2063 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2064 [constants.JOB_STATUS_SUCCESS])
2065 self.assertEqual(result, self.jdm.WRONGSTATUS)
2066 self.assertFalse(self._status)
2067 self.assertFalse(self._queue)
2068 self.assertFalse(self.jdm.JobWaiting(job))
2070 def testCorrectStatus(self):
# With SUCCESS required, a QUEUED dependency yields WAIT. Once the
# dependency succeeds, CheckAndRegister returns CONTINUE and the job stops
# waiting.
2071 job = _IdOnlyFakeJob(24273)
# NOTE(review): this chunk is missing the `job_id` assignment and the
# contents/closing lines of the expected `_waiters` dict.
2074 self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2075 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2076 [constants.JOB_STATUS_SUCCESS])
2077 self.assertEqual(result, self.jdm.WAIT)
2078 self.assertFalse(self._status)
2079 self.assertFalse(self._queue)
2080 self.assertTrue(self.jdm.JobWaiting(job))
2081 self.assertEqual(self.jdm._waiters, {
2085 self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2086 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2087 [constants.JOB_STATUS_SUCCESS])
2088 self.assertEqual(result, self.jdm.CONTINUE)
2089 self.assertFalse(self._status)
2090 self.assertFalse(self._queue)
2091 self.assertFalse(self.jdm.JobWaiting(job))
2093 def testFinalizedRightAway(self):
# A dependency that has already succeeded yields CONTINUE at once.
# Afterwards, notifying waiters of an unrelated job ID ("0") must leave
# no waiters, no status lookups and nothing re-enqueued.
2094 job = _IdOnlyFakeJob(224)
# NOTE(review): this chunk is missing the `job_id` assignment and the
# contents/closing lines of the expected `_waiters` dict.
2097 self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2098 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2099 [constants.JOB_STATUS_SUCCESS])
2100 self.assertEqual(result, self.jdm.CONTINUE)
2101 self.assertFalse(self._status)
2102 self.assertFalse(self._queue)
2103 self.assertFalse(self.jdm.JobWaiting(job))
2104 self.assertEqual(self.jdm._waiters, {
2109 self.jdm.NotifyWaiters("0")
2110 self.assertFalse(self.jdm._waiters)
2111 self.assertFalse(self._status)
2112 self.assertFalse(self._queue)
2114 def testMultipleWaiting(self):
# Builds a random mapping of dependency job IDs to sets of waiting fake
# jobs; all IDs are distinct and drawn from 150 samples. The test then:
# - registers every waiter,
# - compares _waiters and the pending lock info with that mapping,
# - resolves the dependencies in random order, checking that each waiter
#   gets CONTINUE.
# This relies on Python 2 semantics:
# - map() returns a list (job_ids.pop());
# - keys() returns a list (list concatenation);
# - tuple parameter unpacking (_MakeSet);
# - random.sample accepting a mapping.
# NOTE(review): several lines of the comprehensions and loops (e.g. the
# inner `for job in jobs` headers) are missing from this chunk.
2115 # Use a deterministic random generator
2116 rnd = random.Random(21402)
2118 job_ids = map(str, rnd.sample(range(1, 10000), 150))
2120 waiters = dict((job_ids.pop(),
2121 set(map(_IdOnlyFakeJob,
2123 for _ in range(rnd.randint(1, 20))])))
2126 # Ensure there are no duplicate job IDs
2127 assert not utils.FindDuplicates(waiters.keys() +
2129 for jobs in waiters.values()
2132 # Register all jobs as waiters
2133 for job_id, job in [(job_id, job)
2134 for (job_id, jobs) in waiters.items()
2136 self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2137 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2138 [constants.JOB_STATUS_SUCCESS])
2139 self.assertEqual(result, self.jdm.WAIT)
2140 self.assertFalse(self._status)
2141 self.assertFalse(self._queue)
2142 self.assertTrue(self.jdm.JobWaiting(job))
2144 self.assertEqual(self.jdm._waiters, waiters)
# Normalise the pending job lists to sets, so the comparison below does
# not depend on their order
2146 def _MakeSet((name, mode, owner_names, pending)):
2147 return (name, mode, owner_names,
2148 [(pendmode, set(pend)) for (pendmode, pend) in pending])
# Pending lock info must list one "job/<id>" entry per remaining
# dependency, with its waiting job IDs
2150 def _CheckLockInfo():
2151 info = self.jdm.GetLockInfo([query.LQ_PENDING])
2152 self.assertEqual(sorted(map(_MakeSet, info)), sorted([
2153 ("job/%s" % job_id, None, None,
2154 [("job", set([job.id for job in jobs]))])
2155 for job_id, jobs in waiters.items()
2161 # Notify in random order
2162 for job_id in rnd.sample(waiters, len(waiters)):
2163 # Remove from pending waiter list
2164 jobs = waiters.pop(job_id)
2166 self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2167 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2168 [constants.JOB_STATUS_SUCCESS])
2169 self.assertEqual(result, self.jdm.CONTINUE)
2170 self.assertFalse(self._status)
2171 self.assertFalse(self._queue)
2172 self.assertFalse(self.jdm.JobWaiting(job))
2176 self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
# testSelfDependency: a job that declares a dependency on its own ID must be
# rejected with ERROR, even though the status entry queued for that ID
# reports SUCCESS. This test does not assert whether that entry was consumed.
2180 def testSelfDependency(self):
2181 job = _IdOnlyFakeJob(18937)
2183 self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
2184 (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
2185 self.assertEqual(result, self.jdm.ERROR)
# testJobDisappears: if the status lookup for the dependency raises
# errors.JobLost, CheckAndRegister must return ERROR, the job must not be
# left waiting, and no pending lock info may be reported.
# This uses a dedicated manager built from the raising status function (and
# None as the second callback) instead of the fixture's self.jdm.
# NOTE(review): the `job_id` assignment and the nested `def _FakeStatus(_):`
# header that owns the `raise` line below are missing from this dump.
2187 def testJobDisappears(self):
2188 job = _IdOnlyFakeJob(30540)
2192 raise errors.JobLost("#msg#")
2194 jdm = jqueue._JobDependencyManager(_FakeStatus, None)
2195 (result, _) = jdm.CheckAndRegister(job, job_id, [])
# NOTE(review): this compares against self.jdm.ERROR while exercising the
# local `jdm`. Presumably ERROR is a class-level constant, so both give the
# same value, but `jdm.ERROR` would be more consistent.
2196 self.assertEqual(result, self.jdm.ERROR)
2197 self.assertFalse(jdm.JobWaiting(job))
2198 self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
# When executed as a script, run this module's tests through
# testutils.GanetiTestProgram.
2201 if __name__ == "__main__":
2202 testutils.GanetiTestProgram()