4 # Copyright (C) 2010, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.jqueue"""
34 from ganeti import constants
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import jqueue
38 from ganeti import opcodes
39 from ganeti import compat
40 from ganeti import mcpu
41 from ganeti import query
42 from ganeti import workerpool
48 def __init__(self, job_id, status):
54 def SetStatus(self, status):
57 def AddLogEntry(self, msg):
58 self._log.append((len(self._log), msg))
63 def GetInfo(self, fields):
68 result.append(self._status)
70 raise Exception("Unknown field")
74 def GetLogEntries(self, newer_than):
75 assert newer_than is None or newer_than >= 0
77 if newer_than is None:
80 return self._log[newer_than:]
83 class TestJobChangesChecker(unittest.TestCase):
85 job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
86 checker = jqueue._JobChangesChecker(["status"], None, None)
87 self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
89 job.SetStatus(constants.JOB_STATUS_RUNNING)
90 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
92 job.SetStatus(constants.JOB_STATUS_SUCCESS)
93 self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
95 # job.id is used by checker
96 self.assertEqual(job.id, 9094)
98 def testStatusWithPrev(self):
99 job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
100 checker = jqueue._JobChangesChecker(["status"],
101 [constants.JOB_STATUS_QUEUED], None)
102 self.assert_(checker(job) is None)
104 job.SetStatus(constants.JOB_STATUS_RUNNING)
105 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
107 def testFinalStatus(self):
108 for status in constants.JOBS_FINALIZED:
109 job = _FakeJob(2178711, status)
110 checker = jqueue._JobChangesChecker(["status"], [status], None)
111 # There won't be any changes in this status, hence it should signal
112 # a change immediately
113 self.assertEqual(checker(job), ([status], []))
116 job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
117 checker = jqueue._JobChangesChecker(["status"], None, None)
118 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
120 job.AddLogEntry("Hello World")
121 (job_info, log_entries) = checker(job)
122 self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
123 self.assertEqual(log_entries, [[0, "Hello World"]])
125 checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
126 self.assert_(checker2(job) is None)
128 job.AddLogEntry("Foo Bar")
129 job.SetStatus(constants.JOB_STATUS_ERROR)
131 (job_info, log_entries) = checker2(job)
132 self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
133 self.assertEqual(log_entries, [[1, "Foo Bar"]])
135 checker3 = jqueue._JobChangesChecker(["status"], None, None)
136 (job_info, log_entries) = checker3(job)
137 self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
138 self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
141 class TestJobChangesWaiter(unittest.TestCase):
143 self.tmpdir = tempfile.mkdtemp()
144 self.filename = utils.PathJoin(self.tmpdir, "job-1")
145 utils.WriteFile(self.filename, data="")
148 shutil.rmtree(self.tmpdir)
150 def _EnsureNotifierClosed(self, notifier):
152 os.fstat(notifier._fd)
153 except EnvironmentError, err:
154 self.assertEqual(err.errno, errno.EBADF)
156 self.fail("File descriptor wasn't closed")
159 for wait in [False, True]:
160 waiter = jqueue._JobFileChangesWaiter(self.filename)
167 # Ensure file descriptor was closed
168 self._EnsureNotifierClosed(waiter._notifier)
170 def testChangingFile(self):
171 waiter = jqueue._JobFileChangesWaiter(self.filename)
173 self.assertFalse(waiter.Wait(0.1))
174 utils.WriteFile(self.filename, data="changed")
175 self.assert_(waiter.Wait(60))
179 self._EnsureNotifierClosed(waiter._notifier)
181 def testChangingFile2(self):
182 waiter = jqueue._JobChangesWaiter(self.filename)
184 self.assertFalse(waiter._filewaiter)
185 self.assert_(waiter.Wait(0.1))
186 self.assert_(waiter._filewaiter)
188 # File waiter is now used, but there have been no changes
189 self.assertFalse(waiter.Wait(0.1))
190 utils.WriteFile(self.filename, data="changed")
191 self.assert_(waiter.Wait(60))
195 self._EnsureNotifierClosed(waiter._filewaiter._notifier)
198 class TestWaitForJobChangesHelper(unittest.TestCase):
200 self.tmpdir = tempfile.mkdtemp()
201 self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
202 utils.WriteFile(self.filename, data="")
205 shutil.rmtree(self.tmpdir)
207 def _LoadWaitingJob(self):
208 return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)
210 def _LoadLostJob(self):
213 def testNoChanges(self):
214 wfjc = jqueue._WaitForJobChangesHelper()
217 self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
218 [constants.JOB_STATUS_WAITING], None, 0.1),
219 constants.JOB_NOTCHANGED)
221 # No previous information
222 self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
223 ["status"], None, None, 1.0),
224 ([constants.JOB_STATUS_WAITING], []))
226 def testLostJob(self):
227 wfjc = jqueue._WaitForJobChangesHelper()
228 self.assert_(wfjc(self.filename, self._LoadLostJob,
229 ["status"], None, None, 1.0) is None)
232 class TestEncodeOpError(unittest.TestCase):
234 encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
235 self.assert_(isinstance(encerr, tuple))
236 self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
238 encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
239 self.assert_(isinstance(encerr, tuple))
240 self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
242 encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
243 self.assert_(isinstance(encerr, tuple))
244 self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
246 encerr = jqueue._EncodeOpError("Hello World")
247 self.assert_(isinstance(encerr, tuple))
248 self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
251 class TestQueuedOpCode(unittest.TestCase):
252 def testDefaults(self):
254 self.assertFalse(hasattr(op.input, "dry_run"))
255 self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
256 self.assertFalse(op.log)
257 self.assert_(op.start_timestamp is None)
258 self.assert_(op.exec_timestamp is None)
259 self.assert_(op.end_timestamp is None)
260 self.assert_(op.result is None)
261 self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
263 op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
265 op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
267 self.assertEqual(op1.Serialize(), op2.Serialize())
269 def testPriority(self):
271 assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
272 "Default priority equals high priority; test can't work"
273 self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
274 self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
276 inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
277 op1 = jqueue._QueuedOpCode(inpop)
279 op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
281 self.assertEqual(op1.Serialize(), op2.Serialize())
284 class TestQueuedJob(unittest.TestCase):
285 def testNoOpCodes(self):
286 self.assertRaises(errors.GenericError, jqueue._QueuedJob,
289 def testDefaults(self):
293 opcodes.OpTestDelay(),
297 self.assertTrue(job.writable)
298 self.assertEqual(job.id, job_id)
299 self.assertEqual(job.log_serial, 0)
300 self.assert_(job.received_timestamp)
301 self.assert_(job.start_timestamp is None)
302 self.assert_(job.end_timestamp is None)
303 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
304 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
305 self.assert_(repr(job).startswith("<"))
306 self.assertEqual(len(job.ops), len(ops))
307 self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
308 for (inp, op) in zip(ops, job.ops)))
309 self.assertRaises(errors.OpPrereqError, job.GetInfo,
311 self.assertEqual(job.GetInfo(["summary"]),
312 [[op.input.Summary() for op in job.ops]])
313 self.assertFalse(job.archived)
315 job1 = jqueue._QueuedJob(None, job_id, ops, True)
317 job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True, False)
319 self.assertEqual(job1.Serialize(), job2.Serialize())
321 def testWritable(self):
322 job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
323 self.assertFalse(job.writable)
325 job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
326 self.assertTrue(job.writable)
328 def testArchived(self):
329 job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
330 self.assertFalse(job.archived)
332 newjob = jqueue._QueuedJob.Restore(None, job.Serialize(), True, True)
333 self.assertTrue(newjob.archived)
335 newjob2 = jqueue._QueuedJob.Restore(None, newjob.Serialize(), True, False)
336 self.assertFalse(newjob2.archived)
338 def testPriority(self):
341 opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
342 opcodes.OpTestDelay(),
346 self.assertEqual(job.id, job_id)
347 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
348 self.assert_(repr(job).startswith("<"))
350 job = jqueue._QueuedJob(None, job_id, ops, True)
352 self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
354 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
357 job.ops[0].priority -= 1
359 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
361 # Mark opcode as finished
362 job.ops[0].status = constants.OP_STATUS_SUCCESS
364 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
367 job.ops[1].priority -= 10
368 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
370 # Test increasing first
371 job.ops[0].status = constants.OP_STATUS_RUNNING
372 job.ops[0].priority -= 19
373 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
375 def _JobForPriority(self, job_id):
378 opcodes.OpTestDelay(),
380 opcodes.OpTestDelay(),
383 job = jqueue._QueuedJob(None, job_id, ops, True)
385 self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
387 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
388 self.assertFalse(compat.any(hasattr(op.input, "priority")
393 def testChangePriorityAllQueued(self):
394 job = self._JobForPriority(24984)
395 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
396 self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
398 result = job.ChangePriority(-10)
399 self.assertEqual(job.CalcPriority(), -10)
400 self.assertTrue(compat.all(op.priority == -10 for op in job.ops))
401 self.assertTrue(compat.all(op.input.priority == -10 for op in job.ops))
402 self.assertEqual(result,
403 (True, ("Priorities of pending opcodes for job 24984 have"
404 " been changed to -10")))
406 def testChangePriorityAllFinished(self):
407 job = self._JobForPriority(16405)
409 for (idx, op) in enumerate(job.ops):
411 op.status = constants.OP_STATUS_ERROR
413 op.status = constants.OP_STATUS_SUCCESS
415 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
416 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
417 result = job.ChangePriority(-10)
418 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
419 self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
421 self.assertFalse(compat.any(hasattr(op.input, "priority")
423 self.assertEqual(map(operator.attrgetter("status"), job.ops), [
424 constants.OP_STATUS_SUCCESS,
425 constants.OP_STATUS_SUCCESS,
426 constants.OP_STATUS_SUCCESS,
427 constants.OP_STATUS_ERROR,
429 self.assertEqual(result, (False, "Job 16405 is finished"))
431 def testChangePriorityCancelling(self):
432 job = self._JobForPriority(31572)
434 for (idx, op) in enumerate(job.ops):
436 op.status = constants.OP_STATUS_CANCELING
438 op.status = constants.OP_STATUS_SUCCESS
440 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELING)
441 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
442 result = job.ChangePriority(5)
443 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
444 self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
446 self.assertFalse(compat.any(hasattr(op.input, "priority")
448 self.assertEqual(map(operator.attrgetter("status"), job.ops), [
449 constants.OP_STATUS_SUCCESS,
450 constants.OP_STATUS_SUCCESS,
451 constants.OP_STATUS_CANCELING,
452 constants.OP_STATUS_CANCELING,
454 self.assertEqual(result, (False, "Job 31572 is cancelling"))
456 def testChangePriorityFirstRunning(self):
457 job = self._JobForPriority(1716215889)
459 for (idx, op) in enumerate(job.ops):
461 op.status = constants.OP_STATUS_RUNNING
463 op.status = constants.OP_STATUS_QUEUED
465 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
466 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
467 result = job.ChangePriority(7)
468 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
469 self.assertEqual(map(operator.attrgetter("priority"), job.ops),
470 [constants.OP_PRIO_DEFAULT, 7, 7, 7])
471 self.assertEqual([getattr(op.input, "priority", None) for op in job.ops],
473 self.assertEqual(map(operator.attrgetter("status"), job.ops), [
474 constants.OP_STATUS_RUNNING,
475 constants.OP_STATUS_QUEUED,
476 constants.OP_STATUS_QUEUED,
477 constants.OP_STATUS_QUEUED,
479 self.assertEqual(result,
480 (True, ("Priorities of pending opcodes for job"
481 " 1716215889 have been changed to 7")))
483 def testChangePriorityLastRunning(self):
484 job = self._JobForPriority(1308)
486 for (idx, op) in enumerate(job.ops):
487 if idx == (len(job.ops) - 1):
488 op.status = constants.OP_STATUS_RUNNING
490 op.status = constants.OP_STATUS_SUCCESS
492 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
493 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
494 result = job.ChangePriority(-3)
495 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
496 self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
498 self.assertFalse(compat.any(hasattr(op.input, "priority")
500 self.assertEqual(map(operator.attrgetter("status"), job.ops), [
501 constants.OP_STATUS_SUCCESS,
502 constants.OP_STATUS_SUCCESS,
503 constants.OP_STATUS_SUCCESS,
504 constants.OP_STATUS_RUNNING,
506 self.assertEqual(result, (False, "Job 1308 had no pending opcodes"))
508 def testChangePrioritySecondOpcodeRunning(self):
509 job = self._JobForPriority(27701)
511 self.assertEqual(len(job.ops), 4)
512 job.ops[0].status = constants.OP_STATUS_SUCCESS
513 job.ops[1].status = constants.OP_STATUS_RUNNING
514 job.ops[2].status = constants.OP_STATUS_QUEUED
515 job.ops[3].status = constants.OP_STATUS_QUEUED
517 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
518 result = job.ChangePriority(-19)
519 self.assertEqual(job.CalcPriority(), -19)
520 self.assertEqual(map(operator.attrgetter("priority"), job.ops),
521 [constants.OP_PRIO_DEFAULT, constants.OP_PRIO_DEFAULT,
523 self.assertEqual([getattr(op.input, "priority", None) for op in job.ops],
524 [None, None, -19, -19])
525 self.assertEqual(map(operator.attrgetter("status"), job.ops), [
526 constants.OP_STATUS_SUCCESS,
527 constants.OP_STATUS_RUNNING,
528 constants.OP_STATUS_QUEUED,
529 constants.OP_STATUS_QUEUED,
531 self.assertEqual(result,
532 (True, ("Priorities of pending opcodes for job"
533 " 27701 have been changed to -19")))
535 def testChangePriorityWithInconsistentJob(self):
536 job = self._JobForPriority(30097)
538 self.assertEqual(len(job.ops), 4)
540 # This job is invalid (as it has two opcodes marked as running) and make
541 # the call fail because an unprocessed opcode precedes a running one (which
542 # should never happen in reality)
543 job.ops[0].status = constants.OP_STATUS_SUCCESS
544 job.ops[1].status = constants.OP_STATUS_RUNNING
545 job.ops[2].status = constants.OP_STATUS_QUEUED
546 job.ops[3].status = constants.OP_STATUS_RUNNING
548 self.assertRaises(AssertionError, job.ChangePriority, 19)
550 def testCalcStatus(self):
552 # The default status is "queued"
553 self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
557 ops[0].status = constants.OP_STATUS_WAITING
560 ops[0].status = constants.OP_STATUS_SUCCESS
561 ops[1].status = constants.OP_STATUS_SUCCESS
562 ops[2].status = constants.OP_STATUS_WAITING
565 ops[0].status = constants.OP_STATUS_SUCCESS
566 ops[1].status = constants.OP_STATUS_RUNNING
568 op.status = constants.OP_STATUS_QUEUED
570 def _Canceling1(ops):
571 ops[0].status = constants.OP_STATUS_SUCCESS
572 ops[1].status = constants.OP_STATUS_SUCCESS
574 op.status = constants.OP_STATUS_CANCELING
576 def _Canceling2(ops):
578 op.status = constants.OP_STATUS_CANCELING
582 op.status = constants.OP_STATUS_CANCELED
585 for idx, op in enumerate(ops):
587 op.status = constants.OP_STATUS_ERROR
589 op.status = constants.OP_STATUS_SUCCESS
593 op.status = constants.OP_STATUS_ERROR
597 op.status = constants.OP_STATUS_SUCCESS
600 constants.JOB_STATUS_QUEUED: [_Queued],
601 constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
602 constants.JOB_STATUS_RUNNING: [_Running],
603 constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
604 constants.JOB_STATUS_CANCELED: [_Canceled],
605 constants.JOB_STATUS_ERROR: [_Error1, _Error2],
606 constants.JOB_STATUS_SUCCESS: [_Success],
610 job = jqueue._QueuedJob(None, 1,
611 [opcodes.OpTestDelay() for _ in range(10)],
613 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
614 self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
618 for status in constants.JOB_STATUS_ALL:
619 sttests = tests[status]
624 self.assertEqual(job.CalcStatus(), status)
627 class _FakeDependencyManager:
630 self._notifications = []
631 self._waiting = set()
633 def AddCheckResult(self, job, dep_job_id, dep_status, result):
634 self._checks.append((job, dep_job_id, dep_status, result))
636 def CountPendingResults(self):
637 return len(self._checks)
639 def CountWaitingJobs(self):
640 return len(self._waiting)
642 def GetNextNotification(self):
643 return self._notifications.pop(0)
645 def JobWaiting(self, job):
646 return job in self._waiting
648 def CheckAndRegister(self, job, dep_job_id, dep_status):
649 (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
651 assert exp_job == job
652 assert exp_dep_job_id == dep_job_id
653 assert exp_dep_status == dep_status
655 (result_status, _) = result
657 if result_status == jqueue._JobDependencyManager.WAIT:
658 self._waiting.add(job)
659 elif result_status == jqueue._JobDependencyManager.CONTINUE:
660 self._waiting.remove(job)
664 def NotifyWaiters(self, job_id):
665 self._notifications.append(job_id)
668 class _DisabledFakeDependencyManager:
669 def JobWaiting(self, _):
672 def CheckAndRegister(self, *args):
673 assert False, "Should not be called"
675 def NotifyWaiters(self, _):
679 class _FakeQueueForProc:
680 def __init__(self, depmgr=None):
681 self._acquired = False
684 self._accepting_jobs = True
686 self._submit_count = itertools.count(1000)
691 self.depmgr = _DisabledFakeDependencyManager()
693 def IsAcquired(self):
694 return self._acquired
696 def GetNextUpdate(self):
697 return self._updates.pop(0)
699 def GetNextSubmittedJob(self):
700 return self._submitted.pop(0)
702 def acquire(self, shared=0):
704 self._acquired = True
707 assert self._acquired
708 self._acquired = False
710 def UpdateJobUnlocked(self, job, replicate=True):
711 assert self._acquired, "Lock not acquired while updating job"
712 self._updates.append((job, bool(replicate)))
714 def SubmitManyJobs(self, jobs):
715 assert not self._acquired, "Lock acquired while submitting jobs"
716 job_ids = [self._submit_count.next() for _ in jobs]
717 self._submitted.extend(zip(job_ids, jobs))
720 def StopAcceptingJobs(self):
721 self._accepting_jobs = False
723 def AcceptingJobsUnlocked(self):
724 return self._accepting_jobs
727 class _FakeExecOpCodeForProc:
728 def __init__(self, queue, before_start, after_start):
730 self._before_start = before_start
731 self._after_start = after_start
733 def __call__(self, op, cbs, timeout=None):
734 assert isinstance(op, opcodes.OpTestDummy)
735 assert not self._queue.IsAcquired(), \
736 "Queue lock not released when executing opcode"
738 if self._before_start:
739 self._before_start(timeout, cbs.CurrentPriority())
743 if self._after_start:
744 self._after_start(op, cbs)
746 # Check again after the callbacks
747 assert not self._queue.IsAcquired()
750 raise errors.OpExecError("Error requested (%s)" % op.result)
752 if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
753 return cbs.SubmitManyJobs(op.submit_jobs)
758 class _JobProcessorTestUtils:
759 def _CreateJob(self, queue, job_id, ops):
760 job = jqueue._QueuedJob(queue, job_id, ops, True)
761 self.assertFalse(job.start_timestamp)
762 self.assertFalse(job.end_timestamp)
763 self.assertEqual(len(ops), len(job.ops))
764 self.assert_(compat.all(op.input == inp
765 for (op, inp) in zip(job.ops, ops)))
766 self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
770 class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
771 def _GenericCheckJob(self, job):
772 assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
775 self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
776 [[op.start_timestamp for op in job.ops],
777 [op.exec_timestamp for op in job.ops],
778 [op.end_timestamp for op in job.ops]])
779 self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
780 [job.received_timestamp,
783 self.assert_(job.start_timestamp)
784 self.assert_(job.end_timestamp)
785 self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
787 def testSuccess(self):
788 queue = _FakeQueueForProc()
790 for (job_id, opcount) in [(25351, 1), (6637, 3),
791 (24644, 10), (32207, 100)]:
792 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
793 for i in range(opcount)]
796 job = self._CreateJob(queue, job_id, ops)
798 def _BeforeStart(timeout, priority):
799 self.assertEqual(queue.GetNextUpdate(), (job, True))
800 self.assertRaises(IndexError, queue.GetNextUpdate)
801 self.assertFalse(queue.IsAcquired())
802 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
803 self.assertFalse(job.cur_opctx)
805 def _AfterStart(op, cbs):
806 self.assertEqual(queue.GetNextUpdate(), (job, True))
807 self.assertRaises(IndexError, queue.GetNextUpdate)
809 self.assertFalse(queue.IsAcquired())
810 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
811 self.assertFalse(job.cur_opctx)
813 # Job is running, cancelling shouldn't be possible
814 (success, _) = job.Cancel()
815 self.assertFalse(success)
817 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
819 for idx in range(len(ops)):
820 self.assertRaises(IndexError, queue.GetNextUpdate)
821 result = jqueue._JobProcessor(queue, opexec, job)()
822 self.assertEqual(queue.GetNextUpdate(), (job, True))
823 self.assertRaises(IndexError, queue.GetNextUpdate)
824 if idx == len(ops) - 1:
826 self.assertEqual(result, jqueue._JobProcessor.FINISHED)
828 self.assertEqual(result, jqueue._JobProcessor.DEFER)
830 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
831 self.assert_(job.start_timestamp)
832 self.assertFalse(job.end_timestamp)
834 self.assertRaises(IndexError, queue.GetNextUpdate)
836 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
837 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
838 self.assertEqual(job.GetInfo(["opresult"]),
839 [[op.input.result for op in job.ops]])
840 self.assertEqual(job.GetInfo(["opstatus"]),
841 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
842 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
845 self._GenericCheckJob(job)
847 # Calling the processor on a finished job should be a no-op
848 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
849 jqueue._JobProcessor.FINISHED)
850 self.assertRaises(IndexError, queue.GetNextUpdate)
852 def testOpcodeError(self):
853 queue = _FakeQueueForProc()
860 (23816, 100, 39, 45),
863 for (job_id, opcount, failfrom, failto) in testdata:
865 ops = [opcodes.OpTestDummy(result="Res%s" % i,
866 fail=(failfrom <= i and
868 for i in range(opcount)]
871 job = self._CreateJob(queue, str(job_id), ops)
873 opexec = _FakeExecOpCodeForProc(queue, None, None)
875 for idx in range(len(ops)):
876 self.assertRaises(IndexError, queue.GetNextUpdate)
877 result = jqueue._JobProcessor(queue, opexec, job)()
879 self.assertEqual(queue.GetNextUpdate(), (job, True))
880 # waitlock to running
881 self.assertEqual(queue.GetNextUpdate(), (job, True))
883 self.assertEqual(queue.GetNextUpdate(), (job, True))
884 self.assertRaises(IndexError, queue.GetNextUpdate)
886 if idx in (failfrom, len(ops) - 1):
888 self.assertEqual(result, jqueue._JobProcessor.FINISHED)
891 self.assertEqual(result, jqueue._JobProcessor.DEFER)
893 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
895 self.assertRaises(IndexError, queue.GetNextUpdate)
898 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
899 self.assertEqual(job.GetInfo(["id"]), [job_id])
900 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
902 # Check opcode status
904 job.GetInfo(["opstatus"])[0],
905 job.GetInfo(["opresult"])[0])
907 for idx, (op, opstatus, opresult) in enumerate(data):
909 assert not op.input.fail
910 self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
911 self.assertEqual(opresult, op.input.result)
914 self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
915 self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
917 assert not op.input.fail
918 self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
919 self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
921 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
922 for op in job.ops[:failfrom]))
924 self._GenericCheckJob(job)
926 # Calling the processor on a finished job should be a no-op
927 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
928 jqueue._JobProcessor.FINISHED)
929 self.assertRaises(IndexError, queue.GetNextUpdate)
931 def testCancelWhileInQueue(self):
932 queue = _FakeQueueForProc()
934 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
939 job = self._CreateJob(queue, job_id, ops)
941 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
944 (success, _) = job.Cancel()
945 self.assert_(success)
947 self.assertRaises(IndexError, queue.GetNextUpdate)
949 self.assertFalse(job.start_timestamp)
950 self.assertTrue(job.end_timestamp)
951 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
954 # Serialize to check for differences
955 before_proc = job.Serialize()
957 # Simulate processor called in workerpool
958 opexec = _FakeExecOpCodeForProc(queue, None, None)
959 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
960 jqueue._JobProcessor.FINISHED)
963 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
964 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
965 self.assertFalse(job.start_timestamp)
966 self.assertTrue(job.end_timestamp)
967 self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
969 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
970 [[constants.OP_STATUS_CANCELED for _ in job.ops],
971 ["Job canceled by request" for _ in job.ops]])
973 # Must not have changed or written
974 self.assertEqual(before_proc, job.Serialize())
975 self.assertRaises(IndexError, queue.GetNextUpdate)
977 def testCancelWhileWaitlockInQueue(self):
978 queue = _FakeQueueForProc()
980 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
985 job = self._CreateJob(queue, job_id, ops)
987 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
989 job.ops[0].status = constants.OP_STATUS_WAITING
991 assert len(job.ops) == 5
993 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
996 (success, _) = job.Cancel()
997 self.assert_(success)
999 self.assertRaises(IndexError, queue.GetNextUpdate)
1001 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1004 opexec = _FakeExecOpCodeForProc(queue, None, None)
1005 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1006 jqueue._JobProcessor.FINISHED)
1009 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1010 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1011 self.assertFalse(job.start_timestamp)
1012 self.assert_(job.end_timestamp)
1013 self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1015 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1016 [[constants.OP_STATUS_CANCELED for _ in job.ops],
1017 ["Job canceled by request" for _ in job.ops]])
1019 def testCancelWhileWaitlock(self):
1020 queue = _FakeQueueForProc()
1022 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1027 job = self._CreateJob(queue, job_id, ops)
1029 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1031 def _BeforeStart(timeout, priority):
1032 self.assertEqual(queue.GetNextUpdate(), (job, True))
1033 self.assertRaises(IndexError, queue.GetNextUpdate)
1034 self.assertFalse(queue.IsAcquired())
1035 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1038 (success, _) = job.Cancel()
1039 self.assert_(success)
1041 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1043 self.assertRaises(IndexError, queue.GetNextUpdate)
1045 def _AfterStart(op, cbs):
1046 self.assertEqual(queue.GetNextUpdate(), (job, True))
1047 self.assertRaises(IndexError, queue.GetNextUpdate)
1048 self.assertFalse(queue.IsAcquired())
1049 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1051 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1053 self.assertRaises(IndexError, queue.GetNextUpdate)
1054 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1055 jqueue._JobProcessor.FINISHED)
1056 self.assertEqual(queue.GetNextUpdate(), (job, True))
1057 self.assertRaises(IndexError, queue.GetNextUpdate)
1060 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1061 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1062 self.assert_(job.start_timestamp)
1063 self.assert_(job.end_timestamp)
1064 self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
1066 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1067 [[constants.OP_STATUS_CANCELED for _ in job.ops],
1068 ["Job canceled by request" for _ in job.ops]])
1070 def _TestCancelWhileSomething(self, cb):
1071 queue = _FakeQueueForProc()
1073 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1078 job = self._CreateJob(queue, job_id, ops)
1080 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1082 def _BeforeStart(timeout, priority):
1083 self.assertFalse(queue.IsAcquired())
1084 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1087 (success, _) = job.Cancel()
1088 self.assert_(success)
1090 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1095 def _AfterStart(op, cbs):
1096 self.fail("Should not reach this")
1098 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1100 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1101 jqueue._JobProcessor.FINISHED)
1104 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1105 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1106 self.assert_(job.start_timestamp)
1107 self.assert_(job.end_timestamp)
1108 self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
1110 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1111 [[constants.OP_STATUS_CANCELED for _ in job.ops],
1112 ["Job canceled by request" for _ in job.ops]])
1116 def testCancelWhileWaitlockWithTimeout(self):
1118 # Fake an acquire attempt timing out
1119 raise mcpu.LockAcquireTimeout()
1121 self._TestCancelWhileSomething(fn)
1123 def testCancelDuringQueueShutdown(self):
1124 queue = self._TestCancelWhileSomething(lambda q: q.StopAcceptingJobs())
1125 self.assertFalse(queue.AcceptingJobsUnlocked())
1127 def testCancelWhileRunning(self):
1128 # Tests canceling a job with finished opcodes and more, unprocessed ones
1129 queue = _FakeQueueForProc()
1131 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1136 job = self._CreateJob(queue, job_id, ops)
1138 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1140 opexec = _FakeExecOpCodeForProc(queue, None, None)
1143 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1144 jqueue._JobProcessor.DEFER)
1146 # Job goes back to queued
1147 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1148 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1149 [[constants.OP_STATUS_SUCCESS,
1150 constants.OP_STATUS_QUEUED,
1151 constants.OP_STATUS_QUEUED],
1152 ["Res0", None, None]])
1155 (success, _) = job.Cancel()
1156 self.assert_(success)
1158 # Try processing another opcode (this will actually cancel the job)
1159 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1160 jqueue._JobProcessor.FINISHED)
1163 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1164 self.assertEqual(job.GetInfo(["id"]), [job_id])
1165 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1166 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1167 [[constants.OP_STATUS_SUCCESS,
1168 constants.OP_STATUS_CANCELED,
1169 constants.OP_STATUS_CANCELED],
1170 ["Res0", "Job canceled by request",
1171 "Job canceled by request"]])
1173 def _TestQueueShutdown(self, queue, opexec, job, runcount):
1174 self.assertTrue(queue.AcceptingJobsUnlocked())
1177 queue.StopAcceptingJobs()
1179 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1180 jqueue._JobProcessor.DEFER)
1183 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1184 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
1185 self.assertFalse(job.cur_opctx)
1186 self.assertTrue(job.start_timestamp)
1187 self.assertFalse(job.end_timestamp)
1188 self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1189 self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1190 for op in job.ops[:runcount]))
1191 self.assertFalse(job.ops[runcount].end_timestamp)
1192 self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1193 for op in job.ops[(runcount + 1):]))
1194 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1195 [(([constants.OP_STATUS_SUCCESS] * runcount) +
1196 ([constants.OP_STATUS_QUEUED] *
1197 (len(job.ops) - runcount))),
1198 (["Res%s" % i for i in range(runcount)] +
1199 ([None] * (len(job.ops) - runcount)))])
1201 # Must have been written and replicated
1202 self.assertEqual(queue.GetNextUpdate(), (job, True))
1203 self.assertRaises(IndexError, queue.GetNextUpdate)
1205 def testQueueShutdownWhileRunning(self):
1206 # Tests shutting down the queue while a job is running
1207 queue = _FakeQueueForProc()
1209 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1214 job = self._CreateJob(queue, job_id, ops)
1216 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1218 opexec = _FakeExecOpCodeForProc(queue, None, None)
1220 self.assertRaises(IndexError, queue.GetNextUpdate)
1223 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1224 jqueue._JobProcessor.DEFER)
1226 # Job goes back to queued
1227 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1228 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1229 [[constants.OP_STATUS_SUCCESS,
1230 constants.OP_STATUS_QUEUED,
1231 constants.OP_STATUS_QUEUED],
1232 ["Res0", None, None]])
1233 self.assertFalse(job.cur_opctx)
1235 # Writes for waiting, running and result
1237 self.assertEqual(queue.GetNextUpdate(), (job, True))
1240 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1241 jqueue._JobProcessor.DEFER)
1243 # Job goes back to queued
1244 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1245 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1246 [[constants.OP_STATUS_SUCCESS,
1247 constants.OP_STATUS_SUCCESS,
1248 constants.OP_STATUS_QUEUED],
1249 ["Res0", "Res1", None]])
1250 self.assertFalse(job.cur_opctx)
1252 # Writes for waiting, running and result
1254 self.assertEqual(queue.GetNextUpdate(), (job, True))
1256 self._TestQueueShutdown(queue, opexec, job, 2)
1258 def testQueueShutdownWithLockTimeout(self):
1259 # Tests shutting down while a lock acquire times out
1260 queue = _FakeQueueForProc()
1262 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1267 job = self._CreateJob(queue, job_id, ops)
1269 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1271 acquire_timeout = False
1273 def _BeforeStart(timeout, priority):
1274 self.assertFalse(queue.IsAcquired())
1275 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1277 raise mcpu.LockAcquireTimeout()
1279 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, None)
1281 self.assertRaises(IndexError, queue.GetNextUpdate)
1284 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1285 jqueue._JobProcessor.DEFER)
1287 # Job goes back to queued
1288 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1289 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1290 [[constants.OP_STATUS_SUCCESS,
1291 constants.OP_STATUS_QUEUED,
1292 constants.OP_STATUS_QUEUED],
1293 ["Res0", None, None]])
1294 self.assertFalse(job.cur_opctx)
1296 # Writes for waiting, running and result
1298 self.assertEqual(queue.GetNextUpdate(), (job, True))
1300 # The next opcode should have expiring lock acquires
1301 acquire_timeout = True
1303 self._TestQueueShutdown(queue, opexec, job, 1)
1305 def testQueueShutdownWhileInQueue(self):
1306 # This should never happen in reality (no new jobs are started by the
1307 # workerpool once a shutdown has been initiated), but it's better to test
1308 # the job processor for this scenario
1309 queue = _FakeQueueForProc()
1311 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1316 job = self._CreateJob(queue, job_id, ops)
1318 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1319 self.assertRaises(IndexError, queue.GetNextUpdate)
1321 self.assertFalse(job.start_timestamp)
1322 self.assertFalse(job.end_timestamp)
1323 self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
1326 opexec = _FakeExecOpCodeForProc(queue, None, None)
1327 self._TestQueueShutdown(queue, opexec, job, 0)
1329 def testQueueShutdownWhileWaitlockInQueue(self):
1330 queue = _FakeQueueForProc()
1332 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1337 job = self._CreateJob(queue, job_id, ops)
1339 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1341 job.ops[0].status = constants.OP_STATUS_WAITING
1343 assert len(job.ops) == 5
1345 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1347 self.assertRaises(IndexError, queue.GetNextUpdate)
1349 opexec = _FakeExecOpCodeForProc(queue, None, None)
1350 self._TestQueueShutdown(queue, opexec, job, 0)
1352 def testPartiallyRun(self):
1353 # Tests calling the processor on a job that's been partially run before the
1354 # program was restarted
1355 queue = _FakeQueueForProc()
1357 opexec = _FakeExecOpCodeForProc(queue, None, None)
1359 for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
1360 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1364 job = self._CreateJob(queue, job_id, ops)
1366 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1368 for _ in range(successcount):
1369 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1370 jqueue._JobProcessor.DEFER)
1372 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1373 self.assertEqual(job.GetInfo(["opstatus"]),
1374 [[constants.OP_STATUS_SUCCESS
1375 for _ in range(successcount)] +
1376 [constants.OP_STATUS_QUEUED
1377 for _ in range(len(ops) - successcount)]])
1379 self.assert_(job.ops_iter)
1381 # Serialize and restore (simulates program restart)
1382 newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1383 self.assertFalse(newjob.ops_iter)
1384 self._TestPartial(newjob, successcount)
1386 def _TestPartial(self, job, successcount):
1387 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1388 self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1390 queue = _FakeQueueForProc()
1391 opexec = _FakeExecOpCodeForProc(queue, None, None)
1393 for remaining in reversed(range(len(job.ops) - successcount)):
1394 result = jqueue._JobProcessor(queue, opexec, job)()
1395 self.assertEqual(queue.GetNextUpdate(), (job, True))
1396 self.assertEqual(queue.GetNextUpdate(), (job, True))
1397 self.assertEqual(queue.GetNextUpdate(), (job, True))
1398 self.assertRaises(IndexError, queue.GetNextUpdate)
1402 self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1405 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1407 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1409 self.assertRaises(IndexError, queue.GetNextUpdate)
1410 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1411 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1412 self.assertEqual(job.GetInfo(["opresult"]),
1413 [[op.input.result for op in job.ops]])
1414 self.assertEqual(job.GetInfo(["opstatus"]),
1415 [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
1416 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1419 self._GenericCheckJob(job)
1421 # Calling the processor on a finished job should be a no-op
1422 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1423 jqueue._JobProcessor.FINISHED)
1424 self.assertRaises(IndexError, queue.GetNextUpdate)
1426 # ... also after being restored
1427 job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1428 # Calling the processor on a finished job should be a no-op
1429 self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
1430 jqueue._JobProcessor.FINISHED)
1431 self.assertRaises(IndexError, queue.GetNextUpdate)
1433 def testProcessorOnRunningJob(self):
1434 ops = [opcodes.OpTestDummy(result="result", fail=False)]
1436 queue = _FakeQueueForProc()
1437 opexec = _FakeExecOpCodeForProc(queue, None, None)
1440 job = self._CreateJob(queue, 9571, ops)
1442 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1444 job.ops[0].status = constants.OP_STATUS_RUNNING
1446 assert len(job.ops) == 1
1448 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1450 # Calling on running job must fail
1451 self.assertRaises(errors.ProgrammerError,
1452 jqueue._JobProcessor(queue, opexec, job))
1454 def testLogMessages(self):
1455 # Tests the "Feedback" callback function
1456 queue = _FakeQueueForProc()
1462 (constants.ELOG_MESSAGE, "there"),
1465 (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
1466 (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1469 ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1470 messages=messages.get(i, []))
1474 job = self._CreateJob(queue, 29386, ops)
1476 def _BeforeStart(timeout, priority):
1477 self.assertEqual(queue.GetNextUpdate(), (job, True))
1478 self.assertRaises(IndexError, queue.GetNextUpdate)
1479 self.assertFalse(queue.IsAcquired())
1480 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1482 def _AfterStart(op, cbs):
1483 self.assertEqual(queue.GetNextUpdate(), (job, True))
1484 self.assertRaises(IndexError, queue.GetNextUpdate)
1485 self.assertFalse(queue.IsAcquired())
1486 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1488 self.assertRaises(AssertionError, cbs.Feedback,
1489 "too", "many", "arguments")
1491 for (log_type, msg) in op.messages:
1492 self.assertRaises(IndexError, queue.GetNextUpdate)
1494 cbs.Feedback(log_type, msg)
1497 # Check for job update without replication
1498 self.assertEqual(queue.GetNextUpdate(), (job, False))
1499 self.assertRaises(IndexError, queue.GetNextUpdate)
1501 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1503 for remaining in reversed(range(len(job.ops))):
1504 self.assertRaises(IndexError, queue.GetNextUpdate)
1505 result = jqueue._JobProcessor(queue, opexec, job)()
1506 self.assertEqual(queue.GetNextUpdate(), (job, True))
1507 self.assertRaises(IndexError, queue.GetNextUpdate)
1511 self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1514 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1516 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1518 self.assertRaises(IndexError, queue.GetNextUpdate)
1520 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1521 self.assertEqual(job.GetInfo(["opresult"]),
1522 [[op.input.result for op in job.ops]])
1524 logmsgcount = sum(len(m) for m in messages.values())
1526 self._CheckLogMessages(job, logmsgcount)
1528 # Serialize and restore (simulates program restart)
1529 newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1530 self._CheckLogMessages(newjob, logmsgcount)
1532 # Check each message
1534 for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1535 for (serial, timestamp, log_type, msg) in oplog:
1536 (exptype, expmsg) = messages.get(idx).pop(0)
1538 self.assertEqual(log_type, exptype)
1540 self.assertEqual(log_type, constants.ELOG_MESSAGE)
1541 self.assertEqual(expmsg, msg)
1542 self.assert_(serial > prevserial)
1545 def _CheckLogMessages(self, job, count):
1547 self.assertEqual(job.log_serial, count)
1550 self.assertEqual(job.GetLogEntries(None),
1551 [entry for entries in job.GetInfo(["oplog"])[0] if entries
1552 for entry in entries])
1554 # Filter with serial
1556 self.assert_(job.GetLogEntries(3))
1557 self.assertEqual(job.GetLogEntries(3),
1558 [entry for entries in job.GetInfo(["oplog"])[0] if entries
1559 for entry in entries][3:])
1561 # No log message after highest serial
1562 self.assertFalse(job.GetLogEntries(count))
1563 self.assertFalse(job.GetLogEntries(count + 3))
1565 def testSubmitManyJobs(self):
1566 queue = _FakeQueueForProc()
1570 opcodes.OpTestDummy(result="Res0", fail=False,
1572 opcodes.OpTestDummy(result="Res1", fail=False,
1574 [opcodes.OpTestDummy(result="r1j0", fail=False)],
1576 opcodes.OpTestDummy(result="Res2", fail=False,
1578 [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1579 opcodes.OpTestDummy(result="r2j0o1", fail=False),
1580 opcodes.OpTestDummy(result="r2j0o2", fail=False),
1581 opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1582 [opcodes.OpTestDummy(result="r2j1", fail=False)],
1583 [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1584 opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1589 job = self._CreateJob(queue, job_id, ops)
1591 def _BeforeStart(timeout, priority):
1592 self.assertEqual(queue.GetNextUpdate(), (job, True))
1593 self.assertRaises(IndexError, queue.GetNextUpdate)
1594 self.assertFalse(queue.IsAcquired())
1595 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1596 self.assertFalse(job.cur_opctx)
1598 def _AfterStart(op, cbs):
1599 self.assertEqual(queue.GetNextUpdate(), (job, True))
1600 self.assertRaises(IndexError, queue.GetNextUpdate)
1602 self.assertFalse(queue.IsAcquired())
1603 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1604 self.assertFalse(job.cur_opctx)
1606 # Job is running, cancelling shouldn't be possible
1607 (success, _) = job.Cancel()
1608 self.assertFalse(success)
1610 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1612 for idx in range(len(ops)):
1613 self.assertRaises(IndexError, queue.GetNextUpdate)
1614 result = jqueue._JobProcessor(queue, opexec, job)()
1615 self.assertEqual(queue.GetNextUpdate(), (job, True))
1616 self.assertRaises(IndexError, queue.GetNextUpdate)
1617 if idx == len(ops) - 1:
1619 self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1621 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1623 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1624 self.assert_(job.start_timestamp)
1625 self.assertFalse(job.end_timestamp)
1627 self.assertRaises(IndexError, queue.GetNextUpdate)
1629 for idx, submitted_ops in enumerate(job_ops
1631 for job_ops in op.submit_jobs):
1632 self.assertEqual(queue.GetNextSubmittedJob(),
1633 (1000 + idx, submitted_ops))
1634 self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1636 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1637 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1638 self.assertEqual(job.GetInfo(["opresult"]),
1639 [[[], [1000], [1001, 1002, 1003]]])
1640 self.assertEqual(job.GetInfo(["opstatus"]),
1641 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1643 self._GenericCheckJob(job)
1645 # Calling the processor on a finished job should be a no-op
1646 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1647 jqueue._JobProcessor.FINISHED)
1648 self.assertRaises(IndexError, queue.GetNextUpdate)
1650 def testJobDependency(self):
1651 depmgr = _FakeDependencyManager()
1652 queue = _FakeQueueForProc(depmgr=depmgr)
1654 self.assertEqual(queue.depmgr, depmgr)
1657 prev_job_id2 = 28102
1660 opcodes.OpTestDummy(result="Res0", fail=False,
1662 [prev_job_id2, None],
1663 [prev_job_id, None],
1665 opcodes.OpTestDummy(result="Res1", fail=False),
1669 job = self._CreateJob(queue, job_id, ops)
1671 def _BeforeStart(timeout, priority):
1672 if attempt == 0 or attempt > 5:
1673 # Job should only be updated when it wasn't waiting for another job
1674 self.assertEqual(queue.GetNextUpdate(), (job, True))
1675 self.assertRaises(IndexError, queue.GetNextUpdate)
1676 self.assertFalse(queue.IsAcquired())
1677 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1678 self.assertFalse(job.cur_opctx)
1680 def _AfterStart(op, cbs):
1681 self.assertEqual(queue.GetNextUpdate(), (job, True))
1682 self.assertRaises(IndexError, queue.GetNextUpdate)
1684 self.assertFalse(queue.IsAcquired())
1685 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1686 self.assertFalse(job.cur_opctx)
1688 # Job is running, cancelling shouldn't be possible
1689 (success, _) = job.Cancel()
1690 self.assertFalse(success)
1692 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1694 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1696 counter = itertools.count()
1698 attempt = counter.next()
1700 self.assertRaises(IndexError, queue.GetNextUpdate)
1701 self.assertRaises(IndexError, depmgr.GetNextNotification)
1704 depmgr.AddCheckResult(job, prev_job_id2, None,
1705 (jqueue._JobDependencyManager.WAIT, "wait2"))
1707 depmgr.AddCheckResult(job, prev_job_id2, None,
1708 (jqueue._JobDependencyManager.CONTINUE, "cont"))
1709 # The processor will ask for the next dependency immediately
1710 depmgr.AddCheckResult(job, prev_job_id, None,
1711 (jqueue._JobDependencyManager.WAIT, "wait"))
1713 depmgr.AddCheckResult(job, prev_job_id, None,
1714 (jqueue._JobDependencyManager.WAIT, "wait"))
1716 depmgr.AddCheckResult(job, prev_job_id, None,
1717 (jqueue._JobDependencyManager.CONTINUE, "cont"))
1719 self.assertEqual(depmgr.CountPendingResults(), 2)
1721 self.assertEqual(depmgr.CountPendingResults(), 0)
1723 self.assertEqual(depmgr.CountPendingResults(), 1)
1725 result = jqueue._JobProcessor(queue, opexec, job)()
1726 if attempt == 0 or attempt >= 5:
1727 # Job should only be updated if there was an actual change
1728 self.assertEqual(queue.GetNextUpdate(), (job, True))
1729 self.assertRaises(IndexError, queue.GetNextUpdate)
1730 self.assertFalse(depmgr.CountPendingResults())
1733 # Simulate waiting for other job
1734 self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1735 self.assertTrue(job.cur_opctx)
1736 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1737 self.assertRaises(IndexError, depmgr.GetNextNotification)
1738 self.assert_(job.start_timestamp)
1739 self.assertFalse(job.end_timestamp)
1742 if result == jqueue._JobProcessor.FINISHED:
1744 self.assertFalse(job.cur_opctx)
1747 self.assertRaises(IndexError, depmgr.GetNextNotification)
1749 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1750 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1751 self.assert_(job.start_timestamp)
1752 self.assertFalse(job.end_timestamp)
1754 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1755 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1756 self.assertEqual(job.GetInfo(["opresult"]),
1757 [[op.input.result for op in job.ops]])
1758 self.assertEqual(job.GetInfo(["opstatus"]),
1759 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1760 self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1763 self._GenericCheckJob(job)
1765 self.assertRaises(IndexError, queue.GetNextUpdate)
1766 self.assertRaises(IndexError, depmgr.GetNextNotification)
1767 self.assertFalse(depmgr.CountPendingResults())
1768 self.assertFalse(depmgr.CountWaitingJobs())
1770 # Calling the processor on a finished job should be a no-op
1771 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1772 jqueue._JobProcessor.FINISHED)
1773 self.assertRaises(IndexError, queue.GetNextUpdate)
1775 def testJobDependencyCancel(self):
1776 depmgr = _FakeDependencyManager()
1777 queue = _FakeQueueForProc(depmgr=depmgr)
1779 self.assertEqual(queue.depmgr, depmgr)
1784 opcodes.OpTestDummy(result="Res0", fail=False),
1785 opcodes.OpTestDummy(result="Res1", fail=False,
1787 [prev_job_id, None],
1789 opcodes.OpTestDummy(result="Res2", fail=False),
1793 job = self._CreateJob(queue, job_id, ops)
1795 def _BeforeStart(timeout, priority):
1796 if attempt == 0 or attempt > 5:
1797 # Job should only be updated when it wasn't waiting for another job
1798 self.assertEqual(queue.GetNextUpdate(), (job, True))
1799 self.assertRaises(IndexError, queue.GetNextUpdate)
1800 self.assertFalse(queue.IsAcquired())
1801 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1802 self.assertFalse(job.cur_opctx)
1804 def _AfterStart(op, cbs):
1805 self.assertEqual(queue.GetNextUpdate(), (job, True))
1806 self.assertRaises(IndexError, queue.GetNextUpdate)
1808 self.assertFalse(queue.IsAcquired())
1809 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1810 self.assertFalse(job.cur_opctx)
1812 # Job is running, cancelling shouldn't be possible
1813 (success, _) = job.Cancel()
1814 self.assertFalse(success)
1816 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1818 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1820 counter = itertools.count()
1822 attempt = counter.next()
1824 self.assertRaises(IndexError, queue.GetNextUpdate)
1825 self.assertRaises(IndexError, depmgr.GetNextNotification)
1828 # This will handle the first opcode
1831 depmgr.AddCheckResult(job, prev_job_id, None,
1832 (jqueue._JobDependencyManager.WAIT, "wait"))
1834 # Other job was cancelled
1835 depmgr.AddCheckResult(job, prev_job_id, None,
1836 (jqueue._JobDependencyManager.CANCEL, "cancel"))
1839 self.assertEqual(depmgr.CountPendingResults(), 0)
1841 self.assertEqual(depmgr.CountPendingResults(), 1)
1843 result = jqueue._JobProcessor(queue, opexec, job)()
1844 if attempt <= 1 or attempt >= 4:
1845 # Job should only be updated if there was an actual change
1846 self.assertEqual(queue.GetNextUpdate(), (job, True))
1847 self.assertRaises(IndexError, queue.GetNextUpdate)
1848 self.assertFalse(depmgr.CountPendingResults())
1850 if attempt > 0 and attempt < 4:
1851 # Simulate waiting for other job
1852 self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1853 self.assertTrue(job.cur_opctx)
1854 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1855 self.assertRaises(IndexError, depmgr.GetNextNotification)
1856 self.assert_(job.start_timestamp)
1857 self.assertFalse(job.end_timestamp)
1860 if result == jqueue._JobProcessor.FINISHED:
1862 self.assertFalse(job.cur_opctx)
1865 self.assertRaises(IndexError, depmgr.GetNextNotification)
1867 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1868 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1869 self.assert_(job.start_timestamp)
1870 self.assertFalse(job.end_timestamp)
1872 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1873 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1874 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1875 [[constants.OP_STATUS_SUCCESS,
1876 constants.OP_STATUS_CANCELED,
1877 constants.OP_STATUS_CANCELED],
1878 ["Res0", "Job canceled by request",
1879 "Job canceled by request"]])
1881 self._GenericCheckJob(job)
1883 self.assertRaises(IndexError, queue.GetNextUpdate)
1884 self.assertRaises(IndexError, depmgr.GetNextNotification)
1885 self.assertFalse(depmgr.CountPendingResults())
1887 # Calling the processor on a finished job should be a no-op
1888 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1889 jqueue._JobProcessor.FINISHED)
1890 self.assertRaises(IndexError, queue.GetNextUpdate)
1892 def testJobDependencyWrongstatus(self):
1893 depmgr = _FakeDependencyManager()
1894 queue = _FakeQueueForProc(depmgr=depmgr)
1896 self.assertEqual(queue.depmgr, depmgr)
1901 opcodes.OpTestDummy(result="Res0", fail=False),
1902 opcodes.OpTestDummy(result="Res1", fail=False,
1904 [prev_job_id, None],
1906 opcodes.OpTestDummy(result="Res2", fail=False),
1910 job = self._CreateJob(queue, job_id, ops)
1912 def _BeforeStart(timeout, priority):
1913 if attempt == 0 or attempt > 5:
1914 # Job should only be updated when it wasn't waiting for another job
1915 self.assertEqual(queue.GetNextUpdate(), (job, True))
1916 self.assertRaises(IndexError, queue.GetNextUpdate)
1917 self.assertFalse(queue.IsAcquired())
1918 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1919 self.assertFalse(job.cur_opctx)
1921 def _AfterStart(op, cbs):
1922 self.assertEqual(queue.GetNextUpdate(), (job, True))
1923 self.assertRaises(IndexError, queue.GetNextUpdate)
1925 self.assertFalse(queue.IsAcquired())
1926 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1927 self.assertFalse(job.cur_opctx)
1929 # Job is running, cancelling shouldn't be possible
1930 (success, _) = job.Cancel()
1931 self.assertFalse(success)
1933 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1935 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1937 counter = itertools.count()
1939 attempt = counter.next()
1941 self.assertRaises(IndexError, queue.GetNextUpdate)
1942 self.assertRaises(IndexError, depmgr.GetNextNotification)
1945 # This will handle the first opcode
1948 depmgr.AddCheckResult(job, prev_job_id, None,
1949 (jqueue._JobDependencyManager.WAIT, "wait"))
1952 depmgr.AddCheckResult(job, prev_job_id, None,
1953 (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
1956 self.assertEqual(depmgr.CountPendingResults(), 0)
1958 self.assertEqual(depmgr.CountPendingResults(), 1)
1960 result = jqueue._JobProcessor(queue, opexec, job)()
1961 if attempt <= 1 or attempt >= 4:
1962 # Job should only be updated if there was an actual change
1963 self.assertEqual(queue.GetNextUpdate(), (job, True))
1964 self.assertRaises(IndexError, queue.GetNextUpdate)
1965 self.assertFalse(depmgr.CountPendingResults())
1967 if attempt > 0 and attempt < 4:
1968 # Simulate waiting for other job
1969 self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1970 self.assertTrue(job.cur_opctx)
1971 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1972 self.assertRaises(IndexError, depmgr.GetNextNotification)
1973 self.assert_(job.start_timestamp)
1974 self.assertFalse(job.end_timestamp)
1977 if result == jqueue._JobProcessor.FINISHED:
1979 self.assertFalse(job.cur_opctx)
1982 self.assertRaises(IndexError, depmgr.GetNextNotification)
1984 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1985 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1986 self.assert_(job.start_timestamp)
1987 self.assertFalse(job.end_timestamp)
1989 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
1990 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
1991 self.assertEqual(job.GetInfo(["opstatus"]),
1992 [[constants.OP_STATUS_SUCCESS,
1993 constants.OP_STATUS_ERROR,
1994 constants.OP_STATUS_ERROR]]),
1996 (opresult, ) = job.GetInfo(["opresult"])
1997 self.assertEqual(len(opresult), len(ops))
1998 self.assertEqual(opresult[0], "Res0")
1999 self.assertTrue(errors.GetEncodedError(opresult[1]))
2000 self.assertTrue(errors.GetEncodedError(opresult[2]))
2002 self._GenericCheckJob(job)
2004 self.assertRaises(IndexError, queue.GetNextUpdate)
2005 self.assertRaises(IndexError, depmgr.GetNextNotification)
2006 self.assertFalse(depmgr.CountPendingResults())
2008 # Calling the processor on a finished job should be a no-op
2009 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
2010 jqueue._JobProcessor.FINISHED)
2011 self.assertRaises(IndexError, queue.GetNextUpdate)
2014 class TestEvaluateJobProcessorResult(unittest.TestCase):
2015 def testFinished(self):
2016 depmgr = _FakeDependencyManager()
2017 job = _IdOnlyFakeJob(30953)
2018 jqueue._EvaluateJobProcessorResult(depmgr, job,
2019 jqueue._JobProcessor.FINISHED)
2020 self.assertEqual(depmgr.GetNextNotification(), job.id)
2021 self.assertRaises(IndexError, depmgr.GetNextNotification)
2023 def testDefer(self):
2024 depmgr = _FakeDependencyManager()
2025 job = _IdOnlyFakeJob(11326, priority=5463)
2027 jqueue._EvaluateJobProcessorResult(depmgr, job,
2028 jqueue._JobProcessor.DEFER)
2029 except workerpool.DeferTask, err:
2030 self.assertEqual(err.priority, 5463)
2032 self.fail("Didn't raise exception")
2033 self.assertRaises(IndexError, depmgr.GetNextNotification)
2035 def testWaitdep(self):
2036 depmgr = _FakeDependencyManager()
2037 job = _IdOnlyFakeJob(21317)
2038 jqueue._EvaluateJobProcessorResult(depmgr, job,
2039 jqueue._JobProcessor.WAITDEP)
2040 self.assertRaises(IndexError, depmgr.GetNextNotification)
2042 def testOther(self):
2043 depmgr = _FakeDependencyManager()
2044 job = _IdOnlyFakeJob(5813)
2045 self.assertRaises(errors.ProgrammerError,
2046 jqueue._EvaluateJobProcessorResult,
2047 depmgr, job, "Other result")
2048 self.assertRaises(IndexError, depmgr.GetNextNotification)
2051 class _FakeTimeoutStrategy:
2052 def __init__(self, timeouts):
2053 self.timeouts = timeouts
2055 self.last_timeout = None
2057 def NextAttempt(self):
2060 timeout = self.timeouts.pop(0)
2063 self.last_timeout = timeout
2067 class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
2069 self.queue = _FakeQueueForProc()
2072 self.opcounter = None
2073 self.timeout_strategy = None
2075 self.prev_tsop = None
2076 self.prev_prio = None
2077 self.prev_status = None
2078 self.lock_acq_prio = None
2079 self.gave_lock = None
2080 self.done_lock_before_blocking = False
2082 def _BeforeStart(self, timeout, priority):
2085 # If status has changed, job must've been written
2086 if self.prev_status != self.job.ops[self.curop].status:
2087 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2088 self.assertRaises(IndexError, self.queue.GetNextUpdate)
2090 self.assertFalse(self.queue.IsAcquired())
2091 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
2093 ts = self.timeout_strategy
2095 self.assert_(timeout is None or isinstance(timeout, (int, float)))
2096 self.assertEqual(timeout, ts.last_timeout)
2097 self.assertEqual(priority, job.ops[self.curop].priority)
2099 self.gave_lock = True
2100 self.lock_acq_prio = priority
2102 if (self.curop == 3 and
2103 job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
2104 # Give locks before running into blocking acquire
2105 assert self.retries == 7
2107 self.done_lock_before_blocking = True
2110 if self.retries > 0:
2111 self.assert_(timeout is not None)
2113 self.gave_lock = False
2114 raise mcpu.LockAcquireTimeout()
2116 if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
2117 assert self.retries == 0, "Didn't exhaust all retries at highest priority"
2118 assert not ts.timeouts
2119 self.assert_(timeout is None)
2121 def _AfterStart(self, op, cbs):
2124 # Setting to "running" requires an update
2125 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2126 self.assertRaises(IndexError, self.queue.GetNextUpdate)
2128 self.assertFalse(self.queue.IsAcquired())
2129 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
2131 # Job is running, cancelling shouldn't be possible
2132 (success, _) = job.Cancel()
2133 self.assertFalse(success)
2135 def _NextOpcode(self):
2136 self.curop = self.opcounter.next()
2137 self.prev_prio = self.job.ops[self.curop].priority
2138 self.prev_status = self.job.ops[self.curop].status
2140 def _NewTimeoutStrategy(self):
2143 self.assertEqual(self.retries, 0)
2145 if self.prev_tsop == self.curop:
2146 # Still on the same opcode, priority must've been increased
2147 self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
2151 timeouts = range(10, 31, 10)
2152 self.retries = len(timeouts) - 1
2154 elif self.curop == 2:
2155 # Let this run into a blocking acquire
2156 timeouts = range(11, 61, 12)
2157 self.retries = len(timeouts)
2159 elif self.curop == 3:
2160 # Wait for priority to increase, but give lock before blocking acquire
2161 timeouts = range(12, 100, 14)
2162 self.retries = len(timeouts)
2164 self.assertFalse(self.done_lock_before_blocking)
2166 elif self.curop == 4:
2167 self.assert_(self.done_lock_before_blocking)
2169 # Timeouts, but no need to retry
2170 timeouts = range(10, 31, 10)
2173 elif self.curop == 5:
2175 timeouts = range(19, 100, 11)
2176 self.retries = len(timeouts)
2182 assert len(job.ops) == 10
2183 assert self.retries <= len(timeouts)
2185 ts = _FakeTimeoutStrategy(timeouts)
2187 self.timeout_strategy = ts
2188 self.prev_tsop = self.curop
2189 self.prev_prio = job.ops[self.curop].priority
2193 def testTimeout(self):
2194 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
2199 job = self._CreateJob(self.queue, job_id, ops)
2202 self.opcounter = itertools.count(0)
2204 opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
2206 tsf = self._NewTimeoutStrategy
2208 self.assertFalse(self.done_lock_before_blocking)
2211 proc = jqueue._JobProcessor(self.queue, opexec, job,
2212 _timeout_strategy_factory=tsf)
2214 self.assertRaises(IndexError, self.queue.GetNextUpdate)
2216 if self.curop is not None:
2217 self.prev_status = self.job.ops[self.curop].status
2219 self.lock_acq_prio = None
2221 result = proc(_nextop_fn=self._NextOpcode)
2222 assert self.curop is not None
2224 if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
2225 # Got lock and/or job is done, result must've been written
2226 self.assertFalse(job.cur_opctx)
2227 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2228 self.assertRaises(IndexError, self.queue.GetNextUpdate)
2229 self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
2230 self.assert_(job.ops[self.curop].exec_timestamp)
2232 if result == jqueue._JobProcessor.FINISHED:
2233 self.assertFalse(job.cur_opctx)
2236 self.assertEqual(result, jqueue._JobProcessor.DEFER)
2239 self.assertEqual(job.ops[self.curop].start_timestamp,
2240 job.start_timestamp)
2243 # Opcode finished, but job not yet done
2244 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2247 self.assert_(job.cur_opctx)
2248 self.assertEqual(job.cur_opctx._timeout_strategy._fn,
2249 self.timeout_strategy.NextAttempt)
2250 self.assertFalse(job.ops[self.curop].exec_timestamp)
2251 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
2253 # If priority has changed since acquiring locks, the job must've been
2255 if self.lock_acq_prio != job.ops[self.curop].priority:
2256 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2258 self.assertRaises(IndexError, self.queue.GetNextUpdate)
2260 self.assert_(job.start_timestamp)
2261 self.assertFalse(job.end_timestamp)
2263 self.assertEqual(self.curop, len(job.ops) - 1)
2264 self.assertEqual(self.job, job)
2265 self.assertEqual(self.opcounter.next(), len(job.ops))
2266 self.assert_(self.done_lock_before_blocking)
2268 self.assertRaises(IndexError, self.queue.GetNextUpdate)
2269 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
2270 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
2271 self.assertEqual(job.GetInfo(["opresult"]),
2272 [[op.input.result for op in job.ops]])
2273 self.assertEqual(job.GetInfo(["opstatus"]),
2274 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
2275 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
2278 # Calling the processor on a finished job should be a no-op
2279 self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2280 jqueue._JobProcessor.FINISHED)
2281 self.assertRaises(IndexError, self.queue.GetNextUpdate)
2284 class TestJobProcessorChangePriority(unittest.TestCase, _JobProcessorTestUtils):
2286 self.queue = _FakeQueueForProc()
2287 self.opexecprio = []
2289 def _BeforeStart(self, timeout, priority):
2290 self.assertFalse(self.queue.IsAcquired())
2291 self.opexecprio.append(priority)
2293 def testChangePriorityWhileRunning(self):
2294 # Tests changing the priority on a job while it has finished opcodes
2295 # (successful) and more, unprocessed ones
2296 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
2301 job = self._CreateJob(self.queue, job_id, ops)
2303 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2305 opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart, None)
2308 self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2309 jqueue._JobProcessor.DEFER)
2311 # Job goes back to queued
2312 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2313 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
2314 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2315 [[constants.OP_STATUS_SUCCESS,
2316 constants.OP_STATUS_QUEUED,
2317 constants.OP_STATUS_QUEUED],
2318 ["Res0", None, None]])
2320 self.assertEqual(self.opexecprio.pop(0), constants.OP_PRIO_DEFAULT)
2321 self.assertRaises(IndexError, self.opexecprio.pop, 0)
2324 self.assertEqual(job.ChangePriority(-10),
2326 ("Priorities of pending opcodes for job 3499 have"
2327 " been changed to -10")))
2328 self.assertEqual(job.CalcPriority(), -10)
2330 # Process second opcode
2331 self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2332 jqueue._JobProcessor.DEFER)
2334 self.assertEqual(self.opexecprio.pop(0), -10)
2335 self.assertRaises(IndexError, self.opexecprio.pop, 0)
2338 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2339 self.assertEqual(job.CalcPriority(), -10)
2340 self.assertEqual(job.GetInfo(["id"]), [job_id])
2341 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
2342 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2343 [[constants.OP_STATUS_SUCCESS,
2344 constants.OP_STATUS_SUCCESS,
2345 constants.OP_STATUS_QUEUED],
2346 ["Res0", "Res1", None]])
2348 # Change priority once more
2349 self.assertEqual(job.ChangePriority(5),
2351 ("Priorities of pending opcodes for job 3499 have"
2352 " been changed to 5")))
2353 self.assertEqual(job.CalcPriority(), 5)
2355 # Process third opcode
2356 self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2357 jqueue._JobProcessor.FINISHED)
2359 self.assertEqual(self.opexecprio.pop(0), 5)
2360 self.assertRaises(IndexError, self.opexecprio.pop, 0)
2363 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
2364 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
2365 self.assertEqual(job.GetInfo(["id"]), [job_id])
2366 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
2367 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2368 [[constants.OP_STATUS_SUCCESS,
2369 constants.OP_STATUS_SUCCESS,
2370 constants.OP_STATUS_SUCCESS],
2371 ["Res0", "Res1", "Res2"]])
2372 self.assertEqual(map(operator.attrgetter("priority"), job.ops),
2373 [constants.OP_PRIO_DEFAULT, -10, 5])
2376 class _IdOnlyFakeJob:
2377 def __init__(self, job_id, priority=NotImplemented):
2378 self.id = str(job_id)
2379 self._priority = priority
2381 def CalcPriority(self):
2382 return self._priority
2385 class TestJobDependencyManager(unittest.TestCase):
2389 self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
2391 def _GetStatus(self, job_id):
2392 (exp_job_id, result) = self._status.pop(0)
2393 self.assertEqual(exp_job_id, job_id)
2396 def _Enqueue(self, jobs):
2397 self.assertFalse(self.jdm._lock.is_owned(),
2398 msg=("Must not own manager lock while re-adding jobs"
2399 " (potential deadlock)"))
2400 self._queue.append(jobs)
2402 def testNotFinalizedThenCancel(self):
2403 job = _IdOnlyFakeJob(17697)
2406 self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2407 (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2408 self.assertEqual(result, self.jdm.WAIT)
2409 self.assertFalse(self._status)
2410 self.assertFalse(self._queue)
2411 self.assertTrue(self.jdm.JobWaiting(job))
2412 self.assertEqual(self.jdm._waiters, {
2415 self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2416 ("job/28625", None, None, [("job", [job.id])])
2419 self._status.append((job_id, constants.JOB_STATUS_CANCELED))
2420 (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2421 self.assertEqual(result, self.jdm.CANCEL)
2422 self.assertFalse(self._status)
2423 self.assertFalse(self._queue)
2424 self.assertFalse(self.jdm.JobWaiting(job))
2425 self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2427 def testNotFinalizedThenQueued(self):
2428 # This can happen on a queue shutdown
2429 job = _IdOnlyFakeJob(1320)
2434 self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2436 self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2437 (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2438 self.assertEqual(result, self.jdm.WAIT)
2439 self.assertFalse(self._status)
2440 self.assertFalse(self._queue)
2441 self.assertTrue(self.jdm.JobWaiting(job))
2442 self.assertEqual(self.jdm._waiters, {
2445 self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2446 ("job/22971", None, None, [("job", [job.id])])
2449 def testRequireCancel(self):
2450 job = _IdOnlyFakeJob(5278)
2452 dep_status = [constants.JOB_STATUS_CANCELED]
2454 self._status.append((job_id, constants.JOB_STATUS_WAITING))
2455 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2456 self.assertEqual(result, self.jdm.WAIT)
2457 self.assertFalse(self._status)
2458 self.assertFalse(self._queue)
2459 self.assertTrue(self.jdm.JobWaiting(job))
2460 self.assertEqual(self.jdm._waiters, {
2463 self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2464 ("job/9610", None, None, [("job", [job.id])])
2467 self._status.append((job_id, constants.JOB_STATUS_CANCELED))
2468 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2469 self.assertEqual(result, self.jdm.CONTINUE)
2470 self.assertFalse(self._status)
2471 self.assertFalse(self._queue)
2472 self.assertFalse(self.jdm.JobWaiting(job))
2473 self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2475 def testRequireError(self):
2476 job = _IdOnlyFakeJob(21459)
2478 dep_status = [constants.JOB_STATUS_ERROR]
2480 self._status.append((job_id, constants.JOB_STATUS_WAITING))
2481 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2482 self.assertEqual(result, self.jdm.WAIT)
2483 self.assertFalse(self._status)
2484 self.assertFalse(self._queue)
2485 self.assertTrue(self.jdm.JobWaiting(job))
2486 self.assertEqual(self.jdm._waiters, {
2490 self._status.append((job_id, constants.JOB_STATUS_ERROR))
2491 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2492 self.assertEqual(result, self.jdm.CONTINUE)
2493 self.assertFalse(self._status)
2494 self.assertFalse(self._queue)
2495 self.assertFalse(self.jdm.JobWaiting(job))
2496 self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2498 def testRequireMultiple(self):
2499 dep_status = list(constants.JOBS_FINALIZED)
2501 for end_status in dep_status:
2502 job = _IdOnlyFakeJob(21343)
2505 self._status.append((job_id, constants.JOB_STATUS_WAITING))
2506 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2507 self.assertEqual(result, self.jdm.WAIT)
2508 self.assertFalse(self._status)
2509 self.assertFalse(self._queue)
2510 self.assertTrue(self.jdm.JobWaiting(job))
2511 self.assertEqual(self.jdm._waiters, {
2514 self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2515 ("job/14609", None, None, [("job", [job.id])])
2518 self._status.append((job_id, end_status))
2519 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2520 self.assertEqual(result, self.jdm.CONTINUE)
2521 self.assertFalse(self._status)
2522 self.assertFalse(self._queue)
2523 self.assertFalse(self.jdm.JobWaiting(job))
2524 self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2526 def testNotify(self):
2527 job = _IdOnlyFakeJob(8227)
2530 self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2531 (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2532 self.assertEqual(result, self.jdm.WAIT)
2533 self.assertFalse(self._status)
2534 self.assertFalse(self._queue)
2535 self.assertTrue(self.jdm.JobWaiting(job))
2536 self.assertEqual(self.jdm._waiters, {
2540 self.jdm.NotifyWaiters(job_id)
2541 self.assertFalse(self._status)
2542 self.assertFalse(self.jdm._waiters)
2543 self.assertFalse(self.jdm.JobWaiting(job))
2544 self.assertEqual(self._queue, [set([job])])
2546 def testWrongStatus(self):
2547 job = _IdOnlyFakeJob(10102)
2550 self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2551 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2552 [constants.JOB_STATUS_SUCCESS])
2553 self.assertEqual(result, self.jdm.WAIT)
2554 self.assertFalse(self._status)
2555 self.assertFalse(self._queue)
2556 self.assertTrue(self.jdm.JobWaiting(job))
2557 self.assertEqual(self.jdm._waiters, {
2561 self._status.append((job_id, constants.JOB_STATUS_ERROR))
2562 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2563 [constants.JOB_STATUS_SUCCESS])
2564 self.assertEqual(result, self.jdm.WRONGSTATUS)
2565 self.assertFalse(self._status)
2566 self.assertFalse(self._queue)
2567 self.assertFalse(self.jdm.JobWaiting(job))
2569 def testCorrectStatus(self):
2570 job = _IdOnlyFakeJob(24273)
2573 self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2574 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2575 [constants.JOB_STATUS_SUCCESS])
2576 self.assertEqual(result, self.jdm.WAIT)
2577 self.assertFalse(self._status)
2578 self.assertFalse(self._queue)
2579 self.assertTrue(self.jdm.JobWaiting(job))
2580 self.assertEqual(self.jdm._waiters, {
2584 self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2585 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2586 [constants.JOB_STATUS_SUCCESS])
2587 self.assertEqual(result, self.jdm.CONTINUE)
2588 self.assertFalse(self._status)
2589 self.assertFalse(self._queue)
2590 self.assertFalse(self.jdm.JobWaiting(job))
2592 def testFinalizedRightAway(self):
2593 job = _IdOnlyFakeJob(224)
2596 self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2597 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2598 [constants.JOB_STATUS_SUCCESS])
2599 self.assertEqual(result, self.jdm.CONTINUE)
2600 self.assertFalse(self._status)
2601 self.assertFalse(self._queue)
2602 self.assertFalse(self.jdm.JobWaiting(job))
2603 self.assertEqual(self.jdm._waiters, {
2608 self.jdm.NotifyWaiters("0")
2609 self.assertFalse(self.jdm._waiters)
2610 self.assertFalse(self._status)
2611 self.assertFalse(self._queue)
2613 def testMultipleWaiting(self):
2614 # Use a deterministic random generator
2615 rnd = random.Random(21402)
2617 job_ids = map(str, rnd.sample(range(1, 10000), 150))
2619 waiters = dict((job_ids.pop(),
2620 set(map(_IdOnlyFakeJob,
2622 for _ in range(rnd.randint(1, 20))])))
2625 # Ensure there are no duplicate job IDs
2626 assert not utils.FindDuplicates(waiters.keys() +
2628 for jobs in waiters.values()
2631 # Register all jobs as waiters
2632 for job_id, job in [(job_id, job)
2633 for (job_id, jobs) in waiters.items()
2635 self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2636 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2637 [constants.JOB_STATUS_SUCCESS])
2638 self.assertEqual(result, self.jdm.WAIT)
2639 self.assertFalse(self._status)
2640 self.assertFalse(self._queue)
2641 self.assertTrue(self.jdm.JobWaiting(job))
2643 self.assertEqual(self.jdm._waiters, waiters)
2645 def _MakeSet((name, mode, owner_names, pending)):
2646 return (name, mode, owner_names,
2647 [(pendmode, set(pend)) for (pendmode, pend) in pending])
2649 def _CheckLockInfo():
2650 info = self.jdm.GetLockInfo([query.LQ_PENDING])
2651 self.assertEqual(sorted(map(_MakeSet, info)), sorted([
2652 ("job/%s" % job_id, None, None,
2653 [("job", set([job.id for job in jobs]))])
2654 for job_id, jobs in waiters.items()
2660 # Notify in random order
2661 for job_id in rnd.sample(waiters, len(waiters)):
2662 # Remove from pending waiter list
2663 jobs = waiters.pop(job_id)
2665 self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2666 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2667 [constants.JOB_STATUS_SUCCESS])
2668 self.assertEqual(result, self.jdm.CONTINUE)
2669 self.assertFalse(self._status)
2670 self.assertFalse(self._queue)
2671 self.assertFalse(self.jdm.JobWaiting(job))
2675 self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2679 def testSelfDependency(self):
2680 job = _IdOnlyFakeJob(18937)
2682 self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
2683 (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
2684 self.assertEqual(result, self.jdm.ERROR)
2686 def testJobDisappears(self):
2687 job = _IdOnlyFakeJob(30540)
2691 raise errors.JobLost("#msg#")
2693 jdm = jqueue._JobDependencyManager(_FakeStatus, None)
2694 (result, _) = jdm.CheckAndRegister(job, job_id, [])
2695 self.assertEqual(result, self.jdm.ERROR)
2696 self.assertFalse(jdm.JobWaiting(job))
2697 self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
2700 if __name__ == "__main__":
2701 testutils.GanetiTestProgram()