# Copyright (C) 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Script for testing ganeti.jqueue"""
32 from ganeti import constants
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import jqueue
36 from ganeti import opcodes
37 from ganeti import compat
38 from ganeti import mcpu
44 def __init__(self, job_id, status):
49 def SetStatus(self, status):
def AddLogEntry(self, msg):
    """Append a message to the fake job's log.

    The entry is stored in C{self._log} as a C{(serial, msg)} tuple, where
    the serial is the number of entries already in the log at the time of
    the call.

    @param msg: log message to record

    """
    self._log.append((len(self._log), msg))
58 def GetInfo(self, fields):
63 result.append(self._status)
65 raise Exception("Unknown field")
69 def GetLogEntries(self, newer_than):
70 assert newer_than is None or newer_than >= 0
72 if newer_than is None:
75 return self._log[newer_than:]
78 class TestJobChangesChecker(unittest.TestCase):
80 job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
81 checker = jqueue._JobChangesChecker(["status"], None, None)
82 self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
84 job.SetStatus(constants.JOB_STATUS_RUNNING)
85 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
87 job.SetStatus(constants.JOB_STATUS_SUCCESS)
88 self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
90 # job.id is used by checker
91 self.assertEqual(job.id, 9094)
def testStatusWithPrev(self):
    # The checker is given the job's current status as previous info, so the
    # first call must report nothing; a status change must then be reported.
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"],
                                        [constants.JOB_STATUS_QUEUED], None)
    self.assertTrue(checker(job) is None)

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
def testFinalStatus(self):
    # Every status in constants.JOBS_FINALIZED is checked with the previous
    # info set to that same status.
    for status in constants.JOBS_FINALIZED:
        job = _FakeJob(2178711, status)
        checker = jqueue._JobChangesChecker(["status"], [status], None)
        # There won't be any changes in this status, hence it should signal
        # a change immediately
        self.assertEqual(checker(job), ([status], []))
111 job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
112 checker = jqueue._JobChangesChecker(["status"], None, None)
113 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
115 job.AddLogEntry("Hello World")
116 (job_info, log_entries) = checker(job)
117 self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
118 self.assertEqual(log_entries, [[0, "Hello World"]])
120 checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
121 self.assert_(checker2(job) is None)
123 job.AddLogEntry("Foo Bar")
124 job.SetStatus(constants.JOB_STATUS_ERROR)
126 (job_info, log_entries) = checker2(job)
127 self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
128 self.assertEqual(log_entries, [[1, "Foo Bar"]])
130 checker3 = jqueue._JobChangesChecker(["status"], None, None)
131 (job_info, log_entries) = checker3(job)
132 self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
133 self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
136 class TestJobChangesWaiter(unittest.TestCase):
138 self.tmpdir = tempfile.mkdtemp()
139 self.filename = utils.PathJoin(self.tmpdir, "job-1")
140 utils.WriteFile(self.filename, data="")
143 shutil.rmtree(self.tmpdir)
def _EnsureNotifierClosed(self, notifier):
    """Assert that the notifier's file descriptor has been closed.

    Calls C{os.fstat} on C{notifier._fd}. The check passes only when that
    raises an C{EnvironmentError} whose C{errno} is C{EBADF}; if C{fstat}
    succeeds, the descriptor is still open and the test is failed.

    @param notifier: object exposing the descriptor as C{_fd}

    """
    try:
        os.fstat(notifier._fd)
    except EnvironmentError as err:
        # "as" form works on Python 2.6+ and 3; the comma form is 2-only
        self.assertEqual(err.errno, errno.EBADF)
    else:
        self.fail("File descriptor wasn't closed")
154 for wait in [False, True]:
155 waiter = jqueue._JobFileChangesWaiter(self.filename)
162 # Ensure file descriptor was closed
163 self._EnsureNotifierClosed(waiter._notifier)
165 def testChangingFile(self):
166 waiter = jqueue._JobFileChangesWaiter(self.filename)
168 self.assertFalse(waiter.Wait(0.1))
169 utils.WriteFile(self.filename, data="changed")
170 self.assert_(waiter.Wait(60))
174 self._EnsureNotifierClosed(waiter._notifier)
176 def testChangingFile2(self):
177 waiter = jqueue._JobChangesWaiter(self.filename)
179 self.assertFalse(waiter._filewaiter)
180 self.assert_(waiter.Wait(0.1))
181 self.assert_(waiter._filewaiter)
183 # File waiter is now used, but there have been no changes
184 self.assertFalse(waiter.Wait(0.1))
185 utils.WriteFile(self.filename, data="changed")
186 self.assert_(waiter.Wait(60))
190 self._EnsureNotifierClosed(waiter._filewaiter._notifier)
193 class TestWaitForJobChangesHelper(unittest.TestCase):
195 self.tmpdir = tempfile.mkdtemp()
196 self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
197 utils.WriteFile(self.filename, data="")
200 shutil.rmtree(self.tmpdir)
def _LoadWaitingJob(self):
    """Return a new fake job whose status is C{JOB_STATUS_WAITLOCK}.

    Passed as the job-loading callback to the helper under test.

    """
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
205 def _LoadLostJob(self):
def testNoChanges(self):
    wfjc = jqueue._WaitForJobChangesHelper()

    # Previous info equals the job's status, so "not changed" is expected
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
                     constants.JOB_NOTCHANGED)

    # No previous information
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
                          ["status"], None, None, 1.0),
                     ([constants.JOB_STATUS_WAITLOCK], []))
