4 # Copyright (C) 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.jqueue"""
33 from ganeti import constants
34 from ganeti import utils
35 from ganeti import errors
36 from ganeti import jqueue
37 from ganeti import opcodes
38 from ganeti import compat
39 from ganeti import mcpu
40 from ganeti import query
46 def __init__(self, job_id, status):
52 def SetStatus(self, status):
55 def AddLogEntry(self, msg):
56 self._log.append((len(self._log), msg))
61 def GetInfo(self, fields):
66 result.append(self._status)
68 raise Exception("Unknown field")
72 def GetLogEntries(self, newer_than):
73 assert newer_than is None or newer_than >= 0
75 if newer_than is None:
78 return self._log[newer_than:]
81 class TestJobChangesChecker(unittest.TestCase):
83 job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
84 checker = jqueue._JobChangesChecker(["status"], None, None)
85 self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
87 job.SetStatus(constants.JOB_STATUS_RUNNING)
88 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
90 job.SetStatus(constants.JOB_STATUS_SUCCESS)
91 self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
93 # job.id is used by checker
94 self.assertEqual(job.id, 9094)
96 def testStatusWithPrev(self):
97 job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
98 checker = jqueue._JobChangesChecker(["status"],
99 [constants.JOB_STATUS_QUEUED], None)
100 self.assert_(checker(job) is None)
102 job.SetStatus(constants.JOB_STATUS_RUNNING)
103 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
105 def testFinalStatus(self):
106 for status in constants.JOBS_FINALIZED:
107 job = _FakeJob(2178711, status)
108 checker = jqueue._JobChangesChecker(["status"], [status], None)
109 # There won't be any changes in this status, hence it should signal
110 # a change immediately
111 self.assertEqual(checker(job), ([status], []))
114 job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
115 checker = jqueue._JobChangesChecker(["status"], None, None)
116 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
118 job.AddLogEntry("Hello World")
119 (job_info, log_entries) = checker(job)
120 self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
121 self.assertEqual(log_entries, [[0, "Hello World"]])
123 checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
124 self.assert_(checker2(job) is None)
126 job.AddLogEntry("Foo Bar")
127 job.SetStatus(constants.JOB_STATUS_ERROR)
129 (job_info, log_entries) = checker2(job)
130 self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
131 self.assertEqual(log_entries, [[1, "Foo Bar"]])
133 checker3 = jqueue._JobChangesChecker(["status"], None, None)
134 (job_info, log_entries) = checker3(job)
135 self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
136 self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
139 class TestJobChangesWaiter(unittest.TestCase):
141 self.tmpdir = tempfile.mkdtemp()
142 self.filename = utils.PathJoin(self.tmpdir, "job-1")
143 utils.WriteFile(self.filename, data="")
146 shutil.rmtree(self.tmpdir)
148 def _EnsureNotifierClosed(self, notifier):
150 os.fstat(notifier._fd)
151 except EnvironmentError, err:
152 self.assertEqual(err.errno, errno.EBADF)
154 self.fail("File descriptor wasn't closed")
157 for wait in [False, True]:
158 waiter = jqueue._JobFileChangesWaiter(self.filename)
165 # Ensure file descriptor was closed
166 self._EnsureNotifierClosed(waiter._notifier)
168 def testChangingFile(self):
169 waiter = jqueue._JobFileChangesWaiter(self.filename)
171 self.assertFalse(waiter.Wait(0.1))
172 utils.WriteFile(self.filename, data="changed")
173 self.assert_(waiter.Wait(60))
177 self._EnsureNotifierClosed(waiter._notifier)
179 def testChangingFile2(self):
180 waiter = jqueue._JobChangesWaiter(self.filename)
182 self.assertFalse(waiter._filewaiter)
183 self.assert_(waiter.Wait(0.1))
184 self.assert_(waiter._filewaiter)
186 # File waiter is now used, but there have been no changes
187 self.assertFalse(waiter.Wait(0.1))
188 utils.WriteFile(self.filename, data="changed")
189 self.assert_(waiter.Wait(60))
193 self._EnsureNotifierClosed(waiter._filewaiter._notifier)
196 class TestWaitForJobChangesHelper(unittest.TestCase):
198 self.tmpdir = tempfile.mkdtemp()
199 self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
200 utils.WriteFile(self.filename, data="")
203 shutil.rmtree(self.tmpdir)
205 def _LoadWaitingJob(self):
206 return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)
208 def _LoadLostJob(self):
211 def testNoChanges(self):
212 wfjc = jqueue._WaitForJobChangesHelper()
215 self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
216 [constants.JOB_STATUS_WAITING], None, 0.1),
217 constants.JOB_NOTCHANGED)
219 # No previous information
220 self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
221 ["status"], None, None, 1.0),
222 ([constants.JOB_STATUS_WAITING], []))
224 def testLostJob(self):
225 wfjc = jqueue._WaitForJobChangesHelper()
226 self.assert_(wfjc(self.filename, self._LoadLostJob,
227 ["status"], None, None, 1.0) is None)
230 class TestEncodeOpError(unittest.TestCase):
232 encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
233 self.assert_(isinstance(encerr, tuple))
234 self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
236 encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
237 self.assert_(isinstance(encerr, tuple))
238 self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
240 encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
241 self.assert_(isinstance(encerr, tuple))
242 self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
244 encerr = jqueue._EncodeOpError("Hello World")
245 self.assert_(isinstance(encerr, tuple))
246 self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
249 class TestQueuedOpCode(unittest.TestCase):
250 def testDefaults(self):
252 self.assertFalse(hasattr(op.input, "dry_run"))
253 self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
254 self.assertFalse(op.log)
255 self.assert_(op.start_timestamp is None)
256 self.assert_(op.exec_timestamp is None)
257 self.assert_(op.end_timestamp is None)
258 self.assert_(op.result is None)
259 self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
261 op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
263 op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
265 self.assertEqual(op1.Serialize(), op2.Serialize())
267 def testPriority(self):
269 assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
270 "Default priority equals high priority; test can't work"
271 self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
272 self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
274 inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
275 op1 = jqueue._QueuedOpCode(inpop)
277 op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
279 self.assertEqual(op1.Serialize(), op2.Serialize())
282 class TestQueuedJob(unittest.TestCase):
284 self.assertRaises(errors.GenericError, jqueue._QueuedJob,
287 def testDefaults(self):
291 opcodes.OpTestDelay(),
295 self.assertTrue(job.writable)
296 self.assertEqual(job.id, job_id)
297 self.assertEqual(job.log_serial, 0)
298 self.assert_(job.received_timestamp)
299 self.assert_(job.start_timestamp is None)
300 self.assert_(job.end_timestamp is None)
301 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
302 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
303 self.assert_(repr(job).startswith("<"))
304 self.assertEqual(len(job.ops), len(ops))
305 self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
306 for (inp, op) in zip(ops, job.ops)))
307 self.assertRaises(errors.OpExecError, job.GetInfo,
309 self.assertEqual(job.GetInfo(["summary"]),
310 [[op.input.Summary() for op in job.ops]])
312 job1 = jqueue._QueuedJob(None, job_id, ops, True)
314 job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True)
316 self.assertEqual(job1.Serialize(), job2.Serialize())
318 def testWritable(self):
319 job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
320 self.assertFalse(job.writable)
322 job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
323 self.assertTrue(job.writable)
325 def testPriority(self):
328 opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
329 opcodes.OpTestDelay(),
333 self.assertEqual(job.id, job_id)
334 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
335 self.assert_(repr(job).startswith("<"))
337 job = jqueue._QueuedJob(None, job_id, ops, True)
339 self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
341 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
344 job.ops[0].priority -= 1
346 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
348 # Mark opcode as finished
349 job.ops[0].status = constants.OP_STATUS_SUCCESS
351 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
354 job.ops[1].priority -= 10
355 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
357 # Test increasing first
358 job.ops[0].status = constants.OP_STATUS_RUNNING
359 job.ops[0].priority -= 19
360 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
362 def testCalcStatus(self):
364 # The default status is "queued"
365 self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
369 ops[0].status = constants.OP_STATUS_WAITING
372 ops[0].status = constants.OP_STATUS_SUCCESS
373 ops[1].status = constants.OP_STATUS_SUCCESS
374 ops[2].status = constants.OP_STATUS_WAITING
377 ops[0].status = constants.OP_STATUS_SUCCESS
378 ops[1].status = constants.OP_STATUS_RUNNING
380 op.status = constants.OP_STATUS_QUEUED
382 def _Canceling1(ops):
383 ops[0].status = constants.OP_STATUS_SUCCESS
384 ops[1].status = constants.OP_STATUS_SUCCESS
386 op.status = constants.OP_STATUS_CANCELING
388 def _Canceling2(ops):
390 op.status = constants.OP_STATUS_CANCELING
394 op.status = constants.OP_STATUS_CANCELED
397 for idx, op in enumerate(ops):
399 op.status = constants.OP_STATUS_ERROR
401 op.status = constants.OP_STATUS_SUCCESS
405 op.status = constants.OP_STATUS_ERROR
409 op.status = constants.OP_STATUS_SUCCESS
412 constants.JOB_STATUS_QUEUED: [_Queued],
413 constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
414 constants.JOB_STATUS_RUNNING: [_Running],
415 constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
416 constants.JOB_STATUS_CANCELED: [_Canceled],
417 constants.JOB_STATUS_ERROR: [_Error1, _Error2],
418 constants.JOB_STATUS_SUCCESS: [_Success],
422 job = jqueue._QueuedJob(None, 1,
423 [opcodes.OpTestDelay() for _ in range(10)],
425 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
426 self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
430 for status in constants.JOB_STATUS_ALL:
431 sttests = tests[status]
436 self.assertEqual(job.CalcStatus(), status)
439 class _FakeDependencyManager:
442 self._notifications = []
443 self._waiting = set()
445 def AddCheckResult(self, job, dep_job_id, dep_status, result):
446 self._checks.append((job, dep_job_id, dep_status, result))
448 def CountPendingResults(self):
449 return len(self._checks)
451 def CountWaitingJobs(self):
452 return len(self._waiting)
454 def GetNextNotification(self):
455 return self._notifications.pop(0)
457 def JobWaiting(self, job):
458 return job in self._waiting
460 def CheckAndRegister(self, job, dep_job_id, dep_status):
461 (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
463 assert exp_job == job
464 assert exp_dep_job_id == dep_job_id
465 assert exp_dep_status == dep_status
467 (result_status, _) = result
469 if result_status == jqueue._JobDependencyManager.WAIT:
470 self._waiting.add(job)
471 elif result_status == jqueue._JobDependencyManager.CONTINUE:
472 self._waiting.remove(job)
476 def NotifyWaiters(self, job_id):
477 self._notifications.append(job_id)
480 class _DisabledFakeDependencyManager:
481 def JobWaiting(self, _):
484 def CheckAndRegister(self, *args):
485 assert False, "Should not be called"
487 def NotifyWaiters(self, _):
491 class _FakeQueueForProc:
492 def __init__(self, depmgr=None):
493 self._acquired = False
497 self._submit_count = itertools.count(1000)
502 self.depmgr = _DisabledFakeDependencyManager()
504 def IsAcquired(self):
505 return self._acquired
507 def GetNextUpdate(self):
508 return self._updates.pop(0)
510 def GetNextSubmittedJob(self):
511 return self._submitted.pop(0)
513 def acquire(self, shared=0):
515 self._acquired = True
518 assert self._acquired
519 self._acquired = False
521 def UpdateJobUnlocked(self, job, replicate=True):
522 assert self._acquired, "Lock not acquired while updating job"
523 self._updates.append((job, bool(replicate)))
525 def SubmitManyJobs(self, jobs):
526 assert not self._acquired, "Lock acquired while submitting jobs"
527 job_ids = [self._submit_count.next() for _ in jobs]
528 self._submitted.extend(zip(job_ids, jobs))
532 class _FakeExecOpCodeForProc:
533 def __init__(self, queue, before_start, after_start):
535 self._before_start = before_start
536 self._after_start = after_start
538 def __call__(self, op, cbs, timeout=None, priority=None):
539 assert isinstance(op, opcodes.OpTestDummy)
540 assert not self._queue.IsAcquired(), \
541 "Queue lock not released when executing opcode"
543 if self._before_start:
544 self._before_start(timeout, priority)
548 if self._after_start:
549 self._after_start(op, cbs)
551 # Check again after the callbacks
552 assert not self._queue.IsAcquired()
555 raise errors.OpExecError("Error requested (%s)" % op.result)
557 if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
558 return cbs.SubmitManyJobs(op.submit_jobs)
563 class _JobProcessorTestUtils:
564 def _CreateJob(self, queue, job_id, ops):
565 job = jqueue._QueuedJob(queue, job_id, ops, True)
566 self.assertFalse(job.start_timestamp)
567 self.assertFalse(job.end_timestamp)
568 self.assertEqual(len(ops), len(job.ops))
569 self.assert_(compat.all(op.input == inp
570 for (op, inp) in zip(job.ops, ops)))
571 self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
575 class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
576 def _GenericCheckJob(self, job):
577 assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
580 self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
581 [[op.start_timestamp for op in job.ops],
582 [op.exec_timestamp for op in job.ops],
583 [op.end_timestamp for op in job.ops]])
584 self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
585 [job.received_timestamp,
588 self.assert_(job.start_timestamp)
589 self.assert_(job.end_timestamp)
590 self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
592 def testSuccess(self):
593 queue = _FakeQueueForProc()
595 for (job_id, opcount) in [(25351, 1), (6637, 3),
596 (24644, 10), (32207, 100)]:
597 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
598 for i in range(opcount)]
601 job = self._CreateJob(queue, job_id, ops)
603 def _BeforeStart(timeout, priority):
604 self.assertEqual(queue.GetNextUpdate(), (job, True))
605 self.assertRaises(IndexError, queue.GetNextUpdate)
606 self.assertFalse(queue.IsAcquired())
607 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
608 self.assertFalse(job.cur_opctx)
610 def _AfterStart(op, cbs):
611 self.assertEqual(queue.GetNextUpdate(), (job, True))
612 self.assertRaises(IndexError, queue.GetNextUpdate)
614 self.assertFalse(queue.IsAcquired())
615 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
616 self.assertFalse(job.cur_opctx)
618 # Job is running, cancelling shouldn't be possible
619 (success, _) = job.Cancel()
620 self.assertFalse(success)
622 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
624 for idx in range(len(ops)):
625 self.assertRaises(IndexError, queue.GetNextUpdate)
626 result = jqueue._JobProcessor(queue, opexec, job)()
627 self.assertEqual(queue.GetNextUpdate(), (job, True))
628 self.assertRaises(IndexError, queue.GetNextUpdate)
629 if idx == len(ops) - 1:
631 self.assertEqual(result, jqueue._JobProcessor.FINISHED)
633 self.assertEqual(result, jqueue._JobProcessor.DEFER)
635 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
636 self.assert_(job.start_timestamp)
637 self.assertFalse(job.end_timestamp)
639 self.assertRaises(IndexError, queue.GetNextUpdate)
641 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
642 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
643 self.assertEqual(job.GetInfo(["opresult"]),
644 [[op.input.result for op in job.ops]])
645 self.assertEqual(job.GetInfo(["opstatus"]),
646 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
647 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
650 self._GenericCheckJob(job)
652 # Calling the processor on a finished job should be a no-op
653 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
654 jqueue._JobProcessor.FINISHED)
655 self.assertRaises(IndexError, queue.GetNextUpdate)
657 def testOpcodeError(self):
658 queue = _FakeQueueForProc()
665 (23816, 100, 39, 45),
668 for (job_id, opcount, failfrom, failto) in testdata:
670 ops = [opcodes.OpTestDummy(result="Res%s" % i,
671 fail=(failfrom <= i and
673 for i in range(opcount)]
676 job = self._CreateJob(queue, job_id, ops)
678 opexec = _FakeExecOpCodeForProc(queue, None, None)
680 for idx in range(len(ops)):
681 self.assertRaises(IndexError, queue.GetNextUpdate)
682 result = jqueue._JobProcessor(queue, opexec, job)()
684 self.assertEqual(queue.GetNextUpdate(), (job, True))
685 # waitlock to running
686 self.assertEqual(queue.GetNextUpdate(), (job, True))
688 self.assertEqual(queue.GetNextUpdate(), (job, True))
689 self.assertRaises(IndexError, queue.GetNextUpdate)
691 if idx in (failfrom, len(ops) - 1):
693 self.assertEqual(result, jqueue._JobProcessor.FINISHED)
696 self.assertEqual(result, jqueue._JobProcessor.DEFER)
698 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
700 self.assertRaises(IndexError, queue.GetNextUpdate)
703 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
704 self.assertEqual(job.GetInfo(["id"]), [job_id])
705 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
707 # Check opcode status
709 job.GetInfo(["opstatus"])[0],
710 job.GetInfo(["opresult"])[0])
712 for idx, (op, opstatus, opresult) in enumerate(data):
714 assert not op.input.fail
715 self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
716 self.assertEqual(opresult, op.input.result)
719 self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
720 self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
722 assert not op.input.fail
723 self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
724 self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
726 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
727 for op in job.ops[:failfrom]))
729 self._GenericCheckJob(job)
731 # Calling the processor on a finished job should be a no-op
732 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
733 jqueue._JobProcessor.FINISHED)
734 self.assertRaises(IndexError, queue.GetNextUpdate)
736 def testCancelWhileInQueue(self):
737 queue = _FakeQueueForProc()
739 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
744 job = self._CreateJob(queue, job_id, ops)
746 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
749 (success, _) = job.Cancel()
750 self.assert_(success)
752 self.assertRaises(IndexError, queue.GetNextUpdate)
754 self.assertFalse(job.start_timestamp)
755 self.assertTrue(job.end_timestamp)
756 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
759 # Serialize to check for differences
760 before_proc = job.Serialize()
762 # Simulate processor called in workerpool
763 opexec = _FakeExecOpCodeForProc(queue, None, None)
764 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
765 jqueue._JobProcessor.FINISHED)
768 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
769 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
770 self.assertFalse(job.start_timestamp)
771 self.assertTrue(job.end_timestamp)
772 self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
774 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
775 [[constants.OP_STATUS_CANCELED for _ in job.ops],
776 ["Job canceled by request" for _ in job.ops]])
778 # Must not have changed or written
779 self.assertEqual(before_proc, job.Serialize())
780 self.assertRaises(IndexError, queue.GetNextUpdate)
782 def testCancelWhileWaitlockInQueue(self):
783 queue = _FakeQueueForProc()
785 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
790 job = self._CreateJob(queue, job_id, ops)
792 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
794 job.ops[0].status = constants.OP_STATUS_WAITING
796 assert len(job.ops) == 5
798 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
801 (success, _) = job.Cancel()
802 self.assert_(success)
804 self.assertRaises(IndexError, queue.GetNextUpdate)
806 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
809 opexec = _FakeExecOpCodeForProc(queue, None, None)
810 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
811 jqueue._JobProcessor.FINISHED)
814 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
815 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
816 self.assertFalse(job.start_timestamp)
817 self.assert_(job.end_timestamp)
818 self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
820 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
821 [[constants.OP_STATUS_CANCELED for _ in job.ops],
822 ["Job canceled by request" for _ in job.ops]])
824 def testCancelWhileWaitlock(self):
825 queue = _FakeQueueForProc()
827 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
832 job = self._CreateJob(queue, job_id, ops)
834 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
836 def _BeforeStart(timeout, priority):
837 self.assertEqual(queue.GetNextUpdate(), (job, True))
838 self.assertRaises(IndexError, queue.GetNextUpdate)
839 self.assertFalse(queue.IsAcquired())
840 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
843 (success, _) = job.Cancel()
844 self.assert_(success)
846 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
848 self.assertRaises(IndexError, queue.GetNextUpdate)
850 def _AfterStart(op, cbs):
851 self.assertEqual(queue.GetNextUpdate(), (job, True))
852 self.assertRaises(IndexError, queue.GetNextUpdate)
853 self.assertFalse(queue.IsAcquired())
854 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
856 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
858 self.assertRaises(IndexError, queue.GetNextUpdate)
859 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
860 jqueue._JobProcessor.FINISHED)
861 self.assertEqual(queue.GetNextUpdate(), (job, True))
862 self.assertRaises(IndexError, queue.GetNextUpdate)
865 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
866 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
867 self.assert_(job.start_timestamp)
868 self.assert_(job.end_timestamp)
869 self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
871 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
872 [[constants.OP_STATUS_CANCELED for _ in job.ops],
873 ["Job canceled by request" for _ in job.ops]])
875 def testCancelWhileWaitlockWithTimeout(self):
876 queue = _FakeQueueForProc()
878 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
883 job = self._CreateJob(queue, job_id, ops)
885 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
887 def _BeforeStart(timeout, priority):
888 self.assertFalse(queue.IsAcquired())
889 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
892 (success, _) = job.Cancel()
893 self.assert_(success)
895 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
898 # Fake an acquire attempt timing out
899 raise mcpu.LockAcquireTimeout()
901 def _AfterStart(op, cbs):
902 self.fail("Should not reach this")
904 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
906 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
907 jqueue._JobProcessor.FINISHED)
910 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
911 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
912 self.assert_(job.start_timestamp)
913 self.assert_(job.end_timestamp)
914 self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
916 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
917 [[constants.OP_STATUS_CANCELED for _ in job.ops],
918 ["Job canceled by request" for _ in job.ops]])
920 def testCancelWhileRunning(self):
921 # Tests canceling a job with finished opcodes and more, unprocessed ones
922 queue = _FakeQueueForProc()
924 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
929 job = self._CreateJob(queue, job_id, ops)
931 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
933 opexec = _FakeExecOpCodeForProc(queue, None, None)
936 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
937 jqueue._JobProcessor.DEFER)
939 # Job goes back to queued
940 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
941 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
942 [[constants.OP_STATUS_SUCCESS,
943 constants.OP_STATUS_QUEUED,
944 constants.OP_STATUS_QUEUED],
945 ["Res0", None, None]])
948 (success, _) = job.Cancel()
949 self.assert_(success)
951 # Try processing another opcode (this will actually cancel the job)
952 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
953 jqueue._JobProcessor.FINISHED)
956 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
957 self.assertEqual(job.GetInfo(["id"]), [job_id])
958 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
959 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
960 [[constants.OP_STATUS_SUCCESS,
961 constants.OP_STATUS_CANCELED,
962 constants.OP_STATUS_CANCELED],
963 ["Res0", "Job canceled by request",
964 "Job canceled by request"]])
966 def testPartiallyRun(self):
967 # Tests calling the processor on a job that's been partially run before the
968 # program was restarted
969 queue = _FakeQueueForProc()
971 opexec = _FakeExecOpCodeForProc(queue, None, None)
973 for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
974 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
978 job = self._CreateJob(queue, job_id, ops)
980 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
982 for _ in range(successcount):
983 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
984 jqueue._JobProcessor.DEFER)
986 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
987 self.assertEqual(job.GetInfo(["opstatus"]),
988 [[constants.OP_STATUS_SUCCESS
989 for _ in range(successcount)] +
990 [constants.OP_STATUS_QUEUED
991 for _ in range(len(ops) - successcount)]])
993 self.assert_(job.ops_iter)
995 # Serialize and restore (simulates program restart)
996 newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
997 self.assertFalse(newjob.ops_iter)
998 self._TestPartial(newjob, successcount)
1000 def _TestPartial(self, job, successcount):
1001 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1002 self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1004 queue = _FakeQueueForProc()
1005 opexec = _FakeExecOpCodeForProc(queue, None, None)
1007 for remaining in reversed(range(len(job.ops) - successcount)):
1008 result = jqueue._JobProcessor(queue, opexec, job)()
1009 self.assertEqual(queue.GetNextUpdate(), (job, True))
1010 self.assertEqual(queue.GetNextUpdate(), (job, True))
1011 self.assertEqual(queue.GetNextUpdate(), (job, True))
1012 self.assertRaises(IndexError, queue.GetNextUpdate)
1016 self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1019 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1021 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1023 self.assertRaises(IndexError, queue.GetNextUpdate)
1024 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1025 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1026 self.assertEqual(job.GetInfo(["opresult"]),
1027 [[op.input.result for op in job.ops]])
1028 self.assertEqual(job.GetInfo(["opstatus"]),
1029 [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
1030 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1033 self._GenericCheckJob(job)
1035 # Calling the processor on a finished job should be a no-op
1036 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1037 jqueue._JobProcessor.FINISHED)
1038 self.assertRaises(IndexError, queue.GetNextUpdate)
1040 # ... also after being restored
1041 job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
1042 # Calling the processor on a finished job should be a no-op
1043 self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
1044 jqueue._JobProcessor.FINISHED)
1045 self.assertRaises(IndexError, queue.GetNextUpdate)
1047 def testProcessorOnRunningJob(self):
1048 ops = [opcodes.OpTestDummy(result="result", fail=False)]
1050 queue = _FakeQueueForProc()
1051 opexec = _FakeExecOpCodeForProc(queue, None, None)
1054 job = self._CreateJob(queue, 9571, ops)
1056 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1058 job.ops[0].status = constants.OP_STATUS_RUNNING
1060 assert len(job.ops) == 1
1062 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1064 # Calling on running job must fail
1065 self.assertRaises(errors.ProgrammerError,
1066 jqueue._JobProcessor(queue, opexec, job))
1068 def testLogMessages(self):
1069 # Tests the "Feedback" callback function
1070 queue = _FakeQueueForProc()
1076 (constants.ELOG_MESSAGE, "there"),
1079 (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
1080 (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1083 ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1084 messages=messages.get(i, []))
1088 job = self._CreateJob(queue, 29386, ops)
1090 def _BeforeStart(timeout, priority):
1091 self.assertEqual(queue.GetNextUpdate(), (job, True))
1092 self.assertRaises(IndexError, queue.GetNextUpdate)
1093 self.assertFalse(queue.IsAcquired())
1094 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1096 def _AfterStart(op, cbs):
1097 self.assertEqual(queue.GetNextUpdate(), (job, True))
1098 self.assertRaises(IndexError, queue.GetNextUpdate)
1099 self.assertFalse(queue.IsAcquired())
1100 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1102 self.assertRaises(AssertionError, cbs.Feedback,
1103 "too", "many", "arguments")
1105 for (log_type, msg) in op.messages:
1106 self.assertRaises(IndexError, queue.GetNextUpdate)
1108 cbs.Feedback(log_type, msg)
1111 # Check for job update without replication
1112 self.assertEqual(queue.GetNextUpdate(), (job, False))
1113 self.assertRaises(IndexError, queue.GetNextUpdate)
1115 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1117 for remaining in reversed(range(len(job.ops))):
1118 self.assertRaises(IndexError, queue.GetNextUpdate)
1119 result = jqueue._JobProcessor(queue, opexec, job)()
1120 self.assertEqual(queue.GetNextUpdate(), (job, True))
1121 self.assertRaises(IndexError, queue.GetNextUpdate)
1125 self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1128 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1130 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1132 self.assertRaises(IndexError, queue.GetNextUpdate)
1134 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1135 self.assertEqual(job.GetInfo(["opresult"]),
1136 [[op.input.result for op in job.ops]])
1138 logmsgcount = sum(len(m) for m in messages.values())
1140 self._CheckLogMessages(job, logmsgcount)
1142 # Serialize and restore (simulates program restart)
1143 newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
1144 self._CheckLogMessages(newjob, logmsgcount)
1146 # Check each message
1148 for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1149 for (serial, timestamp, log_type, msg) in oplog:
1150 (exptype, expmsg) = messages.get(idx).pop(0)
1152 self.assertEqual(log_type, exptype)
1154 self.assertEqual(log_type, constants.ELOG_MESSAGE)
1155 self.assertEqual(expmsg, msg)
1156 self.assert_(serial > prevserial)
1159 def _CheckLogMessages(self, job, count):
1161 self.assertEqual(job.log_serial, count)
1164 self.assertEqual(job.GetLogEntries(None),
1165 [entry for entries in job.GetInfo(["oplog"])[0] if entries
1166 for entry in entries])
1168 # Filter with serial
1170 self.assert_(job.GetLogEntries(3))
1171 self.assertEqual(job.GetLogEntries(3),
1172 [entry for entries in job.GetInfo(["oplog"])[0] if entries
1173 for entry in entries][3:])
1175 # No log message after highest serial
1176 self.assertFalse(job.GetLogEntries(count))
1177 self.assertFalse(job.GetLogEntries(count + 3))
1179 def testSubmitManyJobs(self):
1180 queue = _FakeQueueForProc()
1184 opcodes.OpTestDummy(result="Res0", fail=False,
1186 opcodes.OpTestDummy(result="Res1", fail=False,
1188 [opcodes.OpTestDummy(result="r1j0", fail=False)],
1190 opcodes.OpTestDummy(result="Res2", fail=False,
1192 [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1193 opcodes.OpTestDummy(result="r2j0o1", fail=False),
1194 opcodes.OpTestDummy(result="r2j0o2", fail=False),
1195 opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1196 [opcodes.OpTestDummy(result="r2j1", fail=False)],
1197 [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1198 opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1203 job = self._CreateJob(queue, job_id, ops)
1205 def _BeforeStart(timeout, priority):
1206 self.assertEqual(queue.GetNextUpdate(), (job, True))
1207 self.assertRaises(IndexError, queue.GetNextUpdate)
1208 self.assertFalse(queue.IsAcquired())
1209 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1210 self.assertFalse(job.cur_opctx)
1212 def _AfterStart(op, cbs):
1213 self.assertEqual(queue.GetNextUpdate(), (job, True))
1214 self.assertRaises(IndexError, queue.GetNextUpdate)
1216 self.assertFalse(queue.IsAcquired())
1217 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1218 self.assertFalse(job.cur_opctx)
1220 # Job is running, cancelling shouldn't be possible
1221 (success, _) = job.Cancel()
1222 self.assertFalse(success)
1224 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1226 for idx in range(len(ops)):
1227 self.assertRaises(IndexError, queue.GetNextUpdate)
1228 result = jqueue._JobProcessor(queue, opexec, job)()
1229 self.assertEqual(queue.GetNextUpdate(), (job, True))
1230 self.assertRaises(IndexError, queue.GetNextUpdate)
1231 if idx == len(ops) - 1:
1233 self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1235 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1237 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1238 self.assert_(job.start_timestamp)
1239 self.assertFalse(job.end_timestamp)
1241 self.assertRaises(IndexError, queue.GetNextUpdate)
1243 for idx, submitted_ops in enumerate(job_ops
1245 for job_ops in op.submit_jobs):
1246 self.assertEqual(queue.GetNextSubmittedJob(),
1247 (1000 + idx, submitted_ops))
1248 self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1250 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1251 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1252 self.assertEqual(job.GetInfo(["opresult"]),
1253 [[[], [1000], [1001, 1002, 1003]]])
1254 self.assertEqual(job.GetInfo(["opstatus"]),
1255 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1257 self._GenericCheckJob(job)
1259 # Calling the processor on a finished job should be a no-op
1260 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1261 jqueue._JobProcessor.FINISHED)
1262 self.assertRaises(IndexError, queue.GetNextUpdate)
1264 def testJobDependency(self):
1265 depmgr = _FakeDependencyManager()
1266 queue = _FakeQueueForProc(depmgr=depmgr)
1268 self.assertEqual(queue.depmgr, depmgr)
1271 prev_job_id2 = 28102
1274 opcodes.OpTestDummy(result="Res0", fail=False,
1276 [prev_job_id2, None],
1277 [prev_job_id, None],
1279 opcodes.OpTestDummy(result="Res1", fail=False),
1283 job = self._CreateJob(queue, job_id, ops)
1285 def _BeforeStart(timeout, priority):
1286 if attempt == 0 or attempt > 5:
1287 # Job should only be updated when it wasn't waiting for another job
1288 self.assertEqual(queue.GetNextUpdate(), (job, True))
1289 self.assertRaises(IndexError, queue.GetNextUpdate)
1290 self.assertFalse(queue.IsAcquired())
1291 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1292 self.assertFalse(job.cur_opctx)
1294 def _AfterStart(op, cbs):
1295 self.assertEqual(queue.GetNextUpdate(), (job, True))
1296 self.assertRaises(IndexError, queue.GetNextUpdate)
1298 self.assertFalse(queue.IsAcquired())
1299 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1300 self.assertFalse(job.cur_opctx)
1302 # Job is running, cancelling shouldn't be possible
1303 (success, _) = job.Cancel()
1304 self.assertFalse(success)
1306 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1308 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1310 counter = itertools.count()
1312 attempt = counter.next()
1314 self.assertRaises(IndexError, queue.GetNextUpdate)
1315 self.assertRaises(IndexError, depmgr.GetNextNotification)
1318 depmgr.AddCheckResult(job, prev_job_id2, None,
1319 (jqueue._JobDependencyManager.WAIT, "wait2"))
1321 depmgr.AddCheckResult(job, prev_job_id2, None,
1322 (jqueue._JobDependencyManager.CONTINUE, "cont"))
1323 # The processor will ask for the next dependency immediately
1324 depmgr.AddCheckResult(job, prev_job_id, None,
1325 (jqueue._JobDependencyManager.WAIT, "wait"))
1327 depmgr.AddCheckResult(job, prev_job_id, None,
1328 (jqueue._JobDependencyManager.WAIT, "wait"))
1330 depmgr.AddCheckResult(job, prev_job_id, None,
1331 (jqueue._JobDependencyManager.CONTINUE, "cont"))
1333 self.assertEqual(depmgr.CountPendingResults(), 2)
1335 self.assertEqual(depmgr.CountPendingResults(), 0)
1337 self.assertEqual(depmgr.CountPendingResults(), 1)
1339 result = jqueue._JobProcessor(queue, opexec, job)()
1340 if attempt == 0 or attempt >= 5:
1341 # Job should only be updated if there was an actual change
1342 self.assertEqual(queue.GetNextUpdate(), (job, True))
1343 self.assertRaises(IndexError, queue.GetNextUpdate)
1344 self.assertFalse(depmgr.CountPendingResults())
1347 # Simulate waiting for other job
1348 self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1349 self.assertTrue(job.cur_opctx)
1350 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1351 self.assertRaises(IndexError, depmgr.GetNextNotification)
1352 self.assert_(job.start_timestamp)
1353 self.assertFalse(job.end_timestamp)
1356 if result == jqueue._JobProcessor.FINISHED:
1358 self.assertFalse(job.cur_opctx)
1361 self.assertRaises(IndexError, depmgr.GetNextNotification)
1363 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1364 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1365 self.assert_(job.start_timestamp)
1366 self.assertFalse(job.end_timestamp)
1368 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1369 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1370 self.assertEqual(job.GetInfo(["opresult"]),
1371 [[op.input.result for op in job.ops]])
1372 self.assertEqual(job.GetInfo(["opstatus"]),
1373 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1374 self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1377 self._GenericCheckJob(job)
1379 self.assertRaises(IndexError, queue.GetNextUpdate)
1380 self.assertRaises(IndexError, depmgr.GetNextNotification)
1381 self.assertFalse(depmgr.CountPendingResults())
1382 self.assertFalse(depmgr.CountWaitingJobs())
1384 # Calling the processor on a finished job should be a no-op
1385 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1386 jqueue._JobProcessor.FINISHED)
1387 self.assertRaises(IndexError, queue.GetNextUpdate)
1389 def testJobDependencyCancel(self):
1390 depmgr = _FakeDependencyManager()
1391 queue = _FakeQueueForProc(depmgr=depmgr)
1393 self.assertEqual(queue.depmgr, depmgr)
1398 opcodes.OpTestDummy(result="Res0", fail=False),
1399 opcodes.OpTestDummy(result="Res1", fail=False,
1401 [prev_job_id, None],
1403 opcodes.OpTestDummy(result="Res2", fail=False),
1407 job = self._CreateJob(queue, job_id, ops)
1409 def _BeforeStart(timeout, priority):
1410 if attempt == 0 or attempt > 5:
1411 # Job should only be updated when it wasn't waiting for another job
1412 self.assertEqual(queue.GetNextUpdate(), (job, True))
1413 self.assertRaises(IndexError, queue.GetNextUpdate)
1414 self.assertFalse(queue.IsAcquired())
1415 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1416 self.assertFalse(job.cur_opctx)
1418 def _AfterStart(op, cbs):
1419 self.assertEqual(queue.GetNextUpdate(), (job, True))
1420 self.assertRaises(IndexError, queue.GetNextUpdate)
1422 self.assertFalse(queue.IsAcquired())
1423 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1424 self.assertFalse(job.cur_opctx)
1426 # Job is running, cancelling shouldn't be possible
1427 (success, _) = job.Cancel()
1428 self.assertFalse(success)
1430 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1432 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1434 counter = itertools.count()
1436 attempt = counter.next()
1438 self.assertRaises(IndexError, queue.GetNextUpdate)
1439 self.assertRaises(IndexError, depmgr.GetNextNotification)
1442 # This will handle the first opcode
1445 depmgr.AddCheckResult(job, prev_job_id, None,
1446 (jqueue._JobDependencyManager.WAIT, "wait"))
1448 # Other job was cancelled
1449 depmgr.AddCheckResult(job, prev_job_id, None,
1450 (jqueue._JobDependencyManager.CANCEL, "cancel"))
1453 self.assertEqual(depmgr.CountPendingResults(), 0)
1455 self.assertEqual(depmgr.CountPendingResults(), 1)
1457 result = jqueue._JobProcessor(queue, opexec, job)()
1458 if attempt <= 1 or attempt >= 4:
1459 # Job should only be updated if there was an actual change
1460 self.assertEqual(queue.GetNextUpdate(), (job, True))
1461 self.assertRaises(IndexError, queue.GetNextUpdate)
1462 self.assertFalse(depmgr.CountPendingResults())
1464 if attempt > 0 and attempt < 4:
1465 # Simulate waiting for other job
1466 self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1467 self.assertTrue(job.cur_opctx)
1468 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1469 self.assertRaises(IndexError, depmgr.GetNextNotification)
1470 self.assert_(job.start_timestamp)
1471 self.assertFalse(job.end_timestamp)
1474 if result == jqueue._JobProcessor.FINISHED:
1476 self.assertFalse(job.cur_opctx)
1479 self.assertRaises(IndexError, depmgr.GetNextNotification)
1481 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1482 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1483 self.assert_(job.start_timestamp)
1484 self.assertFalse(job.end_timestamp)
1486 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1487 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1488 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1489 [[constants.OP_STATUS_SUCCESS,
1490 constants.OP_STATUS_CANCELED,
1491 constants.OP_STATUS_CANCELED],
1492 ["Res0", "Job canceled by request",
1493 "Job canceled by request"]])
1495 self._GenericCheckJob(job)
1497 self.assertRaises(IndexError, queue.GetNextUpdate)
1498 self.assertRaises(IndexError, depmgr.GetNextNotification)
1499 self.assertFalse(depmgr.CountPendingResults())
1501 # Calling the processor on a finished job should be a no-op
1502 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1503 jqueue._JobProcessor.FINISHED)
1504 self.assertRaises(IndexError, queue.GetNextUpdate)
1506 def testJobDependencyWrongstatus(self):
1507 depmgr = _FakeDependencyManager()
1508 queue = _FakeQueueForProc(depmgr=depmgr)
1510 self.assertEqual(queue.depmgr, depmgr)
1515 opcodes.OpTestDummy(result="Res0", fail=False),
1516 opcodes.OpTestDummy(result="Res1", fail=False,
1518 [prev_job_id, None],
1520 opcodes.OpTestDummy(result="Res2", fail=False),
1524 job = self._CreateJob(queue, job_id, ops)
1526 def _BeforeStart(timeout, priority):
1527 if attempt == 0 or attempt > 5:
1528 # Job should only be updated when it wasn't waiting for another job
1529 self.assertEqual(queue.GetNextUpdate(), (job, True))
1530 self.assertRaises(IndexError, queue.GetNextUpdate)
1531 self.assertFalse(queue.IsAcquired())
1532 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1533 self.assertFalse(job.cur_opctx)
1535 def _AfterStart(op, cbs):
1536 self.assertEqual(queue.GetNextUpdate(), (job, True))
1537 self.assertRaises(IndexError, queue.GetNextUpdate)
1539 self.assertFalse(queue.IsAcquired())
1540 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1541 self.assertFalse(job.cur_opctx)
1543 # Job is running, cancelling shouldn't be possible
1544 (success, _) = job.Cancel()
1545 self.assertFalse(success)
1547 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1549 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1551 counter = itertools.count()
1553 attempt = counter.next()
1555 self.assertRaises(IndexError, queue.GetNextUpdate)
1556 self.assertRaises(IndexError, depmgr.GetNextNotification)
1559 # This will handle the first opcode
1562 depmgr.AddCheckResult(job, prev_job_id, None,
1563 (jqueue._JobDependencyManager.WAIT, "wait"))
1566 depmgr.AddCheckResult(job, prev_job_id, None,
1567 (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
1570 self.assertEqual(depmgr.CountPendingResults(), 0)
1572 self.assertEqual(depmgr.CountPendingResults(), 1)
1574 result = jqueue._JobProcessor(queue, opexec, job)()
1575 if attempt <= 1 or attempt >= 4:
1576 # Job should only be updated if there was an actual change
1577 self.assertEqual(queue.GetNextUpdate(), (job, True))
1578 self.assertRaises(IndexError, queue.GetNextUpdate)
1579 self.assertFalse(depmgr.CountPendingResults())
1581 if attempt > 0 and attempt < 4:
1582 # Simulate waiting for other job
1583 self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1584 self.assertTrue(job.cur_opctx)
1585 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1586 self.assertRaises(IndexError, depmgr.GetNextNotification)
1587 self.assert_(job.start_timestamp)
1588 self.assertFalse(job.end_timestamp)
1591 if result == jqueue._JobProcessor.FINISHED:
1593 self.assertFalse(job.cur_opctx)
1596 self.assertRaises(IndexError, depmgr.GetNextNotification)
1598 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1599 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1600 self.assert_(job.start_timestamp)
1601 self.assertFalse(job.end_timestamp)
1603 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
1604 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
1605 self.assertEqual(job.GetInfo(["opstatus"]),
1606 [[constants.OP_STATUS_SUCCESS,
1607 constants.OP_STATUS_ERROR,
1608 constants.OP_STATUS_ERROR]]),
1610 (opresult, ) = job.GetInfo(["opresult"])
1611 self.assertEqual(len(opresult), len(ops))
1612 self.assertEqual(opresult[0], "Res0")
1613 self.assertTrue(errors.GetEncodedError(opresult[1]))
1614 self.assertTrue(errors.GetEncodedError(opresult[2]))
1616 self._GenericCheckJob(job)
1618 self.assertRaises(IndexError, queue.GetNextUpdate)
1619 self.assertRaises(IndexError, depmgr.GetNextNotification)
1620 self.assertFalse(depmgr.CountPendingResults())
1622 # Calling the processor on a finished job should be a no-op
1623 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1624 jqueue._JobProcessor.FINISHED)
1625 self.assertRaises(IndexError, queue.GetNextUpdate)
1628 class _FakeTimeoutStrategy:
1629 def __init__(self, timeouts):
1630 self.timeouts = timeouts
1632 self.last_timeout = None
1634 def NextAttempt(self):
1637 timeout = self.timeouts.pop(0)
1640 self.last_timeout = timeout
1644 class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
1646 self.queue = _FakeQueueForProc()
1649 self.opcounter = None
1650 self.timeout_strategy = None
1652 self.prev_tsop = None
1653 self.prev_prio = None
1654 self.prev_status = None
1655 self.lock_acq_prio = None
1656 self.gave_lock = None
1657 self.done_lock_before_blocking = False
1659 def _BeforeStart(self, timeout, priority):
1662 # If status has changed, job must've been written
1663 if self.prev_status != self.job.ops[self.curop].status:
1664 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1665 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1667 self.assertFalse(self.queue.IsAcquired())
1668 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1670 ts = self.timeout_strategy
1672 self.assert_(timeout is None or isinstance(timeout, (int, float)))
1673 self.assertEqual(timeout, ts.last_timeout)
1674 self.assertEqual(priority, job.ops[self.curop].priority)
1676 self.gave_lock = True
1677 self.lock_acq_prio = priority
1679 if (self.curop == 3 and
1680 job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
1681 # Give locks before running into blocking acquire
1682 assert self.retries == 7
1684 self.done_lock_before_blocking = True
1687 if self.retries > 0:
1688 self.assert_(timeout is not None)
1690 self.gave_lock = False
1691 raise mcpu.LockAcquireTimeout()
1693 if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
1694 assert self.retries == 0, "Didn't exhaust all retries at highest priority"
1695 assert not ts.timeouts
1696 self.assert_(timeout is None)
1698 def _AfterStart(self, op, cbs):
1701 # Setting to "running" requires an update
1702 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1703 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1705 self.assertFalse(self.queue.IsAcquired())
1706 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1708 # Job is running, cancelling shouldn't be possible
1709 (success, _) = job.Cancel()
1710 self.assertFalse(success)
1712 def _NextOpcode(self):
1713 self.curop = self.opcounter.next()
1714 self.prev_prio = self.job.ops[self.curop].priority
1715 self.prev_status = self.job.ops[self.curop].status
1717 def _NewTimeoutStrategy(self):
1720 self.assertEqual(self.retries, 0)
1722 if self.prev_tsop == self.curop:
1723 # Still on the same opcode, priority must've been increased
1724 self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
1728 timeouts = range(10, 31, 10)
1729 self.retries = len(timeouts) - 1
1731 elif self.curop == 2:
1732 # Let this run into a blocking acquire
1733 timeouts = range(11, 61, 12)
1734 self.retries = len(timeouts)
1736 elif self.curop == 3:
1737 # Wait for priority to increase, but give lock before blocking acquire
1738 timeouts = range(12, 100, 14)
1739 self.retries = len(timeouts)
1741 self.assertFalse(self.done_lock_before_blocking)
1743 elif self.curop == 4:
1744 self.assert_(self.done_lock_before_blocking)
1746 # Timeouts, but no need to retry
1747 timeouts = range(10, 31, 10)
1750 elif self.curop == 5:
1752 timeouts = range(19, 100, 11)
1753 self.retries = len(timeouts)
1759 assert len(job.ops) == 10
1760 assert self.retries <= len(timeouts)
1762 ts = _FakeTimeoutStrategy(timeouts)
1764 self.timeout_strategy = ts
1765 self.prev_tsop = self.curop
1766 self.prev_prio = job.ops[self.curop].priority
1770 def testTimeout(self):
1771 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1776 job = self._CreateJob(self.queue, job_id, ops)
1779 self.opcounter = itertools.count(0)
1781 opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
1783 tsf = self._NewTimeoutStrategy
1785 self.assertFalse(self.done_lock_before_blocking)
1788 proc = jqueue._JobProcessor(self.queue, opexec, job,
1789 _timeout_strategy_factory=tsf)
1791 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1793 if self.curop is not None:
1794 self.prev_status = self.job.ops[self.curop].status
1796 self.lock_acq_prio = None
1798 result = proc(_nextop_fn=self._NextOpcode)
1799 assert self.curop is not None
1801 if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
1802 # Got lock and/or job is done, result must've been written
1803 self.assertFalse(job.cur_opctx)
1804 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1805 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1806 self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
1807 self.assert_(job.ops[self.curop].exec_timestamp)
1809 if result == jqueue._JobProcessor.FINISHED:
1810 self.assertFalse(job.cur_opctx)
1813 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1816 self.assertEqual(job.ops[self.curop].start_timestamp,
1817 job.start_timestamp)
1820 # Opcode finished, but job not yet done
1821 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1824 self.assert_(job.cur_opctx)
1825 self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1826 self.timeout_strategy.NextAttempt)
1827 self.assertFalse(job.ops[self.curop].exec_timestamp)
1828 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1830 # If priority has changed since acquiring locks, the job must've been
1832 if self.lock_acq_prio != job.ops[self.curop].priority:
1833 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1835 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1837 self.assert_(job.start_timestamp)
1838 self.assertFalse(job.end_timestamp)
1840 self.assertEqual(self.curop, len(job.ops) - 1)
1841 self.assertEqual(self.job, job)
1842 self.assertEqual(self.opcounter.next(), len(job.ops))
1843 self.assert_(self.done_lock_before_blocking)
1845 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1846 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1847 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1848 self.assertEqual(job.GetInfo(["opresult"]),
1849 [[op.input.result for op in job.ops]])
1850 self.assertEqual(job.GetInfo(["opstatus"]),
1851 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1852 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1855 # Calling the processor on a finished job should be a no-op
1856 self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
1857 jqueue._JobProcessor.FINISHED)
1858 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1861 class TestJobDependencyManager(unittest.TestCase):
1863 def __init__(self, job_id):
1864 self.id = str(job_id)
1869 self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
1871 def _GetStatus(self, job_id):
1872 (exp_job_id, result) = self._status.pop(0)
1873 self.assertEqual(exp_job_id, job_id)
1876 def _Enqueue(self, jobs):
1877 self._queue.append(jobs)
1879 def testNotFinalizedThenCancel(self):
1880 job = self._FakeJob(17697)
1883 self._status.append((job_id, constants.JOB_STATUS_RUNNING))
1884 (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1885 self.assertEqual(result, self.jdm.WAIT)
1886 self.assertFalse(self._status)
1887 self.assertFalse(self._queue)
1888 self.assertTrue(self.jdm.JobWaiting(job))
1889 self.assertEqual(self.jdm._waiters, {
1892 self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
1893 ("job/28625", None, None, [("job", [job.id])])
1896 self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1897 (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1898 self.assertEqual(result, self.jdm.CANCEL)
1899 self.assertFalse(self._status)
1900 self.assertFalse(self._queue)
1901 self.assertFalse(self.jdm.JobWaiting(job))
1902 self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
1904 def testRequireCancel(self):
1905 job = self._FakeJob(5278)
1907 dep_status = [constants.JOB_STATUS_CANCELED]
1909 self._status.append((job_id, constants.JOB_STATUS_WAITING))
1910 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1911 self.assertEqual(result, self.jdm.WAIT)
1912 self.assertFalse(self._status)
1913 self.assertFalse(self._queue)
1914 self.assertTrue(self.jdm.JobWaiting(job))
1915 self.assertEqual(self.jdm._waiters, {
1918 self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
1919 ("job/9610", None, None, [("job", [job.id])])
1922 self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1923 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1924 self.assertEqual(result, self.jdm.CONTINUE)
1925 self.assertFalse(self._status)
1926 self.assertFalse(self._queue)
1927 self.assertFalse(self.jdm.JobWaiting(job))
1928 self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
1930 def testRequireError(self):
1931 job = self._FakeJob(21459)
1933 dep_status = [constants.JOB_STATUS_ERROR]
1935 self._status.append((job_id, constants.JOB_STATUS_WAITING))
1936 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1937 self.assertEqual(result, self.jdm.WAIT)
1938 self.assertFalse(self._status)
1939 self.assertFalse(self._queue)
1940 self.assertTrue(self.jdm.JobWaiting(job))
1941 self.assertEqual(self.jdm._waiters, {
1945 self._status.append((job_id, constants.JOB_STATUS_ERROR))
1946 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1947 self.assertEqual(result, self.jdm.CONTINUE)
1948 self.assertFalse(self._status)
1949 self.assertFalse(self._queue)
1950 self.assertFalse(self.jdm.JobWaiting(job))
1951 self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
1953 def testRequireMultiple(self):
1954 dep_status = list(constants.JOBS_FINALIZED)
1956 for end_status in dep_status:
1957 job = self._FakeJob(21343)
1960 self._status.append((job_id, constants.JOB_STATUS_WAITING))
1961 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1962 self.assertEqual(result, self.jdm.WAIT)
1963 self.assertFalse(self._status)
1964 self.assertFalse(self._queue)
1965 self.assertTrue(self.jdm.JobWaiting(job))
1966 self.assertEqual(self.jdm._waiters, {
1969 self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
1970 ("job/14609", None, None, [("job", [job.id])])
1973 self._status.append((job_id, end_status))
1974 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1975 self.assertEqual(result, self.jdm.CONTINUE)
1976 self.assertFalse(self._status)
1977 self.assertFalse(self._queue)
1978 self.assertFalse(self.jdm.JobWaiting(job))
1979 self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
1981 def testNotify(self):
1982 job = self._FakeJob(8227)
1985 self._status.append((job_id, constants.JOB_STATUS_RUNNING))
1986 (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1987 self.assertEqual(result, self.jdm.WAIT)
1988 self.assertFalse(self._status)
1989 self.assertFalse(self._queue)
1990 self.assertTrue(self.jdm.JobWaiting(job))
1991 self.assertEqual(self.jdm._waiters, {
1995 self.jdm.NotifyWaiters(job_id)
1996 self.assertFalse(self._status)
1997 self.assertFalse(self.jdm._waiters)
1998 self.assertFalse(self.jdm.JobWaiting(job))
1999 self.assertEqual(self._queue, [set([job])])
2001 def testWrongStatus(self):
2002 job = self._FakeJob(10102)
2005 self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2006 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2007 [constants.JOB_STATUS_SUCCESS])
2008 self.assertEqual(result, self.jdm.WAIT)
2009 self.assertFalse(self._status)
2010 self.assertFalse(self._queue)
2011 self.assertTrue(self.jdm.JobWaiting(job))
2012 self.assertEqual(self.jdm._waiters, {
2016 self._status.append((job_id, constants.JOB_STATUS_ERROR))
2017 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2018 [constants.JOB_STATUS_SUCCESS])
2019 self.assertEqual(result, self.jdm.WRONGSTATUS)
2020 self.assertFalse(self._status)
2021 self.assertFalse(self._queue)
2022 self.assertFalse(self.jdm.JobWaiting(job))
2024 def testCorrectStatus(self):
2025 job = self._FakeJob(24273)
2028 self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2029 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2030 [constants.JOB_STATUS_SUCCESS])
2031 self.assertEqual(result, self.jdm.WAIT)
2032 self.assertFalse(self._status)
2033 self.assertFalse(self._queue)
2034 self.assertTrue(self.jdm.JobWaiting(job))
2035 self.assertEqual(self.jdm._waiters, {
2039 self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2040 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2041 [constants.JOB_STATUS_SUCCESS])
2042 self.assertEqual(result, self.jdm.CONTINUE)
2043 self.assertFalse(self._status)
2044 self.assertFalse(self._queue)
2045 self.assertFalse(self.jdm.JobWaiting(job))
2047 def testFinalizedRightAway(self):
2048 job = self._FakeJob(224)
2051 self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2052 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2053 [constants.JOB_STATUS_SUCCESS])
2054 self.assertEqual(result, self.jdm.CONTINUE)
2055 self.assertFalse(self._status)
2056 self.assertFalse(self._queue)
2057 self.assertFalse(self.jdm.JobWaiting(job))
2058 self.assertEqual(self.jdm._waiters, {
2063 self.jdm.NotifyWaiters("0")
2064 self.assertFalse(self.jdm._waiters)
2065 self.assertFalse(self._status)
2066 self.assertFalse(self._queue)
2068 def testMultipleWaiting(self):
2069 # Use a deterministic random generator
2070 rnd = random.Random(21402)
2072 job_ids = map(str, rnd.sample(range(1, 10000), 150))
2074 waiters = dict((job_ids.pop(),
2075 set(map(self._FakeJob,
2077 for _ in range(rnd.randint(1, 20))])))
2080 # Ensure there are no duplicate job IDs
2081 assert not utils.FindDuplicates(waiters.keys() +
2083 for jobs in waiters.values()
2086 # Register all jobs as waiters
2087 for job_id, job in [(job_id, job)
2088 for (job_id, jobs) in waiters.items()
2090 self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2091 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2092 [constants.JOB_STATUS_SUCCESS])
2093 self.assertEqual(result, self.jdm.WAIT)
2094 self.assertFalse(self._status)
2095 self.assertFalse(self._queue)
2096 self.assertTrue(self.jdm.JobWaiting(job))
2098 self.assertEqual(self.jdm._waiters, waiters)
2100 def _MakeSet((name, mode, owner_names, pending)):
2101 return (name, mode, owner_names,
2102 [(pendmode, set(pend)) for (pendmode, pend) in pending])
2104 def _CheckLockInfo():
2105 info = self.jdm.GetLockInfo([query.LQ_PENDING])
2106 self.assertEqual(sorted(map(_MakeSet, info)), sorted([
2107 ("job/%s" % job_id, None, None,
2108 [("job", set([job.id for job in jobs]))])
2109 for job_id, jobs in waiters.items()
2115 # Notify in random order
2116 for job_id in rnd.sample(waiters, len(waiters)):
2117 # Remove from pending waiter list
2118 jobs = waiters.pop(job_id)
2120 self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2121 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2122 [constants.JOB_STATUS_SUCCESS])
2123 self.assertEqual(result, self.jdm.CONTINUE)
2124 self.assertFalse(self._status)
2125 self.assertFalse(self._queue)
2126 self.assertFalse(self.jdm.JobWaiting(job))
2130 self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2134 def testSelfDependency(self):
2135 job = self._FakeJob(18937)
2137 self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
2138 (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
2139 self.assertEqual(result, self.jdm.ERROR)
2141 def testJobDisappears(self):
2142 job = self._FakeJob(30540)
2146 raise errors.JobLost("#msg#")
2148 jdm = jqueue._JobDependencyManager(_FakeStatus, None)
2149 (result, _) = jdm.CheckAndRegister(job, job_id, [])
2150 self.assertEqual(result, self.jdm.ERROR)
2151 self.assertFalse(jdm.JobWaiting(job))
2152 self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
2155 if __name__ == "__main__":
2156 testutils.GanetiTestProgram()