def testLostJob(self):
    # With _LoadLostJob as the loading callback the helper must return None
    wfjc = jqueue._WaitForJobChangesHelper()
    self.assertTrue(wfjc(self.filename, self._LoadLostJob,
                         ["status"], None, None, 1.0) is None)
227 class TestEncodeOpError(unittest.TestCase):
229 encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
230 self.assert_(isinstance(encerr, tuple))
231 self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
233 encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
234 self.assert_(isinstance(encerr, tuple))
235 self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
237 encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
238 self.assert_(isinstance(encerr, tuple))
239 self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
241 encerr = jqueue._EncodeOpError("Hello World")
242 self.assert_(isinstance(encerr, tuple))
243 self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
246 class TestQueuedOpCode(unittest.TestCase):
247 def testDefaults(self):
249 self.assertFalse(hasattr(op.input, "dry_run"))
250 self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
251 self.assertFalse(op.log)
252 self.assert_(op.start_timestamp is None)
253 self.assert_(op.exec_timestamp is None)
254 self.assert_(op.end_timestamp is None)
255 self.assert_(op.result is None)
256 self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
258 op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
260 op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
262 self.assertEqual(op1.Serialize(), op2.Serialize())
264 def testPriority(self):
266 assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
267 "Default priority equals high priority; test can't work"
268 self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
269 self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
271 inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
272 op1 = jqueue._QueuedOpCode(inpop)
274 op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
276 self.assertEqual(op1.Serialize(), op2.Serialize())
279 class TestQueuedJob(unittest.TestCase):
281 self.assertRaises(errors.GenericError, jqueue._QueuedJob,
284 def testDefaults(self):
288 opcodes.OpTestDelay(),
292 self.assertEqual(job.id, job_id)
293 self.assertEqual(job.log_serial, 0)
294 self.assert_(job.received_timestamp)
295 self.assert_(job.start_timestamp is None)
296 self.assert_(job.end_timestamp is None)
297 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
298 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
299 self.assert_(repr(job).startswith("<"))
300 self.assertEqual(len(job.ops), len(ops))
301 self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
302 for (inp, op) in zip(ops, job.ops)))
303 self.assertRaises(errors.OpExecError, job.GetInfo,
305 self.assertEqual(job.GetInfo(["summary"]),
306 [[op.input.Summary() for op in job.ops]])
308 job1 = jqueue._QueuedJob(None, job_id, ops)
310 job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
312 self.assertEqual(job1.Serialize(), job2.Serialize())
314 def testPriority(self):
317 opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
318 opcodes.OpTestDelay(),
322 self.assertEqual(job.id, job_id)
323 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
324 self.assert_(repr(job).startswith("<"))
326 job = jqueue._QueuedJob(None, job_id, ops)
328 self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
330 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
333 job.ops[0].priority -= 1
335 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
337 # Mark opcode as finished
338 job.ops[0].status = constants.OP_STATUS_SUCCESS
340 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
343 job.ops[1].priority -= 10
344 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
346 # Test increasing first
347 job.ops[0].status = constants.OP_STATUS_RUNNING
348 job.ops[0].priority -= 19
349 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
351 def testCalcStatus(self):
353 # The default status is "queued"
354 self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
358 ops[0].status = constants.OP_STATUS_WAITLOCK
361 ops[0].status = constants.OP_STATUS_SUCCESS
362 ops[1].status = constants.OP_STATUS_SUCCESS
363 ops[2].status = constants.OP_STATUS_WAITLOCK
366 ops[0].status = constants.OP_STATUS_SUCCESS
367 ops[1].status = constants.OP_STATUS_RUNNING
369 op.status = constants.OP_STATUS_QUEUED
371 def _Canceling1(ops):
372 ops[0].status = constants.OP_STATUS_SUCCESS
373 ops[1].status = constants.OP_STATUS_SUCCESS
375 op.status = constants.OP_STATUS_CANCELING
377 def _Canceling2(ops):
379 op.status = constants.OP_STATUS_CANCELING
383 op.status = constants.OP_STATUS_CANCELED
386 for idx, op in enumerate(ops):
388 op.status = constants.OP_STATUS_ERROR
390 op.status = constants.OP_STATUS_SUCCESS
394 op.status = constants.OP_STATUS_ERROR
398 op.status = constants.OP_STATUS_SUCCESS
401 constants.JOB_STATUS_QUEUED: [_Queued],
402 constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
403 constants.JOB_STATUS_RUNNING: [_Running],
404 constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
405 constants.JOB_STATUS_CANCELED: [_Canceled],
406 constants.JOB_STATUS_ERROR: [_Error1, _Error2],
407 constants.JOB_STATUS_SUCCESS: [_Success],
411 job = jqueue._QueuedJob(None, 1,
412 [opcodes.OpTestDelay() for _ in range(10)])
413 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
414 self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
418 for status in constants.JOB_STATUS_ALL:
419 sttests = tests[status]
424 self.assertEqual(job.CalcStatus(), status)
427 class _FakeQueueForProc:
429 self._acquired = False
433 self._submit_count = itertools.count(1000)
def IsAcquired(self):
    """Return the fake queue's C{_acquired} flag."""
    return self._acquired
def GetNextUpdate(self):
    """Remove and return the oldest recorded job update.

    Entries are the C{(job, replicate)} tuples appended by
    L{UpdateJobUnlocked}.

    @raise IndexError: if no update is pending; the tests rely on this to
      assert that nothing further was recorded

    """
    return self._updates.pop(0)
def GetNextSubmittedJob(self):
    """Remove and return the oldest recorded job submission.

    Entries are the C{(job_id, job)} pairs stored by L{SubmitManyJobs}.

    @raise IndexError: if no submission is pending

    """
    return self._submitted.pop(0)
444 def acquire(self, shared=0):
446 self._acquired = True
449 assert self._acquired
450 self._acquired = False
def UpdateJobUnlocked(self, job, replicate=True):
    """Record a job update; the fake queue lock must be marked as held.

    The update is only remembered, as a C{(job, bool(replicate))} tuple in
    C{self._updates}, for later retrieval through L{GetNextUpdate}.

    @param job: job object being updated
    @param replicate: replication flag, stored coerced to C{bool}

    """
    assert self._acquired, "Lock not acquired while updating job"
    self._updates.append((job, bool(replicate)))
def SubmitManyJobs(self, jobs):
    """Record submitted jobs and assign them fake job IDs.

    Must be called while the fake queue lock is not marked as held. One ID
    is drawn from C{self._submit_count} per entry in C{jobs}, and the
    C{(job_id, job)} pairs are appended to C{self._submitted} for
    L{GetNextSubmittedJob}.

    @param jobs: sequence of jobs (each a list of opcodes in the tests)
    @return: list of the assigned job IDs, in submission order

    """
    assert not self._acquired, "Lock acquired while submitting jobs"
    # Builtin next() works on Python 2.6+ and 3; iterator.next() is 2-only
    job_ids = [next(self._submit_count) for _ in jobs]
    self._submitted.extend(zip(job_ids, jobs))
    # testSubmitManyJobs expects the assigned IDs to surface as opcode results
    return job_ids
463 class _FakeExecOpCodeForProc:
464 def __init__(self, queue, before_start, after_start):
466 self._before_start = before_start
467 self._after_start = after_start
469 def __call__(self, op, cbs, timeout=None, priority=None):
470 assert isinstance(op, opcodes.OpTestDummy)
471 assert not self._queue.IsAcquired(), \
472 "Queue lock not released when executing opcode"
474 if self._before_start:
475 self._before_start(timeout, priority)
479 if self._after_start:
480 self._after_start(op, cbs)
482 # Check again after the callbacks
483 assert not self._queue.IsAcquired()
486 raise errors.OpExecError("Error requested (%s)" % op.result)
488 if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
489 return cbs.SubmitManyJobs(op.submit_jobs)
class _JobProcessorTestUtils:
    """Helpers shared by the job processor test cases.

    The methods call C{assert*} methods on C{self}, so this class is meant
    to be combined with C{unittest.TestCase}.

    """
    def _CreateJob(self, queue, job_id, ops):
        """Create a queued job and verify its initial state.

        Checks that the new job has no start or end timestamp, that it wraps
        exactly the given opcodes in order, and that the "ops" info field
        returns the opcodes' serialized state.

        @param queue: queue object handed to C{jqueue._QueuedJob}
        @param job_id: ID for the new job
        @param ops: list of opcodes the job should contain
        @return: the newly created job; every caller assigns this result

        """
        job = jqueue._QueuedJob(queue, job_id, ops)
        self.assertFalse(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        self.assertEqual(len(ops), len(job.ops))
        self.assertTrue(compat.all(op.input == inp
                                   for (op, inp) in zip(job.ops, ops)))
        self.assertEqual(job.GetInfo(["ops"]),
                         [[op.__getstate__() for op in ops]])
        return job
506 class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
507 def _GenericCheckJob(self, job):
508 assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
511 self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
512 [[op.start_timestamp for op in job.ops],
513 [op.exec_timestamp for op in job.ops],
514 [op.end_timestamp for op in job.ops]])
515 self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
516 [job.received_timestamp,
519 self.assert_(job.start_timestamp)
520 self.assert_(job.end_timestamp)
521 self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
523 def testSuccess(self):
524 queue = _FakeQueueForProc()
526 for (job_id, opcount) in [(25351, 1), (6637, 3),
527 (24644, 10), (32207, 100)]:
528 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
529 for i in range(opcount)]
532 job = self._CreateJob(queue, job_id, ops)
534 def _BeforeStart(timeout, priority):
535 self.assertEqual(queue.GetNextUpdate(), (job, True))
536 self.assertRaises(IndexError, queue.GetNextUpdate)
537 self.assertFalse(queue.IsAcquired())
538 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
539 self.assertFalse(job.cur_opctx)
541 def _AfterStart(op, cbs):
542 self.assertEqual(queue.GetNextUpdate(), (job, True))
543 self.assertRaises(IndexError, queue.GetNextUpdate)
545 self.assertFalse(queue.IsAcquired())
546 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
547 self.assertFalse(job.cur_opctx)
549 # Job is running, cancelling shouldn't be possible
550 (success, _) = job.Cancel()
551 self.assertFalse(success)
553 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
555 for idx in range(len(ops)):
556 self.assertRaises(IndexError, queue.GetNextUpdate)
557 result = jqueue._JobProcessor(queue, opexec, job)()
558 self.assertEqual(queue.GetNextUpdate(), (job, True))
559 self.assertRaises(IndexError, queue.GetNextUpdate)
560 if idx == len(ops) - 1:
564 self.assertFalse(result)
566 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
567 self.assert_(job.start_timestamp)
568 self.assertFalse(job.end_timestamp)
570 self.assertRaises(IndexError, queue.GetNextUpdate)
572 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
573 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
574 self.assertEqual(job.GetInfo(["opresult"]),
575 [[op.input.result for op in job.ops]])
576 self.assertEqual(job.GetInfo(["opstatus"]),
577 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
578 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
581 self._GenericCheckJob(job)
583 # Calling the processor on a finished job should be a no-op
584 self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
585 self.assertRaises(IndexError, queue.GetNextUpdate)
587 def testOpcodeError(self):
588 queue = _FakeQueueForProc()
595 (23816, 100, 39, 45),
598 for (job_id, opcount, failfrom, failto) in testdata:
600 ops = [opcodes.OpTestDummy(result="Res%s" % i,
601 fail=(failfrom <= i and
603 for i in range(opcount)]
606 job = self._CreateJob(queue, job_id, ops)
608 opexec = _FakeExecOpCodeForProc(queue, None, None)
610 for idx in range(len(ops)):
611 self.assertRaises(IndexError, queue.GetNextUpdate)
612 result = jqueue._JobProcessor(queue, opexec, job)()
614 self.assertEqual(queue.GetNextUpdate(), (job, True))
615 # waitlock to running
616 self.assertEqual(queue.GetNextUpdate(), (job, True))
618 self.assertEqual(queue.GetNextUpdate(), (job, True))
619 self.assertRaises(IndexError, queue.GetNextUpdate)
621 if idx in (failfrom, len(ops) - 1):
626 self.assertFalse(result)
628 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
630 self.assertRaises(IndexError, queue.GetNextUpdate)
633 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
634 self.assertEqual(job.GetInfo(["id"]), [job_id])
635 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
637 # Check opcode status
639 job.GetInfo(["opstatus"])[0],
640 job.GetInfo(["opresult"])[0])
642 for idx, (op, opstatus, opresult) in enumerate(data):
644 assert not op.input.fail
645 self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
646 self.assertEqual(opresult, op.input.result)
649 self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
650 self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
652 assert not op.input.fail
653 self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
654 self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
656 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
657 for op in job.ops[:failfrom]))
659 self._GenericCheckJob(job)
661 # Calling the processor on a finished job should be a no-op
662 self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
663 self.assertRaises(IndexError, queue.GetNextUpdate)
665 def testCancelWhileInQueue(self):
666 queue = _FakeQueueForProc()
668 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
673 job = self._CreateJob(queue, job_id, ops)
675 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
678 (success, _) = job.Cancel()
679 self.assert_(success)
681 self.assertRaises(IndexError, queue.GetNextUpdate)
683 self.assertFalse(job.start_timestamp)
684 self.assertTrue(job.end_timestamp)
685 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
688 # Serialize to check for differences
689 before_proc = job.Serialize()
691 # Simulate processor called in workerpool
692 opexec = _FakeExecOpCodeForProc(queue, None, None)
693 self.assert_(jqueue._JobProcessor(queue, opexec, job)())
696 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
697 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
698 self.assertFalse(job.start_timestamp)
699 self.assertTrue(job.end_timestamp)
700 self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
702 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
703 [[constants.OP_STATUS_CANCELED for _ in job.ops],
704 ["Job canceled by request" for _ in job.ops]])
706 # Must not have changed or written
707 self.assertEqual(before_proc, job.Serialize())
708 self.assertRaises(IndexError, queue.GetNextUpdate)
710 def testCancelWhileWaitlockInQueue(self):
711 queue = _FakeQueueForProc()
713 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
718 job = self._CreateJob(queue, job_id, ops)
720 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
722 job.ops[0].status = constants.OP_STATUS_WAITLOCK
724 assert len(job.ops) == 5
726 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
729 (success, _) = job.Cancel()
730 self.assert_(success)
732 self.assertRaises(IndexError, queue.GetNextUpdate)
734 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
737 opexec = _FakeExecOpCodeForProc(queue, None, None)
738 self.assert_(jqueue._JobProcessor(queue, opexec, job)())
741 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
742 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
743 self.assertFalse(job.start_timestamp)
744 self.assert_(job.end_timestamp)
745 self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
747 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
748 [[constants.OP_STATUS_CANCELED for _ in job.ops],
749 ["Job canceled by request" for _ in job.ops]])
751 def testCancelWhileWaitlock(self):
752 queue = _FakeQueueForProc()
754 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
759 job = self._CreateJob(queue, job_id, ops)
761 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
763 def _BeforeStart(timeout, priority):
764 self.assertEqual(queue.GetNextUpdate(), (job, True))
765 self.assertRaises(IndexError, queue.GetNextUpdate)
766 self.assertFalse(queue.IsAcquired())
767 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
770 (success, _) = job.Cancel()
771 self.assert_(success)
773 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
775 self.assertRaises(IndexError, queue.GetNextUpdate)
777 def _AfterStart(op, cbs):
778 self.assertEqual(queue.GetNextUpdate(), (job, True))
779 self.assertRaises(IndexError, queue.GetNextUpdate)
780 self.assertFalse(queue.IsAcquired())
781 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
783 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
785 self.assertRaises(IndexError, queue.GetNextUpdate)
786 self.assert_(jqueue._JobProcessor(queue, opexec, job)())
787 self.assertEqual(queue.GetNextUpdate(), (job, True))
788 self.assertRaises(IndexError, queue.GetNextUpdate)
791 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
792 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
793 self.assert_(job.start_timestamp)
794 self.assert_(job.end_timestamp)
795 self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
797 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
798 [[constants.OP_STATUS_CANCELED for _ in job.ops],
799 ["Job canceled by request" for _ in job.ops]])
801 def testCancelWhileWaitlockWithTimeout(self):
802 queue = _FakeQueueForProc()
804 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
809 job = self._CreateJob(queue, job_id, ops)
811 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
813 def _BeforeStart(timeout, priority):
814 self.assertFalse(queue.IsAcquired())
815 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
818 (success, _) = job.Cancel()
819 self.assert_(success)
821 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
824 # Fake an acquire attempt timing out
825 raise mcpu.LockAcquireTimeout()
827 def _AfterStart(op, cbs):
828 self.fail("Should not reach this")
830 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
832 self.assert_(jqueue._JobProcessor(queue, opexec, job)())
835 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
836 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
837 self.assert_(job.start_timestamp)
838 self.assert_(job.end_timestamp)
839 self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
841 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
842 [[constants.OP_STATUS_CANCELED for _ in job.ops],
843 ["Job canceled by request" for _ in job.ops]])
845 def testCancelWhileRunning(self):
846 # Tests canceling a job with finished opcodes and more, unprocessed ones
847 queue = _FakeQueueForProc()
849 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
854 job = self._CreateJob(queue, job_id, ops)
856 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
858 opexec = _FakeExecOpCodeForProc(queue, None, None)
861 self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
863 # Job goes back to queued
864 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
865 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
866 [[constants.OP_STATUS_SUCCESS,
867 constants.OP_STATUS_QUEUED,
868 constants.OP_STATUS_QUEUED],
869 ["Res0", None, None]])
872 (success, _) = job.Cancel()
873 self.assert_(success)
875 # Try processing another opcode (this will actually cancel the job)
876 self.assert_(jqueue._JobProcessor(queue, opexec, job)())
879 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
880 self.assertEqual(job.GetInfo(["id"]), [job_id])
881 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
882 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
883 [[constants.OP_STATUS_SUCCESS,
884 constants.OP_STATUS_CANCELED,
885 constants.OP_STATUS_CANCELED],
886 ["Res0", "Job canceled by request",
887 "Job canceled by request"]])
889 def testPartiallyRun(self):
890 # Tests calling the processor on a job that's been partially run before the
891 # program was restarted
892 queue = _FakeQueueForProc()
894 opexec = _FakeExecOpCodeForProc(queue, None, None)
896 for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
897 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
901 job = self._CreateJob(queue, job_id, ops)
903 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
905 for _ in range(successcount):
906 self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
908 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
909 self.assertEqual(job.GetInfo(["opstatus"]),
910 [[constants.OP_STATUS_SUCCESS
911 for _ in range(successcount)] +
912 [constants.OP_STATUS_QUEUED
913 for _ in range(len(ops) - successcount)]])
915 self.assert_(job.ops_iter)
917 # Serialize and restore (simulates program restart)
918 newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
919 self.assertFalse(newjob.ops_iter)
920 self._TestPartial(newjob, successcount)
922 def _TestPartial(self, job, successcount):
923 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
924 self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
926 queue = _FakeQueueForProc()
927 opexec = _FakeExecOpCodeForProc(queue, None, None)
929 for remaining in reversed(range(len(job.ops) - successcount)):
930 result = jqueue._JobProcessor(queue, opexec, job)()
931 self.assertEqual(queue.GetNextUpdate(), (job, True))
932 self.assertEqual(queue.GetNextUpdate(), (job, True))
933 self.assertEqual(queue.GetNextUpdate(), (job, True))
934 self.assertRaises(IndexError, queue.GetNextUpdate)
941 self.assertFalse(result)
943 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
945 self.assertRaises(IndexError, queue.GetNextUpdate)
946 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
947 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
948 self.assertEqual(job.GetInfo(["opresult"]),
949 [[op.input.result for op in job.ops]])
950 self.assertEqual(job.GetInfo(["opstatus"]),
951 [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
952 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
955 self._GenericCheckJob(job)
957 # Calling the processor on a finished job should be a no-op
958 self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
959 self.assertRaises(IndexError, queue.GetNextUpdate)
961 # ... also after being restored
962 job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
963 self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
964 self.assertRaises(IndexError, queue.GetNextUpdate)
def testProcessorOnRunningJob(self):
    # Invoking the processor on a job whose opcode is already marked as
    # running must raise ProgrammerError.
    ops = [opcodes.OpTestDummy(result="result", fail=False)]

    queue = _FakeQueueForProc()
    opexec = _FakeExecOpCodeForProc(queue, None, None)

    job = self._CreateJob(queue, 9571, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Mark the only opcode as running without going through the processor
    job.ops[0].status = constants.OP_STATUS_RUNNING

    assert len(job.ops) == 1

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Calling on running job must fail
    self.assertRaises(errors.ProgrammerError,
                      jqueue._JobProcessor(queue, opexec, job))
987 def testLogMessages(self):
988 # Tests the "Feedback" callback function
989 queue = _FakeQueueForProc()
995 (constants.ELOG_MESSAGE, "there"),
998 (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
999 (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1002 ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1003 messages=messages.get(i, []))
1007 job = self._CreateJob(queue, 29386, ops)
1009 def _BeforeStart(timeout, priority):
1010 self.assertEqual(queue.GetNextUpdate(), (job, True))
1011 self.assertRaises(IndexError, queue.GetNextUpdate)
1012 self.assertFalse(queue.IsAcquired())
1013 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1015 def _AfterStart(op, cbs):
1016 self.assertEqual(queue.GetNextUpdate(), (job, True))
1017 self.assertRaises(IndexError, queue.GetNextUpdate)
1018 self.assertFalse(queue.IsAcquired())
1019 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1021 self.assertRaises(AssertionError, cbs.Feedback,
1022 "too", "many", "arguments")
1024 for (log_type, msg) in op.messages:
1025 self.assertRaises(IndexError, queue.GetNextUpdate)
1027 cbs.Feedback(log_type, msg)
1030 # Check for job update without replication
1031 self.assertEqual(queue.GetNextUpdate(), (job, False))
1032 self.assertRaises(IndexError, queue.GetNextUpdate)
1034 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1036 for remaining in reversed(range(len(job.ops))):
1037 self.assertRaises(IndexError, queue.GetNextUpdate)
1038 result = jqueue._JobProcessor(queue, opexec, job)()
1039 self.assertEqual(queue.GetNextUpdate(), (job, True))
1040 self.assertRaises(IndexError, queue.GetNextUpdate)
1044 self.assert_(result)
1047 self.assertFalse(result)
1049 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1051 self.assertRaises(IndexError, queue.GetNextUpdate)
1053 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1054 self.assertEqual(job.GetInfo(["opresult"]),
1055 [[op.input.result for op in job.ops]])
1057 logmsgcount = sum(len(m) for m in messages.values())
1059 self._CheckLogMessages(job, logmsgcount)
1061 # Serialize and restore (simulates program restart)
1062 newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
1063 self._CheckLogMessages(newjob, logmsgcount)
1065 # Check each message
1067 for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1068 for (serial, timestamp, log_type, msg) in oplog:
1069 (exptype, expmsg) = messages.get(idx).pop(0)
1071 self.assertEqual(log_type, exptype)
1073 self.assertEqual(log_type, constants.ELOG_MESSAGE)
1074 self.assertEqual(expmsg, msg)
1075 self.assert_(serial > prevserial)
1078 def _CheckLogMessages(self, job, count):
1080 self.assertEqual(job.log_serial, count)
1083 self.assertEqual(job.GetLogEntries(None),
1084 [entry for entries in job.GetInfo(["oplog"])[0] if entries
1085 for entry in entries])
1087 # Filter with serial
1089 self.assert_(job.GetLogEntries(3))
1090 self.assertEqual(job.GetLogEntries(3),
1091 [entry for entries in job.GetInfo(["oplog"])[0] if entries
1092 for entry in entries][3:])
1094 # No log message after highest serial
1095 self.assertFalse(job.GetLogEntries(count))
1096 self.assertFalse(job.GetLogEntries(count + 3))
1098 def testSubmitManyJobs(self):
# Runs a job whose opcodes carry job definitions to submit (read back below
# through op.submit_jobs), then checks that the fake queue received them with
# IDs counting up from 1000 and that the opcode results list those same IDs.
1099 queue = _FakeQueueForProc()
1103 opcodes.OpTestDummy(result="Res0", fail=False,
1105 opcodes.OpTestDummy(result="Res1", fail=False,
1107 [opcodes.OpTestDummy(result="r1j0", fail=False)],
1109 opcodes.OpTestDummy(result="Res2", fail=False,
1111 [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1112 opcodes.OpTestDummy(result="r2j0o1", fail=False),
1113 opcodes.OpTestDummy(result="r2j0o2", fail=False),
1114 opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1115 [opcodes.OpTestDummy(result="r2j1", fail=False)],
1116 [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1117 opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1122 job = self._CreateJob(queue, job_id, ops)
# Hook handed to the fake opcode executor below; presumably called before an
# opcode starts executing -- confirm in _FakeExecOpCodeForProc. It expects
# exactly one pending job write and the job to be waiting for locks.
1124 def _BeforeStart(timeout, priority):
1125 self.assertEqual(queue.GetNextUpdate(), (job, True))
1126 self.assertRaises(IndexError, queue.GetNextUpdate)
1127 self.assertFalse(queue.IsAcquired())
1128 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1129 self.assertFalse(job.cur_opctx)
# Second executor hook; presumably called once the opcode has been marked as
# running -- confirm in _FakeExecOpCodeForProc. It expects exactly one pending
# job write and the job to report the running status.
1131 def _AfterStart(op, cbs):
1132 self.assertEqual(queue.GetNextUpdate(), (job, True))
1133 self.assertRaises(IndexError, queue.GetNextUpdate)
1135 self.assertFalse(queue.IsAcquired())
1136 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1137 self.assertFalse(job.cur_opctx)
1139 # Job is running, cancelling shouldn't be possible
1140 (success, _) = job.Cancel()
1141 self.assertFalse(success)
1143 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
# One processor call per opcode. No job write may be pending before a call,
# and exactly one must be pending after it.
1145 for idx in range(len(ops)):
1146 self.assertRaises(IndexError, queue.GetNextUpdate)
1147 result = jqueue._JobProcessor(queue, opexec, job)()
1148 self.assertEqual(queue.GetNextUpdate(), (job, True))
1149 self.assertRaises(IndexError, queue.GetNextUpdate)
# The processor's return value is checked differently for the call that
# handles the last opcode than for the earlier calls.
1150 if idx == len(ops) - 1:
1152 self.assert_(result)
1154 self.assertFalse(result)
1156 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1157 self.assert_(job.start_timestamp)
1158 self.assertFalse(job.end_timestamp)
1160 self.assertRaises(IndexError, queue.GetNextUpdate)
# Every job definition taken from the opcodes' submit_jobs must have reached
# the fake queue, in order, numbered from 1000; nothing else may be queued.
1162 for idx, submitted_ops in enumerate(job_ops
1164 for job_ops in op.submit_jobs):
1165 self.assertEqual(queue.GetNextSubmittedJob(),
1166 (1000 + idx, submitted_ops))
1167 self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1169 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1170 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
# Per-opcode results: an empty list, then [1000], then [1001, 1002, 1003] --
# the same IDs that were checked against the fake queue above.
1171 self.assertEqual(job.GetInfo(["opresult"]),
1172 [[[], [1000], [1001, 1002, 1003]]])
1173 self.assertEqual(job.GetInfo(["opstatus"]),
1174 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1176 self._GenericCheckJob(job)
1178 # Calling the processor on a finished job should be a no-op
1179 self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1180 self.assertRaises(IndexError, queue.GetNextUpdate)
1183 class _FakeTimeoutStrategy:
# Test double for a lock-timeout strategy: it is seeded with a list of
# timeouts, consumes them from the front and keeps the most recent one in
# last_timeout, which the timeout tests compare against the value handed to
# their _BeforeStart hook.
1184 def __init__(self, timeouts):
# timeouts: list that NextAttempt consumes in place (pop(0) mutates the list
# object passed in by the caller).
1185 self.timeouts = timeouts
# Most recent timeout taken from the list; None until the first attempt.
1187 self.last_timeout = None
1189 def NextAttempt(self):
# Takes the next timeout from the front of the list and remembers it.
# NOTE(review): what happens once the list is empty, and what is returned, is
# not evident here -- confirm before relying on either.
1192 timeout = self.timeouts.pop(0)
1195 self.last_timeout = timeout
1199 class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
# Drives jqueue._JobProcessor through simulated lock-acquire timeouts. The
# _BeforeStart hook raises mcpu.LockAcquireTimeout while self.retries is
# positive, and the test checks the timeout, the opcode priority and the
# queue writes observed around every processor call.
1201 self.queue = _FakeQueueForProc()
# Iterator supplying opcode indices to _NextOpcode
1204 self.opcounter = None
# Most recent _FakeTimeoutStrategy built by _NewTimeoutStrategy
1205 self.timeout_strategy = None
# Opcode index for which the most recent timeout strategy was built
1207 self.prev_tsop = None
# Last recorded priority and status of the current opcode
1208 self.prev_prio = None
1209 self.prev_status = None
# Priority handed to _BeforeStart on the latest lock attempt
1210 self.lock_acq_prio = None
# True after a _BeforeStart call that did not raise a timeout, False after
# one that did
1211 self.gave_lock = None
# Set by _BeforeStart in its special case for opcode 3 at priority
# OP_PRIO_HIGHEST + 3
1212 self.done_lock_before_blocking = False
1214 def _BeforeStart(self, timeout, priority):
# Hook given to the fake opcode executor (see testTimeout). It validates the
# timeout and priority it receives and decides whether this lock attempt
# succeeds or is reported as timed out.
1217 # If status has changed, job must've been written
1218 if self.prev_status != self.job.ops[self.curop].status:
1219 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1220 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1222 self.assertFalse(self.queue.IsAcquired())
1223 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1225 ts = self.timeout_strategy
# The timeout passed in must be the one the fake strategy produced last, and
# the priority must be the opcode's current priority
1227 self.assert_(timeout is None or isinstance(timeout, (int, float)))
1228 self.assertEqual(timeout, ts.last_timeout)
1229 self.assertEqual(priority, job.ops[self.curop].priority)
# Record a successful acquire up front; gave_lock is reset further down when
# a timeout is raised instead
1231 self.gave_lock = True
1232 self.lock_acq_prio = priority
1234 if (self.curop == 3 and
1235 job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
1236 # Give locks before running into blocking acquire
1237 assert self.retries == 7
1239 self.done_lock_before_blocking = True
# Retries remaining: a bounded timeout is required and the attempt is
# reported as timed out
1242 if self.retries > 0:
1243 self.assert_(timeout is not None)
1245 self.gave_lock = False
1246 raise mcpu.LockAcquireTimeout()
# At the highest priority all retries and all strategy timeouts must be used
# up, and no timeout may be passed in
1248 if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
1249 assert self.retries == 0, "Didn't exhaust all retries at highest priority"
1250 assert not ts.timeouts
1251 self.assert_(timeout is None)
1253 def _AfterStart(self, op, cbs):
# Second executor hook; checks the job state once the opcode is running.
1256 # Setting to "running" requires an update
1257 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1258 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1260 self.assertFalse(self.queue.IsAcquired())
1261 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1263 # Job is running, cancelling shouldn't be possible
1264 (success, _) = job.Cancel()
1265 self.assertFalse(success)
1267 def _NextOpcode(self):
# Passed to the processor as _nextop_fn in testTimeout. Takes the next
# opcode index from the counter and snapshots that opcode's priority and
# status for later comparisons.
1268 self.curop = self.opcounter.next()
1269 self.prev_prio = self.job.ops[self.curop].priority
1270 self.prev_status = self.job.ops[self.curop].status
1272 def _NewTimeoutStrategy(self):
# Passed to _JobProcessor as _timeout_strategy_factory in testTimeout. Builds
# a _FakeTimeoutStrategy whose timeouts depend on the current opcode and
# resets the retry budget consumed by _BeforeStart.
# The previous retry budget must be fully used before a new strategy is
# requested
1275 self.assertEqual(self.retries, 0)
1277 if self.prev_tsop == self.curop:
1278 # Still on the same opcode, priority must've been increased
# (i.e. the numeric priority value is one lower than the recorded one)
1279 self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
1283 timeouts = range(10, 31, 10)
1284 self.retries = len(timeouts) - 1
1286 elif self.curop == 2:
1287 # Let this run into a blocking acquire
1288 timeouts = range(11, 61, 12)
1289 self.retries = len(timeouts)
1291 elif self.curop == 3:
1292 # Wait for priority to increase, but give lock before blocking acquire
# range(12, 100, 14) has 7 elements, matching the "retries == 7" assertion in
# _BeforeStart
1293 timeouts = range(12, 100, 14)
1294 self.retries = len(timeouts)
1296 self.assertFalse(self.done_lock_before_blocking)
1298 elif self.curop == 4:
1299 self.assert_(self.done_lock_before_blocking)
1301 # Timeouts, but no need to retry
1302 timeouts = range(10, 31, 10)
1305 elif self.curop == 5:
1307 timeouts = range(19, 100, 11)
1308 self.retries = len(timeouts)
# Sanity checks on the fixture: the job has ten opcodes and the retry budget
# never exceeds the number of timeouts available
1314 assert len(job.ops) == 10
1315 assert self.retries <= len(timeouts)
1317 ts = _FakeTimeoutStrategy(timeouts)
# Remember the strategy and the opcode/priority it was built for
1319 self.timeout_strategy = ts
1320 self.prev_tsop = self.curop
1321 self.prev_prio = job.ops[self.curop].priority
1325 def testTimeout(self):
# Runs the job through the processor repeatedly until it is finished,
# checking after every call whether locks were obtained, whether the job was
# written to the queue, and the job/opcode status and timestamps.
1326 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1331 job = self._CreateJob(self.queue, job_id, ops)
# Opcode indices handed out by _NextOpcode start at 0
1334 self.opcounter = itertools.count(0)
1336 opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
1338 tsf = self._NewTimeoutStrategy
1340 self.assertFalse(self.done_lock_before_blocking)
1343 proc = jqueue._JobProcessor(self.queue, opexec, job,
1344 _timeout_strategy_factory=tsf)
1346 self.assertRaises(IndexError, self.queue.GetNextUpdate)
# Refresh the status snapshot that _BeforeStart compares against
1348 if self.curop is not None:
1349 self.prev_status = self.job.ops[self.curop].status
1351 self.lock_acq_prio = None
1353 result = proc(_nextop_fn=self._NextOpcode)
1354 assert self.curop is not None
1356 if result or self.gave_lock:
1357 # Got lock and/or job is done, result must've been written
1358 self.assertFalse(job.cur_opctx)
1359 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1360 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1361 self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
1362 self.assert_(job.ops[self.curop].exec_timestamp)
1365 self.assertFalse(job.cur_opctx)
1368 self.assertFalse(result)
1371 self.assertEqual(job.ops[self.curop].start_timestamp,
1372 job.start_timestamp)
1375 # Opcode finished, but job not yet done
1376 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
# Opcode context is kept, and it must wrap the NextAttempt of the strategy
# built by _NewTimeoutStrategy; the opcode has not started executing
1379 self.assert_(job.cur_opctx)
1380 self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1381 self.timeout_strategy.NextAttempt)
1382 self.assertFalse(job.ops[self.curop].exec_timestamp)
1383 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1385 # If priority has changed since acquiring locks, the job must've been
1387 if self.lock_acq_prio != job.ops[self.curop].priority:
1388 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1390 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1392 self.assert_(job.start_timestamp)
1393 self.assertFalse(job.end_timestamp)
# Final state: the last opcode was reached and the counter handed out exactly
# one index per opcode (the next value equals len(job.ops))
1395 self.assertEqual(self.curop, len(job.ops) - 1)
1396 self.assertEqual(self.job, job)
1397 self.assertEqual(self.opcounter.next(), len(job.ops))
1398 self.assert_(self.done_lock_before_blocking)
1400 self.assertRaises(IndexError, self.queue.GetNextUpdate)
1401 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1402 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1403 self.assertEqual(job.GetInfo(["opresult"]),
1404 [[op.input.result for op in job.ops]])
1405 self.assertEqual(job.GetInfo(["opstatus"]),
1406 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1407 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1410 # Calling the processor on a finished job should be a no-op
1411 self.assertTrue(jqueue._JobProcessor(self.queue, opexec, job)())
1412 self.assertRaises(IndexError, self.queue.GetNextUpdate)
# Script entry point: when the file is executed directly, hand control to
# testutils.GanetiTestProgram().
1415 if __name__ == "__main__":
1416 testutils.GanetiTestProgram()