4 # Copyright (C) 2010, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.jqueue"""
35 # pylint: disable=E0611
36 from pyinotify import pyinotify
40 from ganeti import constants
41 from ganeti import utils
42 from ganeti import errors
43 from ganeti import jqueue
44 from ganeti import opcodes
45 from ganeti import compat
46 from ganeti import mcpu
47 from ganeti import query
48 from ganeti import workerpool
# Constructor of the fake job; receives the job ID and the initial status.
54 def __init__(self, job_id, status):
# NOTE(review): presumably replaces the status that GetInfo reports later on;
# confirm against the method body.
60 def SetStatus(self, status):
def AddLogEntry(self, msg):
  """Appends a message to the fake job's log.

  Entries are stored as C{(serial, msg)} tuples, the serial being the number
  of entries already present in the log.

  @param msg: Message to record

  """
  serial = len(self._log)
  self._log.append((serial, msg))
# Collects values for the requested fields into "result"; self._status is the
# value appended for a field, and Exception("Unknown field") is raised on the
# error path.
# NOTE(review): the branch conditions are not visible here; confirm which
# field names are accepted.
69 def GetInfo(self, fields):
74 result.append(self._status)
76 raise Exception("Unknown field")
# Returns log entries from self._log; "newer_than" is either None or a
# non-negative index from which the returned slice starts.
80 def GetLogEntries(self, newer_than):
81 assert newer_than is None or newer_than >= 0
# None is handled separately from a numeric offset
83 if newer_than is None:
86 return self._log[newer_than:]
# Tests for jqueue._JobChangesChecker, using _FakeJob instances as jobs.
89 class TestJobChangesChecker(unittest.TestCase):
# Without previous job information the checker reports the current status,
# together with an empty second element, on every call
91 job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
92 checker = jqueue._JobChangesChecker(["status"], None, None)
93 self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
# Each status change is reflected by the next call
95 job.SetStatus(constants.JOB_STATUS_RUNNING)
96 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
98 job.SetStatus(constants.JOB_STATUS_SUCCESS)
99 self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
101 # job.id is used by checker
102 self.assertEqual(job.id, 9094)
def testStatusWithPrev(self):
  """Checks the checker when previous job information is supplied.

  While the job's status equals the previously known one, the checker must
  return C{None}; once the status differs, the new job information is
  returned together with an empty list.

  """
  job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
  checker = jqueue._JobChangesChecker(["status"],
                                      [constants.JOB_STATUS_QUEUED], None)
  # Status is identical to the previous information, hence no change.
  # assertTrue is used instead of the deprecated "assert_" alias, matching
  # the other tests in this file.
  self.assertTrue(checker(job) is None)

  job.SetStatus(constants.JOB_STATUS_RUNNING)
  self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
def testFinalStatus(self):
  """Checks that finalized statuses are reported as a change right away."""
  for final_status in constants.JOBS_FINALIZED:
    finished_job = _FakeJob(2178711, final_status)
    previous_info = [final_status]
    checker = jqueue._JobChangesChecker(["status"], previous_info, None)
    # A finalized status won't change anymore, so the checker has to signal
    # a change immediately even though the previous information matches
    expected = ([final_status], [])
    self.assertEqual(checker(finished_job), expected)
# Log entry handling: a checker created without previous information returns
# all log entries; one created with the previous job info and the number of
# entries already seen returns None until something changes and then only
# the newer entries.
122 job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
123 checker = jqueue._JobChangesChecker(["status"], None, None)
124 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
126 job.AddLogEntry("Hello World")
127 (job_info, log_entries) = checker(job)
128 self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
129 self.assertEqual(log_entries, [[0, "Hello World"]])
# Second checker starts from the state returned by the first one
131 checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
132 self.assert_(checker2(job) is None)
134 job.AddLogEntry("Foo Bar")
135 job.SetStatus(constants.JOB_STATUS_ERROR)
# Only the entry added after the known serial is reported
137 (job_info, log_entries) = checker2(job)
138 self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
139 self.assertEqual(log_entries, [[1, "Foo Bar"]])
# A fresh checker without previous information sees the complete log
141 checker3 = jqueue._JobChangesChecker(["status"], None, None)
142 (job_info, log_entries) = checker3(job)
143 self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
144 self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
# Tests for jqueue._JobFileChangesWaiter and jqueue._JobChangesWaiter, run
# against a job file inside a temporary directory.
147 class TestJobChangesWaiter(unittest.TestCase):
# Fixture: temporary directory holding an empty file named "job-1"
149 self.tmpdir = tempfile.mkdtemp()
150 self.filename = utils.PathJoin(self.tmpdir, "job-1")
151 utils.WriteFile(self.filename, data="")
# Cleanup: the temporary directory is removed recursively
154 shutil.rmtree(self.tmpdir)
# Helper checking that the notifier's file descriptor is no longer open:
# os.fstat on it is expected to fail with errno EBADF
156 def _EnsureNotifierClosed(self, notifier):
158 os.fstat(notifier._fd)
159 except EnvironmentError, err:
160 self.assertEqual(err.errno, errno.EBADF)
162 self.fail("File descriptor wasn't closed")
# Runs once with "wait" set to False and once with True
165 for wait in [False, True]:
166 waiter = jqueue._JobFileChangesWaiter(self.filename)
173 # Ensure file descriptor was closed
174 self._EnsureNotifierClosed(waiter._notifier)
# Wait(0.1) returns a false value while the file is untouched; after the
# file has been rewritten Wait(60) returns a true value
176 def testChangingFile(self):
177 waiter = jqueue._JobFileChangesWaiter(self.filename)
179 self.assertFalse(waiter.Wait(0.1))
180 utils.WriteFile(self.filename, data="changed")
181 self.assert_(waiter.Wait(60))
185 self._EnsureNotifierClosed(waiter._notifier)
# Same scenario through jqueue._JobChangesWaiter: "_filewaiter" is unset
# before the first Wait call, which returns a true value and leaves
# "_filewaiter" set
187 def testChangingFile2(self):
188 waiter = jqueue._JobChangesWaiter(self.filename)
190 self.assertFalse(waiter._filewaiter)
191 self.assert_(waiter.Wait(0.1))
192 self.assert_(waiter._filewaiter)
194 # File waiter is now used, but there have been no changes
195 self.assertFalse(waiter.Wait(0.1))
196 utils.WriteFile(self.filename, data="changed")
197 self.assert_(waiter.Wait(60))
201 self._EnsureNotifierClosed(waiter._filewaiter._notifier)
# Watch manager used to simulate failing inotify registrations (see
# testInotifyError); add_watch asserts that the requested event mask is
# exactly IN_MODIFY | IN_IGNORED.
204 class _FailingWatchManager(pyinotify.WatchManager):
205 """Subclass of L{pyinotify.WatchManager} which always fails to register.
208 def add_watch(self, filename, mask):
209 assert mask == (pyinotify.EventsCodes.ALL_FLAGS["IN_MODIFY"] |
210 pyinotify.EventsCodes.ALL_FLAGS["IN_IGNORED"])
# Tests for jqueue._WaitForJobChangesHelper, run against a job file inside a
# temporary directory.
217 class TestWaitForJobChangesHelper(unittest.TestCase):
# Fixture: temporary directory holding an empty file named "job-2614226563"
219 self.tmpdir = tempfile.mkdtemp()
220 self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
221 utils.WriteFile(self.filename, data="")
# Cleanup: the temporary directory is removed recursively
224 shutil.rmtree(self.tmpdir)
def _LoadWaitingJob(self):
  """Job loader callback returning a fake job in "waiting" status.

  @return: L{_FakeJob} instance with ID 2614226563

  """
  waiting_job = _FakeJob(2614226563, constants.JOB_STATUS_WAITING)
  return waiting_job
# Job loader callback passed to the helper by testLostJob and
# testNonExistentFile.
# NOTE(review): presumably returns None to simulate a job which can't be
# loaded; confirm against the method body.
229 def _LoadLostJob(self):
# With previous information equal to the job's current status the helper
# returns constants.JOB_NOTCHANGED; without previous information the current
# status is returned together with an empty list.
232 def testNoChanges(self):
233 wfjc = jqueue._WaitForJobChangesHelper()
# Previous status matches the waiting job, 0.1 is passed as the last argument
236 self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
237 [constants.JOB_STATUS_WAITING], None, 0.1),
238 constants.JOB_NOTCHANGED)
240 # No previous information
241 self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
242 ["status"], None, None, 1.0),
243 ([constants.JOB_STATUS_WAITING], []))
def testLostJob(self):
  """Checks that the helper returns C{None} when using L{_LoadLostJob}."""
  wfjc = jqueue._WaitForJobChangesHelper()
  result = wfjc(self.filename, self._LoadLostJob,
                ["status"], None, None, 1.0)
  # assertTrue replaces the deprecated "assert_" alias, in line with
  # testNonExistentFile
  self.assertTrue(result is None)
def testNonExistentFile(self):
  """Checks the helper's result for a job file which does not exist.

  The job changes waiter is handed C{NotImplemented} as its own waiter class
  and the helper is expected to return C{None}.

  """
  wfjc = jqueue._WaitForJobChangesHelper()

  missing_path = utils.PathJoin(self.tmpdir, "does-not-exist")
  self.assertFalse(os.path.exists(missing_path))

  waiter_cls = compat.partial(jqueue._JobChangesWaiter,
                              _waiter_cls=NotImplemented)
  result = wfjc(missing_path, self._LoadLostJob, ["status"], None, None, 1.0,
                _waiter_cls=waiter_cls)
  self.assertTrue(result is None)
def testInotifyError(self):
  """Checks that a failing watch registration raises an error.

  Failing to watch a job file (e.g. due to fs.inotify.max_user_watches being
  too low) must raise L{errors.InotifyError}.

  """
  # File waiter using the watch manager which fails to register
  file_waiter_cls = compat.partial(jqueue._JobFileChangesWaiter,
                                   _inotify_wm_cls=_FailingWatchManager)
  # Job changes waiter built on top of the failing file waiter
  change_waiter_cls = compat.partial(jqueue._JobChangesWaiter,
                                     _waiter_cls=file_waiter_cls)

  wfjc = jqueue._WaitForJobChangesHelper()

  waiting_info = [constants.JOB_STATUS_WAITING]
  self.assertRaises(errors.InotifyError, wfjc,
                    self.filename, self._LoadWaitingJob,
                    ["status"], waiting_info, None, 1.0,
                    _waiter_cls=change_waiter_cls)
# Tests for jqueue._EncodeOpError: the encoded error is always a tuple, which
# errors.MaybeRaise turns back into an exception.
279 class TestEncodeOpError(unittest.TestCase):
# Ganeti error classes are raised again as the same class
281 encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
282 self.assert_(isinstance(encerr, tuple))
283 self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
285 encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
286 self.assert_(isinstance(encerr, tuple))
287 self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
# Other exceptions and plain strings come back as errors.OpExecError
289 encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
290 self.assert_(isinstance(encerr, tuple))
291 self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
293 encerr = jqueue._EncodeOpError("Hello World")
294 self.assert_(isinstance(encerr, tuple))
295 self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
# Tests for jqueue._QueuedOpCode: default attribute values, priority handling
# and Serialize/Restore round trips.
298 class TestQueuedOpCode(unittest.TestCase):
299 def testDefaults(self):
# Expected defaults: no "dry_run" attribute on the input opcode, default
# priority, empty log, no timestamps or result, status "queued"
301 self.assertFalse(hasattr(op.input, "dry_run"))
302 self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
303 self.assertFalse(op.log)
304 self.assert_(op.start_timestamp is None)
305 self.assert_(op.exec_timestamp is None)
306 self.assert_(op.end_timestamp is None)
307 self.assert_(op.result is None)
308 self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
# A restored opcode must serialize to the same data as the original
310 op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
312 op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
314 self.assertEqual(op1.Serialize(), op2.Serialize())
316 def testPriority(self):
# The test only makes sense if default and high priority differ
318 assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
319 "Default priority equals high priority; test can't work"
320 self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
321 self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
# Priority is given on the input opcode and must survive a round trip
323 inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
324 op1 = jqueue._QueuedOpCode(inpop)
326 op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
328 self.assertEqual(op1.Serialize(), op2.Serialize())
# Tests for jqueue._QueuedJob.
331 class TestQueuedJob(unittest.TestCase):
# Constructing jqueue._QueuedJob is expected to raise errors.GenericError
# here (constructor arguments follow on the continuation line)
332 def testNoOpCodes(self):
333 self.assertRaises(errors.GenericError, jqueue._QueuedJob,
# Checks the attributes of a newly created job and that a job restored from
# its serialized form serializes to the same data.
336 def testDefaults(self):
340 opcodes.OpTestDelay(),
# Expected state: writable, ID as given, log serial 0, only the received
# timestamp set, status "queued", default priority, not archived
344 self.assertTrue(job.writable)
345 self.assertEqual(job.id, job_id)
346 self.assertEqual(job.log_serial, 0)
347 self.assert_(job.received_timestamp)
348 self.assert_(job.start_timestamp is None)
349 self.assert_(job.end_timestamp is None)
350 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
351 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
352 self.assert_(repr(job).startswith("<"))
353 self.assertEqual(len(job.ops), len(ops))
# The job's opcodes wrap the input opcodes in the same order
354 self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
355 for (inp, op) in zip(ops, job.ops)))
356 self.assertRaises(errors.OpPrereqError, job.GetInfo,
358 self.assertEqual(job.GetInfo(["summary"]),
359 [[op.input.Summary() for op in job.ops]])
360 self.assertFalse(job.archived)
# Serialize/Restore round trip
362 job1 = jqueue._QueuedJob(None, job_id, ops, True)
364 job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True, False)
366 self.assertEqual(job1.Serialize(), job2.Serialize())
def testWritable(self):
  """Checks that the job's "writable" flag follows the constructor argument."""
  cases = [
    (False, self.assertFalse),
    (True, self.assertTrue),
    ]

  for (writable, check) in cases:
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], writable)
    check(job.writable)
def testArchived(self):
  """Checks that the "archived" flag is determined when restoring a job."""
  fresh_job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
  self.assertFalse(fresh_job.archived)

  # Restoring with the last argument set to True yields an archived job
  serialized = fresh_job.Serialize()
  archived_job = jqueue._QueuedJob.Restore(None, serialized, True, True)
  self.assertTrue(archived_job.archived)

  # Restoring the archived job's data with False clears the flag again
  serialized = archived_job.Serialize()
  unarchived_job = jqueue._QueuedJob.Restore(None, serialized, True, False)
  self.assertFalse(unarchived_job.archived)
# Checks how CalcPriority reacts to changes of individual opcode priorities
# and statuses.
385 def testPriority(self):
388 opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
389 opcodes.OpTestDelay(),
393 self.assertEqual(job.id, job_id)
394 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
395 self.assert_(repr(job).startswith("<"))
397 job = jqueue._QueuedJob(None, job_id, ops, True)
399 self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
401 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
# Changing the first opcode's priority changes the job's priority
404 job.ops[0].priority -= 1
406 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
408 # Mark opcode as finished
409 job.ops[0].status = constants.OP_STATUS_SUCCESS
# The finished opcode's priority is no longer reflected
411 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
414 job.ops[1].priority -= 10
415 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
417 # Test increasing first
418 job.ops[0].status = constants.OP_STATUS_RUNNING
# First opcode was already at default - 1, so it ends up at default - 20
419 job.ops[0].priority -= 19
420 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
# Helper for the ChangePriority tests: builds a writable job with the given
# ID from OpTestDelay opcodes and checks that all opcodes start with the
# default priority.
422 def _JobForPriority(self, job_id):
425 opcodes.OpTestDelay(),
427 opcodes.OpTestDelay(),
430 job = jqueue._QueuedJob(None, job_id, ops, True)
432 self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
434 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
# Checks for a "priority" attribute on the input opcodes
435 self.assertFalse(compat.any(hasattr(op.input, "priority")
# With every opcode still queued, ChangePriority(-10) changes all opcode
# priorities and reports success.
440 def testChangePriorityAllQueued(self):
441 job = self._JobForPriority(24984)
442 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
443 self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
445 result = job.ChangePriority(-10)
446 self.assertEqual(job.CalcPriority(), -10)
447 self.assertTrue(compat.all(op.priority == -10 for op in job.ops))
448 self.assertFalse(compat.any(hasattr(op.input, "priority")
# Result is a (success, message) tuple
450 self.assertEqual(result,
451 (True, ("Priorities of pending opcodes for job 24984 have"
452 " been changed to -10")))
# For a job in error status, ChangePriority(-10) leaves opcode priorities and
# statuses untouched and reports the job as finished.
454 def testChangePriorityAllFinished(self):
455 job = self._JobForPriority(16405)
# Opcodes are marked as error or success depending on their index
457 for (idx, op) in enumerate(job.ops):
459 op.status = constants.OP_STATUS_ERROR
461 op.status = constants.OP_STATUS_SUCCESS
463 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
464 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
465 result = job.ChangePriority(-10)
466 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
467 self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
469 self.assertFalse(compat.any(hasattr(op.input, "priority")
# Statuses must be unchanged by the call
471 self.assertEqual(map(operator.attrgetter("status"), job.ops), [
472 constants.OP_STATUS_SUCCESS,
473 constants.OP_STATUS_SUCCESS,
474 constants.OP_STATUS_SUCCESS,
475 constants.OP_STATUS_ERROR,
477 self.assertEqual(result, (False, "Job 16405 is finished"))
# For a job in canceling status, ChangePriority(5) leaves opcode priorities
# and statuses untouched and reports the job as cancelling.
479 def testChangePriorityCancelling(self):
480 job = self._JobForPriority(31572)
# Opcodes are marked as canceling or success depending on their index
482 for (idx, op) in enumerate(job.ops):
484 op.status = constants.OP_STATUS_CANCELING
486 op.status = constants.OP_STATUS_SUCCESS
488 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELING)
489 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
490 result = job.ChangePriority(5)
491 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
492 self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
494 self.assertFalse(compat.any(hasattr(op.input, "priority")
# Statuses must be unchanged by the call
496 self.assertEqual(map(operator.attrgetter("status"), job.ops), [
497 constants.OP_STATUS_SUCCESS,
498 constants.OP_STATUS_SUCCESS,
499 constants.OP_STATUS_CANCELING,
500 constants.OP_STATUS_CANCELING,
502 self.assertEqual(result, (False, "Job 31572 is cancelling"))
# With the first opcode running and the others queued, ChangePriority(7) only
# changes the priority of the three queued opcodes.
504 def testChangePriorityFirstRunning(self):
505 job = self._JobForPriority(1716215889)
# Opcodes are marked as running or queued depending on their index
507 for (idx, op) in enumerate(job.ops):
509 op.status = constants.OP_STATUS_RUNNING
511 op.status = constants.OP_STATUS_QUEUED
513 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
514 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
515 result = job.ChangePriority(7)
# The job priority stays at the default although three opcodes are now at 7
516 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
517 self.assertEqual(map(operator.attrgetter("priority"), job.ops),
518 [constants.OP_PRIO_DEFAULT, 7, 7, 7])
519 self.assertFalse(compat.any(hasattr(op.input, "priority")
# Statuses must be unchanged by the call
521 self.assertEqual(map(operator.attrgetter("status"), job.ops), [
522 constants.OP_STATUS_RUNNING,
523 constants.OP_STATUS_QUEUED,
524 constants.OP_STATUS_QUEUED,
525 constants.OP_STATUS_QUEUED,
527 self.assertEqual(result,
528 (True, ("Priorities of pending opcodes for job"
529 " 1716215889 have been changed to 7")))
# With only the last opcode running and all others successful,
# ChangePriority(-3) changes nothing and reports that there were no pending
# opcodes.
531 def testChangePriorityLastRunning(self):
532 job = self._JobForPriority(1308)
534 for (idx, op) in enumerate(job.ops):
# Only the last opcode is running
535 if idx == (len(job.ops) - 1):
536 op.status = constants.OP_STATUS_RUNNING
538 op.status = constants.OP_STATUS_SUCCESS
540 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
541 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
542 result = job.ChangePriority(-3)
543 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
544 self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
546 self.assertFalse(compat.any(hasattr(op.input, "priority")
# Statuses must be unchanged by the call
548 self.assertEqual(map(operator.attrgetter("status"), job.ops), [
549 constants.OP_STATUS_SUCCESS,
550 constants.OP_STATUS_SUCCESS,
551 constants.OP_STATUS_SUCCESS,
552 constants.OP_STATUS_RUNNING,
554 self.assertEqual(result, (False, "Job 1308 had no pending opcodes"))
# With the first opcode successful, the second running and two queued,
# ChangePriority(-19) is reported as successful and the job priority becomes
# -19 while the first two opcodes keep the default priority.
556 def testChangePrioritySecondOpcodeRunning(self):
557 job = self._JobForPriority(27701)
559 self.assertEqual(len(job.ops), 4)
560 job.ops[0].status = constants.OP_STATUS_SUCCESS
561 job.ops[1].status = constants.OP_STATUS_RUNNING
562 job.ops[2].status = constants.OP_STATUS_QUEUED
563 job.ops[3].status = constants.OP_STATUS_QUEUED
565 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
566 result = job.ChangePriority(-19)
567 self.assertEqual(job.CalcPriority(), -19)
568 self.assertEqual(map(operator.attrgetter("priority"), job.ops),
569 [constants.OP_PRIO_DEFAULT, constants.OP_PRIO_DEFAULT,
571 self.assertFalse(compat.any(hasattr(op.input, "priority")
# Statuses must be unchanged by the call
573 self.assertEqual(map(operator.attrgetter("status"), job.ops), [
574 constants.OP_STATUS_SUCCESS,
575 constants.OP_STATUS_RUNNING,
576 constants.OP_STATUS_QUEUED,
577 constants.OP_STATUS_QUEUED,
579 self.assertEqual(result,
580 (True, ("Priorities of pending opcodes for job"
581 " 27701 have been changed to -19")))
def testChangePriorityWithInconsistentJob(self):
  """Checks that changing the priority of an inconsistent job fails.

  The job built here is invalid, as it has two opcodes marked as running.
  An unprocessed opcode precedes a running one (which should never happen
  in reality), hence the call must fail with an C{AssertionError}.

  """
  job = self._JobForPriority(30097)

  self.assertEqual(len(job.ops), 4)

  inconsistent_statuses = [
    constants.OP_STATUS_SUCCESS,
    constants.OP_STATUS_RUNNING,
    constants.OP_STATUS_QUEUED,
    constants.OP_STATUS_RUNNING,
    ]

  for (op, status) in zip(job.ops, inconsistent_statuses):
    op.status = status

  self.assertRaises(AssertionError, job.ChangePriority, 19)
# Table-driven check of CalcStatus: the "tests" dictionary maps every job
# status to helper functions which modify opcode statuses; for each status in
# constants.JOB_STATUS_ALL, CalcStatus must then return that status.
598 def testCalcStatus(self):
600 # The default status is "queued"
601 self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
# First opcode waiting
605 ops[0].status = constants.OP_STATUS_WAITING
# Two successful opcodes followed by a waiting one
608 ops[0].status = constants.OP_STATUS_SUCCESS
609 ops[1].status = constants.OP_STATUS_SUCCESS
610 ops[2].status = constants.OP_STATUS_WAITING
# Successful opcode followed by a running one
613 ops[0].status = constants.OP_STATUS_SUCCESS
614 ops[1].status = constants.OP_STATUS_RUNNING
616 op.status = constants.OP_STATUS_QUEUED
618 def _Canceling1(ops):
619 ops[0].status = constants.OP_STATUS_SUCCESS
620 ops[1].status = constants.OP_STATUS_SUCCESS
622 op.status = constants.OP_STATUS_CANCELING
624 def _Canceling2(ops):
626 op.status = constants.OP_STATUS_CANCELING
630 op.status = constants.OP_STATUS_CANCELED
633 for idx, op in enumerate(ops):
635 op.status = constants.OP_STATUS_ERROR
637 op.status = constants.OP_STATUS_SUCCESS
641 op.status = constants.OP_STATUS_ERROR
645 op.status = constants.OP_STATUS_SUCCESS
# Expected job status mapped to the helpers producing it
648 constants.JOB_STATUS_QUEUED: [_Queued],
649 constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
650 constants.JOB_STATUS_RUNNING: [_Running],
651 constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
652 constants.JOB_STATUS_CANCELED: [_Canceled],
653 constants.JOB_STATUS_ERROR: [_Error1, _Error2],
654 constants.JOB_STATUS_SUCCESS: [_Success],
# Jobs used for the checks consist of ten OpTestDelay opcodes
658 job = jqueue._QueuedJob(None, 1,
659 [opcodes.OpTestDelay() for _ in range(10)],
661 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
662 self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
# Every known job status needs an entry in "tests" (KeyError otherwise)
666 for status in constants.JOB_STATUS_ALL:
667 sttests = tests[status]
672 self.assertEqual(job.CalcStatus(), status)
# Fake job dependency manager for tests: dependency checks are answered from
# a list of expected calls filled through AddCheckResult, and notifications
# are recorded for later inspection.
675 class _FakeDependencyManager:
678 self._notifications = []
679 self._waiting = set()
# Registers an expected CheckAndRegister call together with its result
681 def AddCheckResult(self, job, dep_job_id, dep_status, result):
682 self._checks.append((job, dep_job_id, dep_status, result))
# Number of expected checks which have not been consumed yet
684 def CountPendingResults(self):
685 return len(self._checks)
# Number of jobs currently in the waiting set
687 def CountWaitingJobs(self):
688 return len(self._waiting)
# Returns the oldest recorded notification; IndexError if there is none
690 def GetNextNotification(self):
691 return self._notifications.pop(0)
693 def JobWaiting(self, job):
694 return job in self._waiting
# Consumes the oldest expected check, asserts that the call arguments match
# it and updates the waiting set according to the result's status
696 def CheckAndRegister(self, job, dep_job_id, dep_status):
697 (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
699 assert exp_job == job
700 assert exp_dep_job_id == dep_job_id
701 assert exp_dep_status == dep_status
# "result" is a two-element sequence whose first element is the status
703 (result_status, _) = result
705 if result_status == jqueue._JobDependencyManager.WAIT:
706 self._waiting.add(job)
707 elif result_status == jqueue._JobDependencyManager.CONTINUE:
# set.remove raises KeyError if the job wasn't waiting
708 self._waiting.remove(job)
# Records the job ID; retrievable through GetNextNotification
712 def NotifyWaiters(self, job_id):
713 self._notifications.append(job_id)
# Dependency manager stand-in used by _FakeQueueForProc when no dependency
# manager is passed in; CheckAndRegister must never be called on it.
716 class _DisabledFakeDependencyManager:
717 def JobWaiting(self, _):
720 def CheckAndRegister(self, *args):
721 assert False, "Should not be called"
723 def NotifyWaiters(self, _):
# Fake job queue handed to jqueue._JobProcessor in tests. It tracks whether
# the queue lock is held and records job updates and submitted jobs so tests
# can inspect them one by one.
727 class _FakeQueueForProc:
728 def __init__(self, depmgr=None):
729 self._acquired = False
732 self._accepting_jobs = True
# IDs handed out for submitted jobs start at 1000
734 self._submit_count = itertools.count(1000)
739 self.depmgr = _DisabledFakeDependencyManager()
741 def IsAcquired(self):
742 return self._acquired
# Returns the oldest recorded update; IndexError if there is none
744 def GetNextUpdate(self):
745 return self._updates.pop(0)
# Returns the oldest recorded submission; IndexError if there is none
747 def GetNextSubmittedJob(self):
748 return self._submitted.pop(0)
750 def acquire(self, shared=0):
752 self._acquired = True
755 assert self._acquired
756 self._acquired = False
# Records the update as a (job, replicate) tuple; the lock must be held
758 def UpdateJobUnlocked(self, job, replicate=True):
759 assert self._acquired, "Lock not acquired while updating job"
760 self._updates.append((job, bool(replicate)))
# Assigns consecutive IDs to the given jobs and records (job_id, job) pairs;
# the lock must not be held
762 def SubmitManyJobs(self, jobs):
763 assert not self._acquired, "Lock acquired while submitting jobs"
764 job_ids = [self._submit_count.next() for _ in jobs]
765 self._submitted.extend(zip(job_ids, jobs))
768 def StopAcceptingJobs(self):
769 self._accepting_jobs = False
771 def AcceptingJobsUnlocked(self):
772 return self._accepting_jobs
# Fake opcode executor passed to jqueue._JobProcessor. It only accepts
# opcodes.OpTestDummy, checks that the queue lock is not held while an opcode
# is executed and invokes the optional callbacks given to the constructor.
775 class _FakeExecOpCodeForProc:
776 def __init__(self, queue, before_start, after_start):
778 self._before_start = before_start
779 self._after_start = after_start
781 def __call__(self, op, cbs, timeout=None):
782 assert isinstance(op, opcodes.OpTestDummy)
783 assert not self._queue.IsAcquired(), \
784 "Queue lock not released when executing opcode"
# First callback receives the timeout and the current priority
786 if self._before_start:
787 self._before_start(timeout, cbs.CurrentPriority())
# Second callback receives the opcode and the callbacks object
791 if self._after_start:
792 self._after_start(op, cbs)
794 # Check again after the callbacks
795 assert not self._queue.IsAcquired()
# Error path: the opcode's "result" attribute becomes part of the message
798 raise errors.OpExecError("Error requested (%s)" % op.result)
# Opcodes carrying "submit_jobs" submit those jobs through the callbacks
800 if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
801 return cbs.SubmitManyJobs(op.submit_jobs)
# Mix-in with helpers for job processor tests; relies on the assertion
# methods of the unittest.TestCase class it is combined with.
806 class _JobProcessorTestUtils:
# Creates a writable job for the given queue, ID and opcodes and checks its
# initial state: no timestamps, one queued opcode per input opcode
807 def _CreateJob(self, queue, job_id, ops):
808 job = jqueue._QueuedJob(queue, job_id, ops, True)
809 self.assertFalse(job.start_timestamp)
810 self.assertFalse(job.end_timestamp)
811 self.assertEqual(len(ops), len(job.ops))
812 self.assert_(compat.all(op.input == inp
813 for (op, inp) in zip(job.ops, ops)))
814 self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
# Tests for jqueue._JobProcessor, using the fake queue and opcode executor.
818 class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
# Checks common to processed jobs: GetInfo must report the same timestamps
# as the job and opcode attributes, start and end timestamps must be set and
# the job's start timestamp must equal the first opcode's.
819 def _GenericCheckJob(self, job):
820 assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
823 self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
824 [[op.start_timestamp for op in job.ops],
825 [op.exec_timestamp for op in job.ops],
826 [op.end_timestamp for op in job.ops]])
827 self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
828 [job.received_timestamp,
831 self.assert_(job.start_timestamp)
832 self.assert_(job.end_timestamp)
833 self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
# Processes jobs consisting of 1, 3, 10 and 100 successful opcodes. The
# processor is called once per opcode; the recorded queue updates, the job
# status and the processor's return value are checked after every call.
835 def testSuccess(self):
836 queue = _FakeQueueForProc()
838 for (job_id, opcount) in [(25351, 1), (6637, 3),
839 (24644, 10), (32207, 100)]:
840 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
841 for i in range(opcount)]
844 job = self._CreateJob(queue, job_id, ops)
# First callback of the fake executor: exactly one update was written and
# the job is in "waiting" status
846 def _BeforeStart(timeout, priority):
847 self.assertEqual(queue.GetNextUpdate(), (job, True))
848 self.assertRaises(IndexError, queue.GetNextUpdate)
849 self.assertFalse(queue.IsAcquired())
850 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
851 self.assertFalse(job.cur_opctx)
# Second callback of the fake executor: one more update was written and the
# job is in "running" status
853 def _AfterStart(op, cbs):
854 self.assertEqual(queue.GetNextUpdate(), (job, True))
855 self.assertRaises(IndexError, queue.GetNextUpdate)
857 self.assertFalse(queue.IsAcquired())
858 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
859 self.assertFalse(job.cur_opctx)
861 # Job is running, cancelling shouldn't be possible
862 (success, _) = job.Cancel()
863 self.assertFalse(success)
865 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
# One processor call per opcode
867 for idx in range(len(ops)):
868 self.assertRaises(IndexError, queue.GetNextUpdate)
869 result = jqueue._JobProcessor(queue, opexec, job)()
870 self.assertEqual(queue.GetNextUpdate(), (job, True))
871 self.assertRaises(IndexError, queue.GetNextUpdate)
# Only the call for the last opcode finishes the job
872 if idx == len(ops) - 1:
874 self.assertEqual(result, jqueue._JobProcessor.FINISHED)
876 self.assertEqual(result, jqueue._JobProcessor.DEFER)
878 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
879 self.assert_(job.start_timestamp)
880 self.assertFalse(job.end_timestamp)
882 self.assertRaises(IndexError, queue.GetNextUpdate)
# Final state: success status and the opcodes' own results
884 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
885 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
886 self.assertEqual(job.GetInfo(["opresult"]),
887 [[op.input.result for op in job.ops]])
888 self.assertEqual(job.GetInfo(["opstatus"]),
889 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
890 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
893 self._GenericCheckJob(job)
895 # Calling the processor on a finished job should be a no-op
896 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
897 jqueue._JobProcessor.FINISHED)
898 self.assertRaises(IndexError, queue.GetNextUpdate)
# Processes jobs in which opcodes within an index range are marked to fail,
# then checks that the job ends in error status and verifies status and
# result of every opcode.
900 def testOpcodeError(self):
901 queue = _FakeQueueForProc()
908 (23816, 100, 39, 45),
911 for (job_id, opcount, failfrom, failto) in testdata:
913 ops = [opcodes.OpTestDummy(result="Res%s" % i,
914 fail=(failfrom <= i and
916 for i in range(opcount)]
# The job ID is passed as a string here; GetInfo(["id"]) further down is
# compared against the original value
919 job = self._CreateJob(queue, str(job_id), ops)
921 opexec = _FakeExecOpCodeForProc(queue, None, None)
923 for idx in range(len(ops)):
924 self.assertRaises(IndexError, queue.GetNextUpdate)
925 result = jqueue._JobProcessor(queue, opexec, job)()
# Three updates are expected per processor call
927 self.assertEqual(queue.GetNextUpdate(), (job, True))
928 # waitlock to running
929 self.assertEqual(queue.GetNextUpdate(), (job, True))
931 self.assertEqual(queue.GetNextUpdate(), (job, True))
932 self.assertRaises(IndexError, queue.GetNextUpdate)
# Processing ends at the first failing opcode or at the last opcode
934 if idx in (failfrom, len(ops) - 1):
936 self.assertEqual(result, jqueue._JobProcessor.FINISHED)
939 self.assertEqual(result, jqueue._JobProcessor.DEFER)
941 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
943 self.assertRaises(IndexError, queue.GetNextUpdate)
946 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
947 self.assertEqual(job.GetInfo(["id"]), [job_id])
948 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
950 # Check opcode status
952 job.GetInfo(["opstatus"])[0],
953 job.GetInfo(["opresult"])[0])
955 for idx, (op, opstatus, opresult) in enumerate(data):
# Successful opcodes report their own result
957 assert not op.input.fail
958 self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
959 self.assertEqual(opresult, op.input.result)
# Opcodes in error status carry an encoded errors.OpExecError as result
962 self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
963 self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
965 assert not op.input.fail
966 self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
967 self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
# Opcodes before the first failing one must have both timestamps
969 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
970 for op in job.ops[:failfrom]))
972 self._GenericCheckJob(job)
974 # Calling the processor on a finished job should be a no-op
975 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
976 jqueue._JobProcessor.FINISHED)
977 self.assertRaises(IndexError, queue.GetNextUpdate)
# Cancels a job which is still queued: all opcodes become "canceled" right
# away, and a later processor call returns FINISHED without changing or
# writing the job.
979 def testCancelWhileInQueue(self):
980 queue = _FakeQueueForProc()
982 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
987 job = self._CreateJob(queue, job_id, ops)
989 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
992 (success, _) = job.Cancel()
993 self.assert_(success)
# Cancelling itself doesn't record a queue update
995 self.assertRaises(IndexError, queue.GetNextUpdate)
997 self.assertFalse(job.start_timestamp)
998 self.assertTrue(job.end_timestamp)
999 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
1002 # Serialize to check for differences
1003 before_proc = job.Serialize()
1005 # Simulate processor called in workerpool
1006 opexec = _FakeExecOpCodeForProc(queue, None, None)
1007 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1008 jqueue._JobProcessor.FINISHED)
1011 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1012 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1013 self.assertFalse(job.start_timestamp)
1014 self.assertTrue(job.end_timestamp)
1015 self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1017 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1018 [[constants.OP_STATUS_CANCELED for _ in job.ops],
1019 ["Job canceled by request" for _ in job.ops]])
1021 # Must not have changed or written
1022 self.assertEqual(before_proc, job.Serialize())
1023 self.assertRaises(IndexError, queue.GetNextUpdate)
# Cancels a job whose first opcode was manually set to "waiting": Cancel
# marks all opcodes as "canceling" and the following processor call finishes
# the job as canceled.
1025 def testCancelWhileWaitlockInQueue(self):
1026 queue = _FakeQueueForProc()
1028 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1033 job = self._CreateJob(queue, job_id, ops)
1035 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
# Simulate an opcode waiting for locks
1037 job.ops[0].status = constants.OP_STATUS_WAITING
1039 assert len(job.ops) == 5
1041 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1043 # Mark as cancelling
1044 (success, _) = job.Cancel()
1045 self.assert_(success)
1047 self.assertRaises(IndexError, queue.GetNextUpdate)
1049 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1052 opexec = _FakeExecOpCodeForProc(queue, None, None)
1053 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1054 jqueue._JobProcessor.FINISHED)
# Final state: canceled, with an end timestamp but no start timestamp
1057 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1058 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1059 self.assertFalse(job.start_timestamp)
1060 self.assert_(job.end_timestamp)
1061 self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1063 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1064 [[constants.OP_STATUS_CANCELED for _ in job.ops],
1065 ["Job canceled by request" for _ in job.ops]])
# Cancels a job from within the fake executor's first callback, i.e. while
# the job is in "waiting" status; the processor call must finish the job as
# canceled.
1067 def testCancelWhileWaitlock(self):
1068 queue = _FakeQueueForProc()
1070 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1075 job = self._CreateJob(queue, job_id, ops)
1077 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1079 def _BeforeStart(timeout, priority):
1080 self.assertEqual(queue.GetNextUpdate(), (job, True))
1081 self.assertRaises(IndexError, queue.GetNextUpdate)
1082 self.assertFalse(queue.IsAcquired())
1083 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
# Cancel while the job is waiting; all opcodes switch to "canceling"
1086 (success, _) = job.Cancel()
1087 self.assert_(success)
1089 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1091 self.assertRaises(IndexError, queue.GetNextUpdate)
1093 def _AfterStart(op, cbs):
1094 self.assertEqual(queue.GetNextUpdate(), (job, True))
1095 self.assertRaises(IndexError, queue.GetNextUpdate)
1096 self.assertFalse(queue.IsAcquired())
1097 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1099 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1101 self.assertRaises(IndexError, queue.GetNextUpdate)
1102 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1103 jqueue._JobProcessor.FINISHED)
1104 self.assertEqual(queue.GetNextUpdate(), (job, True))
1105 self.assertRaises(IndexError, queue.GetNextUpdate)
# Final state: canceled, with both job timestamps set
1108 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1109 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1110 self.assert_(job.start_timestamp)
1111 self.assert_(job.end_timestamp)
1112 self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
1114 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1115 [[constants.OP_STATUS_CANCELED for _ in job.ops],
1116 ["Job canceled by request" for _ in job.ops]])
1118 def _TestCancelWhileSomething(self, cb):
# Shared scenario for cancel tests: the job is cancelled from the
# before-start callback and must finish as canceled without the after-start
# callback ever being reached.
# NOTE(review): none of the lines below uses "cb" or returns a value, yet
# testCancelDuringQueueShutdown treats the return value as the queue --
# confirm against the complete method.
1119 queue = _FakeQueueForProc()
1121 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1126 job = self._CreateJob(queue, job_id, ops)
1128 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
# Passed to _FakeExecOpCodeForProc as its second argument; cancels the job
# while its status is "waiting"
1130 def _BeforeStart(timeout, priority):
1131 self.assertFalse(queue.IsAcquired())
1132 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1135 (success, _) = job.Cancel()
1136 self.assert_(success)
1138 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
# Fails the test outright if it is ever invoked
1143 def _AfterStart(op, cbs):
1144 self.fail("Should not reach this")
1146 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1148 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1149 jqueue._JobProcessor.FINISHED)
# Final state: job canceled, both job timestamps set, every opcode canceled
1152 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1153 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1154 self.assert_(job.start_timestamp)
1155 self.assert_(job.end_timestamp)
1156 self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
1158 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1159 [[constants.OP_STATUS_CANCELED for _ in job.ops],
1160 ["Job canceled by request" for _ in job.ops]])
1164 def testCancelWhileWaitlockWithTimeout(self):
# Runs the shared cancel scenario with a callback named "fn" that raises
# mcpu.LockAcquireTimeout.
# NOTE(review): the "def" line of fn is not among the lines shown here.
1166 # Fake an acquire attempt timing out
1167 raise mcpu.LockAcquireTimeout()
1169 self._TestCancelWhileSomething(fn)
1171 def testCancelDuringQueueShutdown(self):
# Runs the shared cancel scenario with a callback that calls
# StopAcceptingJobs() on the queue it is given, then checks that the queue
# returned by the helper no longer accepts jobs.
1172 queue = self._TestCancelWhileSomething(lambda q: q.StopAcceptingJobs())
1173 self.assertFalse(queue.AcceptingJobsUnlocked())
1175 def testCancelWhileRunning(self):
1176 # Tests canceling a job with finished opcodes and more, unprocessed ones
1177 queue = _FakeQueueForProc()
1179 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1184 job = self._CreateJob(queue, job_id, ops)
1186 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
# No before/after-start callbacks are needed for this scenario
1188 opexec = _FakeExecOpCodeForProc(queue, None, None)
# First processor run: expected to defer, leaving only the first of the
# three opcodes successful
1191 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1192 jqueue._JobProcessor.DEFER)
1194 # Job goes back to queued
1195 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1196 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1197 [[constants.OP_STATUS_SUCCESS,
1198 constants.OP_STATUS_QUEUED,
1199 constants.OP_STATUS_QUEUED],
1200 ["Res0", None, None]])
# Cancelling between two processor runs must be reported as successful
1203 (success, _) = job.Cancel()
1204 self.assert_(success)
1206 # Try processing another opcode (this will actually cancel the job)
1207 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1208 jqueue._JobProcessor.FINISHED)
# The opcode that already succeeded keeps its result; the remaining two are
# reported as canceled
1211 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1212 self.assertEqual(job.GetInfo(["id"]), [job_id])
1213 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1214 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1215 [[constants.OP_STATUS_SUCCESS,
1216 constants.OP_STATUS_CANCELED,
1217 constants.OP_STATUS_CANCELED],
1218 ["Res0", "Job canceled by request",
1219 "Job canceled by request"]])
1221 def _TestQueueShutdown(self, queue, opexec, job, runcount):
# Shared checks for shutdown tests: stops the queue from accepting jobs,
# runs the processor once and verifies the job was deferred back to
# "queued".
#
# queue, opexec, job: the objects handed to jqueue._JobProcessor
# runcount: number of leading opcodes expected to have finished
#   successfully; all opcodes from index runcount on must still be queued
1222 self.assertTrue(queue.AcceptingJobsUnlocked())
1225 queue.StopAcceptingJobs()
1227 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1228 jqueue._JobProcessor.DEFER)
# Job-level state: queued again, no current opcode context, started but
# not ended
1231 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1232 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
1233 self.assertFalse(job.cur_opctx)
1234 self.assertTrue(job.start_timestamp)
1235 self.assertFalse(job.end_timestamp)
1236 self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
# Opcode-level timestamps: the first runcount opcodes have both, the opcode
# at index runcount has no end timestamp, later opcodes have neither
1237 self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1238 for op in job.ops[:runcount]))
1239 self.assertFalse(job.ops[runcount].end_timestamp)
1240 self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1241 for op in job.ops[(runcount + 1):]))
1242 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1243 [(([constants.OP_STATUS_SUCCESS] * runcount) +
1244 ([constants.OP_STATUS_QUEUED] *
1245 (len(job.ops) - runcount))),
1246 (["Res%s" % i for i in range(runcount)] +
1247 ([None] * (len(job.ops) - runcount)))])
1249 # Must have been written and replicated
1250 self.assertEqual(queue.GetNextUpdate(), (job, True))
1251 self.assertRaises(IndexError, queue.GetNextUpdate)
1253 def testQueueShutdownWhileRunning(self):
1254 # Tests shutting down the queue while a job is running
1255 queue = _FakeQueueForProc()
1257 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1262 job = self._CreateJob(queue, job_id, ops)
1264 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1266 opexec = _FakeExecOpCodeForProc(queue, None, None)
1268 self.assertRaises(IndexError, queue.GetNextUpdate)
# First processor run: deferred with one opcode successful
1271 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1272 jqueue._JobProcessor.DEFER)
1274 # Job goes back to queued
1275 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1276 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1277 [[constants.OP_STATUS_SUCCESS,
1278 constants.OP_STATUS_QUEUED,
1279 constants.OP_STATUS_QUEUED],
1280 ["Res0", None, None]])
1281 self.assertFalse(job.cur_opctx)
1283 # Writes for waiting, running and result
1285 self.assertEqual(queue.GetNextUpdate(), (job, True))
# Second processor run: deferred with two opcodes successful
1288 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1289 jqueue._JobProcessor.DEFER)
1291 # Job goes back to queued
1292 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1293 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1294 [[constants.OP_STATUS_SUCCESS,
1295 constants.OP_STATUS_SUCCESS,
1296 constants.OP_STATUS_QUEUED],
1297 ["Res0", "Res1", None]])
1298 self.assertFalse(job.cur_opctx)
1300 # Writes for waiting, running and result
1302 self.assertEqual(queue.GetNextUpdate(), (job, True))
# Shut the queue down with two opcodes already finished
1304 self._TestQueueShutdown(queue, opexec, job, 2)
1306 def testQueueShutdownWithLockTimeout(self):
1307 # Tests shutting down while a lock acquire times out
1308 queue = _FakeQueueForProc()
1310 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1315 job = self._CreateJob(queue, job_id, ops)
1317 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
# Flag set to True further down, once the first opcode has been processed
1319 acquire_timeout = False
# Passed to _FakeExecOpCodeForProc as its second argument.
# NOTE(review): the raise below is presumably conditional on
# acquire_timeout; the condition is not among the lines shown -- confirm.
1321 def _BeforeStart(timeout, priority):
1322 self.assertFalse(queue.IsAcquired())
1323 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1325 raise mcpu.LockAcquireTimeout()
1327 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, None)
1329 self.assertRaises(IndexError, queue.GetNextUpdate)
# First processor run: deferred with one opcode successful
1332 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1333 jqueue._JobProcessor.DEFER)
1335 # Job goes back to queued
1336 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1337 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1338 [[constants.OP_STATUS_SUCCESS,
1339 constants.OP_STATUS_QUEUED,
1340 constants.OP_STATUS_QUEUED],
1341 ["Res0", None, None]])
1342 self.assertFalse(job.cur_opctx)
1344 # Writes for waiting, running and result
1346 self.assertEqual(queue.GetNextUpdate(), (job, True))
1348 # The next opcode should have expiring lock acquires
1349 acquire_timeout = True
# Shut the queue down with one opcode already finished
1351 self._TestQueueShutdown(queue, opexec, job, 1)
1353 def testQueueShutdownWhileInQueue(self):
1354 # This should never happen in reality (no new jobs are started by the
1355 # workerpool once a shutdown has been initiated), but it's better to test
1356 # the job processor for this scenario
1357 queue = _FakeQueueForProc()
1359 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1364 job = self._CreateJob(queue, job_id, ops)
# Preconditions: freshly created job, nothing written and nothing started
1366 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1367 self.assertRaises(IndexError, queue.GetNextUpdate)
1369 self.assertFalse(job.start_timestamp)
1370 self.assertFalse(job.end_timestamp)
1371 self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
# Shut the queue down before any opcode has run (runcount 0)
1374 opexec = _FakeExecOpCodeForProc(queue, None, None)
1375 self._TestQueueShutdown(queue, opexec, job, 0)
1377 def testQueueShutdownWhileWaitlockInQueue(self):
# Shuts the queue down for a job whose first opcode has been put into
# "waiting" status by hand before any processing took place.
1378 queue = _FakeQueueForProc()
1380 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1385 job = self._CreateJob(queue, job_id, ops)
1387 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
# Set the status directly on the opcode; no processor run is involved
1389 job.ops[0].status = constants.OP_STATUS_WAITING
1391 assert len(job.ops) == 5
# A waiting first opcode makes the whole job report "waiting"
1393 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1395 self.assertRaises(IndexError, queue.GetNextUpdate)
1397 opexec = _FakeExecOpCodeForProc(queue, None, None)
1398 self._TestQueueShutdown(queue, opexec, job, 0)
1400 def testPartiallyRun(self):
1401 # Tests calling the processor on a job that's been partially run before the
1402 # program was restarted
1403 queue = _FakeQueueForProc()
1405 opexec = _FakeExecOpCodeForProc(queue, None, None)
# Each pair is (job ID, number of opcodes processed before the restart)
1407 for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
1408 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1412 job = self._CreateJob(queue, job_id, ops)
1414 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
# Run the processor successcount times; every run is expected to defer
1416 for _ in range(successcount):
1417 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1418 jqueue._JobProcessor.DEFER)
1420 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1421 self.assertEqual(job.GetInfo(["opstatus"]),
1422 [[constants.OP_STATUS_SUCCESS
1423 for _ in range(successcount)] +
1424 [constants.OP_STATUS_QUEUED
1425 for _ in range(len(ops) - successcount)]])
# The processed job has a truthy ops_iter; the restored copy must not
1427 self.assert_(job.ops_iter)
1429 # Serialize and restore (simulates program restart)
1430 newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1431 self.assertFalse(newjob.ops_iter)
1432 self._TestPartial(newjob, successcount)
1434 def _TestPartial(self, job, successcount):
# Runs the remaining opcodes of a partially processed job to completion and
# checks the final state.
#
# job: job whose first successcount opcodes have already succeeded
# successcount: number of opcodes already processed
1435 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1436 self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
# A fresh fake queue is used here rather than the one the job was created on
1438 queue = _FakeQueueForProc()
1439 opexec = _FakeExecOpCodeForProc(queue, None, None)
# One processor run per remaining opcode; each run must leave exactly three
# replicated updates pending.
# NOTE(review): the branch deciding between the FINISHED and DEFER
# expectations below is not among the lines shown -- presumably keyed on
# "remaining"; confirm.
1441 for remaining in reversed(range(len(job.ops) - successcount)):
1442 result = jqueue._JobProcessor(queue, opexec, job)()
1443 self.assertEqual(queue.GetNextUpdate(), (job, True))
1444 self.assertEqual(queue.GetNextUpdate(), (job, True))
1445 self.assertEqual(queue.GetNextUpdate(), (job, True))
1446 self.assertRaises(IndexError, queue.GetNextUpdate)
1450 self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1453 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1455 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
# Final state: job successful, every opcode result equal to its input's
# "result" attribute
1457 self.assertRaises(IndexError, queue.GetNextUpdate)
1458 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1459 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1460 self.assertEqual(job.GetInfo(["opresult"]),
1461 [[op.input.result for op in job.ops]])
1462 self.assertEqual(job.GetInfo(["opstatus"]),
1463 [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
1464 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1467 self._GenericCheckJob(job)
1469 # Calling the processor on a finished job should be a no-op
1470 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1471 jqueue._JobProcessor.FINISHED)
1472 self.assertRaises(IndexError, queue.GetNextUpdate)
1474 # ... also after being restored
1475 job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1476 # Calling the processor on a finished job should be a no-op
1477 self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
1478 jqueue._JobProcessor.FINISHED)
1479 self.assertRaises(IndexError, queue.GetNextUpdate)
1481 def testProcessorOnRunningJob(self):
# Invoking the processor on a job that already reports "running" must raise
# errors.ProgrammerError.
1482 ops = [opcodes.OpTestDummy(result="result", fail=False)]
1484 queue = _FakeQueueForProc()
1485 opexec = _FakeExecOpCodeForProc(queue, None, None)
1488 job = self._CreateJob(queue, 9571, ops)
1490 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
# Force the only opcode into "running" by hand
1492 job.ops[0].status = constants.OP_STATUS_RUNNING
1494 assert len(job.ops) == 1
1496 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1498 # Calling on running job must fail
# The processor instance itself is the callable handed to assertRaises
1499 self.assertRaises(errors.ProgrammerError,
1500 jqueue._JobProcessor(queue, opexec, job))
1502 def testLogMessages(self):
1503 # Tests the "Feedback" callback function
1504 queue = _FakeQueueForProc()
# NOTE(review): the tuples below belong to a "messages" mapping whose
# opening lines are not shown; later code calls messages.get(i, []) and
# messages.values(), so it is keyed by opcode index -- confirm.
1510 (constants.ELOG_MESSAGE, "there"),
1513 (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
1514 (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1517 ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1518 messages=messages.get(i, []))
1522 job = self._CreateJob(queue, 29386, ops)
# Passed to _FakeExecOpCodeForProc as its second argument
1524 def _BeforeStart(timeout, priority):
1525 self.assertEqual(queue.GetNextUpdate(), (job, True))
1526 self.assertRaises(IndexError, queue.GetNextUpdate)
1527 self.assertFalse(queue.IsAcquired())
1528 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
# Passed to _FakeExecOpCodeForProc as its third argument; sends the
# opcode's messages through cbs.Feedback
1530 def _AfterStart(op, cbs):
1531 self.assertEqual(queue.GetNextUpdate(), (job, True))
1532 self.assertRaises(IndexError, queue.GetNextUpdate)
1533 self.assertFalse(queue.IsAcquired())
1534 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
# Feedback called with three positional arguments must raise AssertionError
1536 self.assertRaises(AssertionError, cbs.Feedback,
1537 "too", "many", "arguments")
1539 for (log_type, msg) in op.messages:
1540 self.assertRaises(IndexError, queue.GetNextUpdate)
1542 cbs.Feedback(log_type, msg)
1545 # Check for job update without replication
1546 self.assertEqual(queue.GetNextUpdate(), (job, False))
1547 self.assertRaises(IndexError, queue.GetNextUpdate)
1549 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
# One processor run per opcode, each leaving exactly one replicated update.
# NOTE(review): the branch selecting the FINISHED or DEFER expectation is
# not among the lines shown -- presumably keyed on "remaining"; confirm.
1551 for remaining in reversed(range(len(job.ops))):
1552 self.assertRaises(IndexError, queue.GetNextUpdate)
1553 result = jqueue._JobProcessor(queue, opexec, job)()
1554 self.assertEqual(queue.GetNextUpdate(), (job, True))
1555 self.assertRaises(IndexError, queue.GetNextUpdate)
1559 self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1562 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1564 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1566 self.assertRaises(IndexError, queue.GetNextUpdate)
1568 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1569 self.assertEqual(job.GetInfo(["opresult"]),
1570 [[op.input.result for op in job.ops]])
# Total number of messages configured across all opcodes
1572 logmsgcount = sum(len(m) for m in messages.values())
1574 self._CheckLogMessages(job, logmsgcount)
1576 # Serialize and restore (simulates program restart)
1577 newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1578 self._CheckLogMessages(newjob, logmsgcount)
1580 # Check each message
# Each oplog entry unpacks to (serial, timestamp, log_type, msg); expected
# values are consumed from the front of the per-opcode message list
1582 for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1583 for (serial, timestamp, log_type, msg) in oplog:
1584 (exptype, expmsg) = messages.get(idx).pop(0)
1586 self.assertEqual(log_type, exptype)
1588 self.assertEqual(log_type, constants.ELOG_MESSAGE)
1589 self.assertEqual(expmsg, msg)
1590 self.assert_(serial > prevserial)
1593 def _CheckLogMessages(self, job, count):
# Verifies the log bookkeeping of "job".
#
# job: job object offering log_serial, GetLogEntries and GetInfo
# count: expected number of log messages; must equal job.log_serial
1595 self.assertEqual(job.log_serial, count)
# Without a filter, GetLogEntries must return the flattened per-opcode
# "oplog" lists (empty per-opcode lists are skipped)
1598 self.assertEqual(job.GetLogEntries(None),
1599 [entry for entries in job.GetInfo(["oplog"])[0] if entries
1600 for entry in entries])
1602 # Filter with serial
# GetLogEntries(3) must be non-empty and equal the flattened list with its
# first three entries dropped
1604 self.assert_(job.GetLogEntries(3))
1605 self.assertEqual(job.GetLogEntries(3),
1606 [entry for entries in job.GetInfo(["oplog"])[0] if entries
1607 for entry in entries][3:])
1609 # No log message after highest serial
1610 self.assertFalse(job.GetLogEntries(count))
1611 self.assertFalse(job.GetLogEntries(count + 3))
1613 def testSubmitManyJobs(self):
# Runs a job whose opcodes submit further jobs and checks that the submitted
# jobs reach the fake queue and that the new job IDs become the opcode
# results.
1614 queue = _FakeQueueForProc()
# NOTE(review): the nested lists below are presumably the "submit_jobs"
# values of the three opcodes (op.submit_jobs is read further down); the
# keyword lines themselves are not shown -- confirm.
1618 opcodes.OpTestDummy(result="Res0", fail=False,
1620 opcodes.OpTestDummy(result="Res1", fail=False,
1622 [opcodes.OpTestDummy(result="r1j0", fail=False)],
1624 opcodes.OpTestDummy(result="Res2", fail=False,
1626 [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1627 opcodes.OpTestDummy(result="r2j0o1", fail=False),
1628 opcodes.OpTestDummy(result="r2j0o2", fail=False),
1629 opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1630 [opcodes.OpTestDummy(result="r2j1", fail=False)],
1631 [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1632 opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1637 job = self._CreateJob(queue, job_id, ops)
# Passed to _FakeExecOpCodeForProc as its second argument
1639 def _BeforeStart(timeout, priority):
1640 self.assertEqual(queue.GetNextUpdate(), (job, True))
1641 self.assertRaises(IndexError, queue.GetNextUpdate)
1642 self.assertFalse(queue.IsAcquired())
1643 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1644 self.assertFalse(job.cur_opctx)
# Passed to _FakeExecOpCodeForProc as its third argument
1646 def _AfterStart(op, cbs):
1647 self.assertEqual(queue.GetNextUpdate(), (job, True))
1648 self.assertRaises(IndexError, queue.GetNextUpdate)
1650 self.assertFalse(queue.IsAcquired())
1651 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1652 self.assertFalse(job.cur_opctx)
1654 # Job is running, cancelling shouldn't be possible
1655 (success, _) = job.Cancel()
1656 self.assertFalse(success)
1658 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
# One processor run per opcode; only the run for the last index may finish
1660 for idx in range(len(ops)):
1661 self.assertRaises(IndexError, queue.GetNextUpdate)
1662 result = jqueue._JobProcessor(queue, opexec, job)()
1663 self.assertEqual(queue.GetNextUpdate(), (job, True))
1664 self.assertRaises(IndexError, queue.GetNextUpdate)
1665 if idx == len(ops) - 1:
1667 self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1669 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1671 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1672 self.assert_(job.start_timestamp)
1673 self.assertFalse(job.end_timestamp)
1675 self.assertRaises(IndexError, queue.GetNextUpdate)
# Submitted jobs must come back from the fake queue in submission order,
# paired with IDs counting up from 1000
1677 for idx, submitted_ops in enumerate(job_ops
1679 for job_ops in op.submit_jobs):
1680 self.assertEqual(queue.GetNextSubmittedJob(),
1681 (1000 + idx, submitted_ops))
1682 self.assertRaises(IndexError, queue.GetNextSubmittedJob)
# Opcode results are the lists of job IDs each opcode submitted
1684 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1685 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1686 self.assertEqual(job.GetInfo(["opresult"]),
1687 [[[], [1000], [1001, 1002, 1003]]])
1688 self.assertEqual(job.GetInfo(["opstatus"]),
1689 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1691 self._GenericCheckJob(job)
1693 # Calling the processor on a finished job should be a no-op
1694 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1695 jqueue._JobProcessor.FINISHED)
1696 self.assertRaises(IndexError, queue.GetNextUpdate)
1698 def testJobDependency(self):
# Runs a job whose first opcode lists two other job IDs (prev_job_id2 and
# prev_job_id) and feeds the fake dependency manager a scripted sequence of
# WAIT and CONTINUE check results until the job succeeds.
1699 depmgr = _FakeDependencyManager()
1700 queue = _FakeQueueForProc(depmgr=depmgr)
1702 self.assertEqual(queue.depmgr, depmgr)
1705 prev_job_id2 = 28102
1708 opcodes.OpTestDummy(result="Res0", fail=False,
1710 [prev_job_id2, None],
1711 [prev_job_id, None],
1713 opcodes.OpTestDummy(result="Res1", fail=False),
1717 job = self._CreateJob(queue, job_id, ops)
# Passed to _FakeExecOpCodeForProc as its second argument; reads "attempt"
# from the enclosing scope
1719 def _BeforeStart(timeout, priority):
1720 if attempt == 0 or attempt > 5:
1721 # Job should only be updated when it wasn't waiting for another job
1722 self.assertEqual(queue.GetNextUpdate(), (job, True))
1723 self.assertRaises(IndexError, queue.GetNextUpdate)
1724 self.assertFalse(queue.IsAcquired())
1725 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1726 self.assertFalse(job.cur_opctx)
# Passed to _FakeExecOpCodeForProc as its third argument
1728 def _AfterStart(op, cbs):
1729 self.assertEqual(queue.GetNextUpdate(), (job, True))
1730 self.assertRaises(IndexError, queue.GetNextUpdate)
1732 self.assertFalse(queue.IsAcquired())
1733 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1734 self.assertFalse(job.cur_opctx)
1736 # Job is running, cancelling shouldn't be possible
1737 (success, _) = job.Cancel()
1738 self.assertFalse(success)
1740 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1742 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
# "attempt" numbers the processor runs, starting at 0; counter.next() is
# the Python 2 iterator API.
# NOTE(review): the loop header and the attempt-dependent branches that
# select the AddCheckResult / CountPendingResults lines below are not
# among the lines shown -- confirm against the complete method.
1744 counter = itertools.count()
1746 attempt = counter.next()
1748 self.assertRaises(IndexError, queue.GetNextUpdate)
1749 self.assertRaises(IndexError, depmgr.GetNextNotification)
1752 depmgr.AddCheckResult(job, prev_job_id2, None,
1753 (jqueue._JobDependencyManager.WAIT, "wait2"))
1755 depmgr.AddCheckResult(job, prev_job_id2, None,
1756 (jqueue._JobDependencyManager.CONTINUE, "cont"))
1757 # The processor will ask for the next dependency immediately
1758 depmgr.AddCheckResult(job, prev_job_id, None,
1759 (jqueue._JobDependencyManager.WAIT, "wait"))
1761 depmgr.AddCheckResult(job, prev_job_id, None,
1762 (jqueue._JobDependencyManager.WAIT, "wait"))
1764 depmgr.AddCheckResult(job, prev_job_id, None,
1765 (jqueue._JobDependencyManager.CONTINUE, "cont"))
1767 self.assertEqual(depmgr.CountPendingResults(), 2)
1769 self.assertEqual(depmgr.CountPendingResults(), 0)
1771 self.assertEqual(depmgr.CountPendingResults(), 1)
# After each run the scripted check results must be fully consumed
1773 result = jqueue._JobProcessor(queue, opexec, job)()
1774 if attempt == 0 or attempt >= 5:
1775 # Job should only be updated if there was an actual change
1776 self.assertEqual(queue.GetNextUpdate(), (job, True))
1777 self.assertRaises(IndexError, queue.GetNextUpdate)
1778 self.assertFalse(depmgr.CountPendingResults())
1781 # Simulate waiting for other job
1782 self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1783 self.assertTrue(job.cur_opctx)
1784 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1785 self.assertRaises(IndexError, depmgr.GetNextNotification)
1786 self.assert_(job.start_timestamp)
1787 self.assertFalse(job.end_timestamp)
1790 if result == jqueue._JobProcessor.FINISHED:
1792 self.assertFalse(job.cur_opctx)
1795 self.assertRaises(IndexError, depmgr.GetNextNotification)
1797 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1798 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1799 self.assert_(job.start_timestamp)
1800 self.assertFalse(job.end_timestamp)
# Final state: job and all opcodes successful, nothing left pending in the
# queue or the dependency manager
1802 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1803 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1804 self.assertEqual(job.GetInfo(["opresult"]),
1805 [[op.input.result for op in job.ops]])
1806 self.assertEqual(job.GetInfo(["opstatus"]),
1807 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1808 self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1811 self._GenericCheckJob(job)
1813 self.assertRaises(IndexError, queue.GetNextUpdate)
1814 self.assertRaises(IndexError, depmgr.GetNextNotification)
1815 self.assertFalse(depmgr.CountPendingResults())
1816 self.assertFalse(depmgr.CountWaitingJobs())
1818 # Calling the processor on a finished job should be a no-op
1819 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1820 jqueue._JobProcessor.FINISHED)
1821 self.assertRaises(IndexError, queue.GetNextUpdate)
1823 def testJobDependencyCancel(self):
# Runs a three-opcode job whose second opcode lists prev_job_id; the fake
# dependency manager first answers WAIT and later CANCEL, after which the
# job must end up canceled with only the first opcode successful.
1824 depmgr = _FakeDependencyManager()
1825 queue = _FakeQueueForProc(depmgr=depmgr)
1827 self.assertEqual(queue.depmgr, depmgr)
1832 opcodes.OpTestDummy(result="Res0", fail=False),
1833 opcodes.OpTestDummy(result="Res1", fail=False,
1835 [prev_job_id, None],
1837 opcodes.OpTestDummy(result="Res2", fail=False),
1841 job = self._CreateJob(queue, job_id, ops)
# Passed to _FakeExecOpCodeForProc as its second argument; reads "attempt"
# from the enclosing scope
1843 def _BeforeStart(timeout, priority):
1844 if attempt == 0 or attempt > 5:
1845 # Job should only be updated when it wasn't waiting for another job
1846 self.assertEqual(queue.GetNextUpdate(), (job, True))
1847 self.assertRaises(IndexError, queue.GetNextUpdate)
1848 self.assertFalse(queue.IsAcquired())
1849 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1850 self.assertFalse(job.cur_opctx)
# Passed to _FakeExecOpCodeForProc as its third argument
1852 def _AfterStart(op, cbs):
1853 self.assertEqual(queue.GetNextUpdate(), (job, True))
1854 self.assertRaises(IndexError, queue.GetNextUpdate)
1856 self.assertFalse(queue.IsAcquired())
1857 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1858 self.assertFalse(job.cur_opctx)
1860 # Job is running, cancelling shouldn't be possible
1861 (success, _) = job.Cancel()
1862 self.assertFalse(success)
1864 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1866 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
# "attempt" numbers the processor runs, starting at 0; counter.next() is
# the Python 2 iterator API.
# NOTE(review): the loop header and the attempt-dependent branches around
# the AddCheckResult / CountPendingResults lines are not among the lines
# shown -- confirm against the complete method.
1868 counter = itertools.count()
1870 attempt = counter.next()
1872 self.assertRaises(IndexError, queue.GetNextUpdate)
1873 self.assertRaises(IndexError, depmgr.GetNextNotification)
1876 # This will handle the first opcode
1879 depmgr.AddCheckResult(job, prev_job_id, None,
1880 (jqueue._JobDependencyManager.WAIT, "wait"))
1882 # Other job was cancelled
1883 depmgr.AddCheckResult(job, prev_job_id, None,
1884 (jqueue._JobDependencyManager.CANCEL, "cancel"))
1887 self.assertEqual(depmgr.CountPendingResults(), 0)
1889 self.assertEqual(depmgr.CountPendingResults(), 1)
# After each run the scripted check results must be fully consumed
1891 result = jqueue._JobProcessor(queue, opexec, job)()
1892 if attempt <= 1 or attempt >= 4:
1893 # Job should only be updated if there was an actual change
1894 self.assertEqual(queue.GetNextUpdate(), (job, True))
1895 self.assertRaises(IndexError, queue.GetNextUpdate)
1896 self.assertFalse(depmgr.CountPendingResults())
1898 if attempt > 0 and attempt < 4:
1899 # Simulate waiting for other job
1900 self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1901 self.assertTrue(job.cur_opctx)
1902 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1903 self.assertRaises(IndexError, depmgr.GetNextNotification)
1904 self.assert_(job.start_timestamp)
1905 self.assertFalse(job.end_timestamp)
1908 if result == jqueue._JobProcessor.FINISHED:
1910 self.assertFalse(job.cur_opctx)
1913 self.assertRaises(IndexError, depmgr.GetNextNotification)
1915 self.assertEqual(result, jqueue._JobProcessor.DEFER)
1916 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1917 self.assert_(job.start_timestamp)
1918 self.assertFalse(job.end_timestamp)
# Final state: first opcode keeps its result, the other two are canceled
1920 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1921 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1922 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1923 [[constants.OP_STATUS_SUCCESS,
1924 constants.OP_STATUS_CANCELED,
1925 constants.OP_STATUS_CANCELED],
1926 ["Res0", "Job canceled by request",
1927 "Job canceled by request"]])
1929 self._GenericCheckJob(job)
1931 self.assertRaises(IndexError, queue.GetNextUpdate)
1932 self.assertRaises(IndexError, depmgr.GetNextNotification)
1933 self.assertFalse(depmgr.CountPendingResults())
1935 # Calling the processor on a finished job should be a no-op
1936 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1937 jqueue._JobProcessor.FINISHED)
1938 self.assertRaises(IndexError, queue.GetNextUpdate)
1940 def testJobDependencyWrongstatus(self):
# Runs a three-opcode job whose second opcode lists prev_job_id; the fake
# dependency manager first answers WAIT and later WRONGSTATUS, after which
# the job must end in error with only the first opcode successful.
1941 depmgr = _FakeDependencyManager()
1942 queue = _FakeQueueForProc(depmgr=depmgr)
1944 self.assertEqual(queue.depmgr, depmgr)
1949 opcodes.OpTestDummy(result="Res0", fail=False),
1950 opcodes.OpTestDummy(result="Res1", fail=False,
1952 [prev_job_id, None],
1954 opcodes.OpTestDummy(result="Res2", fail=False),
1958 job = self._CreateJob(queue, job_id, ops)
# Passed to _FakeExecOpCodeForProc as its second argument; reads "attempt"
# from the enclosing scope
1960 def _BeforeStart(timeout, priority):
1961 if attempt == 0 or attempt > 5:
1962 # Job should only be updated when it wasn't waiting for another job
1963 self.assertEqual(queue.GetNextUpdate(), (job, True))
1964 self.assertRaises(IndexError, queue.GetNextUpdate)
1965 self.assertFalse(queue.IsAcquired())
1966 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1967 self.assertFalse(job.cur_opctx)
# Passed to _FakeExecOpCodeForProc as its third argument
1969 def _AfterStart(op, cbs):
1970 self.assertEqual(queue.GetNextUpdate(), (job, True))
1971 self.assertRaises(IndexError, queue.GetNextUpdate)
1973 self.assertFalse(queue.IsAcquired())
1974 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1975 self.assertFalse(job.cur_opctx)
1977 # Job is running, cancelling shouldn't be possible
1978 (success, _) = job.Cancel()
1979 self.assertFalse(success)
1981 opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1983 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
# "attempt" numbers the processor runs, starting at 0; counter.next() is
# the Python 2 iterator API.
# NOTE(review): the loop header and the attempt-dependent branches around
# the AddCheckResult / CountPendingResults lines are not among the lines
# shown -- confirm against the complete method.
1985 counter = itertools.count()
1987 attempt = counter.next()
1989 self.assertRaises(IndexError, queue.GetNextUpdate)
1990 self.assertRaises(IndexError, depmgr.GetNextNotification)
1993 # This will handle the first opcode
1996 depmgr.AddCheckResult(job, prev_job_id, None,
1997 (jqueue._JobDependencyManager.WAIT, "wait"))
2000 depmgr.AddCheckResult(job, prev_job_id, None,
2001 (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
2004 self.assertEqual(depmgr.CountPendingResults(), 0)
2006 self.assertEqual(depmgr.CountPendingResults(), 1)
# After each run the scripted check results must be fully consumed
2008 result = jqueue._JobProcessor(queue, opexec, job)()
2009 if attempt <= 1 or attempt >= 4:
2010 # Job should only be updated if there was an actual change
2011 self.assertEqual(queue.GetNextUpdate(), (job, True))
2012 self.assertRaises(IndexError, queue.GetNextUpdate)
2013 self.assertFalse(depmgr.CountPendingResults())
2015 if attempt > 0 and attempt < 4:
2016 # Simulate waiting for other job
2017 self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
2018 self.assertTrue(job.cur_opctx)
2019 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
2020 self.assertRaises(IndexError, depmgr.GetNextNotification)
2021 self.assert_(job.start_timestamp)
2022 self.assertFalse(job.end_timestamp)
2025 if result == jqueue._JobProcessor.FINISHED:
2027 self.assertFalse(job.cur_opctx)
2030 self.assertRaises(IndexError, depmgr.GetNextNotification)
2032 self.assertEqual(result, jqueue._JobProcessor.DEFER)
2033 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2034 self.assert_(job.start_timestamp)
2035 self.assertFalse(job.end_timestamp)
# Final state: job in error; first opcode successful, the other two failed
2037 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
2038 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
2039 self.assertEqual(job.GetInfo(["opstatus"]),
2040 [[constants.OP_STATUS_SUCCESS,
2041 constants.OP_STATUS_ERROR,
2042 constants.OP_STATUS_ERROR]]),
# NOTE(review): the trailing comma after the call above turns the statement
# into a one-element tuple expression; the assertion still runs, but the
# comma looks unintended.
# Results of the failed opcodes must be recognised by
# errors.GetEncodedError
2044 (opresult, ) = job.GetInfo(["opresult"])
2045 self.assertEqual(len(opresult), len(ops))
2046 self.assertEqual(opresult[0], "Res0")
2047 self.assertTrue(errors.GetEncodedError(opresult[1]))
2048 self.assertTrue(errors.GetEncodedError(opresult[2]))
2050 self._GenericCheckJob(job)
2052 self.assertRaises(IndexError, queue.GetNextUpdate)
2053 self.assertRaises(IndexError, depmgr.GetNextNotification)
2054 self.assertFalse(depmgr.CountPendingResults())
2056 # Calling the processor on a finished job should be a no-op
2057 self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
2058 jqueue._JobProcessor.FINISHED)
2059 self.assertRaises(IndexError, queue.GetNextUpdate)
# Tests for jqueue._EvaluateJobProcessorResult, one per processor result
# value, using a fake dependency manager and a job object that only carries
# an ID (and optionally a priority).
2062 class TestEvaluateJobProcessorResult(unittest.TestCase):
2063 def testFinished(self):
# FINISHED: the dependency manager must receive exactly one notification
# carrying the job's ID
2064 depmgr = _FakeDependencyManager()
2065 job = _IdOnlyFakeJob(30953)
2066 jqueue._EvaluateJobProcessorResult(depmgr, job,
2067 jqueue._JobProcessor.FINISHED)
2068 self.assertEqual(depmgr.GetNextNotification(), job.id)
2069 self.assertRaises(IndexError, depmgr.GetNextNotification)
2071 def testDefer(self):
# DEFER: workerpool.DeferTask must be raised with the priority the fake job
# was created with, and no notification may be sent
2072 depmgr = _FakeDependencyManager()
2073 job = _IdOnlyFakeJob(11326, priority=5463)
2075 jqueue._EvaluateJobProcessorResult(depmgr, job,
2076 jqueue._JobProcessor.DEFER)
2077 except workerpool.DeferTask, err:
2078 self.assertEqual(err.priority, 5463)
2080 self.fail("Didn't raise exception")
2081 self.assertRaises(IndexError, depmgr.GetNextNotification)
2083 def testWaitdep(self):
# WAITDEP: the call completes and no notification is sent
2084 depmgr = _FakeDependencyManager()
2085 job = _IdOnlyFakeJob(21317)
2086 jqueue._EvaluateJobProcessorResult(depmgr, job,
2087 jqueue._JobProcessor.WAITDEP)
2088 self.assertRaises(IndexError, depmgr.GetNextNotification)
2090 def testOther(self):
# Any other result value must raise errors.ProgrammerError without sending
# a notification
2091 depmgr = _FakeDependencyManager()
2092 job = _IdOnlyFakeJob(5813)
2093 self.assertRaises(errors.ProgrammerError,
2094 jqueue._EvaluateJobProcessorResult,
2095 depmgr, job, "Other result")
2096 self.assertRaises(IndexError, depmgr.GetNextNotification)
# Test double for a timeout strategy: takes timeouts from the front of a
# caller-supplied list and keeps the most recent one in "last_timeout".
# NOTE(review): this listing does not show every line of the class (the
# embedded numbering has gaps inside both methods, and no return statement of
# NextAttempt is visible); the notes below cover only the visible statements.
2099 class _FakeTimeoutStrategy:
2100 def __init__(self, timeouts):
# Kept by reference and consumed with pop(0) in NextAttempt, so the caller's
# list shrinks as attempts are made.
2101 self.timeouts = timeouts
# Most recent timeout taken from the list; None until one has been taken.
2103 self.last_timeout = None
2105 def NextAttempt(self):
# Take the next timeout from the front of the list ...
2108 timeout = self.timeouts.pop(0)
# ... and remember it so a test can compare it with the timeout it is given.
2111 self.last_timeout = timeout
# Drives jqueue._JobProcessor over a ten-opcode job while the callbacks below
# simulate lock-acquire timeouts, checking queue updates, opcode priorities
# and job status after each processor call.
# NOTE(review): parts of this class are not present in this listing (for
# instance the "def" line that the attribute initialisation below belongs to,
# and the assignments of names such as "job" used inside the helpers); the
# notes added here describe only the statements that are visible.
2115 class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
# Per-test state shared between the callbacks and testTimeout.
2117 self.queue = _FakeQueueForProc()
2120 self.opcounter = None
2121 self.timeout_strategy = None
# Snapshots used to detect changes between calls: the opcode index for which
# the last strategy was created, and the last seen opcode priority/status.
2123 self.prev_tsop = None
2124 self.prev_prio = None
2125 self.prev_status = None
# Outcome of the most recent _BeforeStart call.
2126 self.lock_acq_prio = None
2127 self.gave_lock = None
2128 self.done_lock_before_blocking = False
# Callback handed to _FakeExecOpCodeForProc in testTimeout. It validates the
# timeout and priority it is given and then either lets the attempt proceed
# or raises mcpu.LockAcquireTimeout to simulate a failed lock acquisition.
# NOTE(review): "job" is used below without a visible assignment; presumably
# "job = self.job" -- confirm against the original file.
2130 def _BeforeStart(self, timeout, priority):
2133 # If status has changed, job must've been written
2134 if self.prev_status != self.job.ops[self.curop].status:
2135 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2136 self.assertRaises(IndexError, self.queue.GetNextUpdate)
2138 self.assertFalse(self.queue.IsAcquired())
2139 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
2141 ts = self.timeout_strategy
# The timeout must be the value last taken from the fake strategy, and the
# priority must be the current opcode's priority.
2143 self.assert_(timeout is None or isinstance(timeout, (int, float)))
2144 self.assertEqual(timeout, ts.last_timeout)
2145 self.assertEqual(priority, job.ops[self.curop].priority)
# Optimistically record success; reset below if a timeout is simulated.
2147 self.gave_lock = True
2148 self.lock_acq_prio = priority
2150 if (self.curop == 3 and
2151 job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
2152 # Give locks before running into blocking acquire
2153 assert self.retries == 7
2155 self.done_lock_before_blocking = True
# While retries remain a timeout must have been supplied, and the attempt is
# failed by raising LockAcquireTimeout.
# NOTE(review): no statement decrementing self.retries is visible here.
2158 if self.retries > 0:
2159 self.assert_(timeout is not None)
2161 self.gave_lock = False
2162 raise mcpu.LockAcquireTimeout()
# At the highest priority every retry and every strategy timeout must have
# been used up, and the timeout passed in must be None.
2164 if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
2165 assert self.retries == 0, "Didn't exhaust all retries at highest priority"
2166 assert not ts.timeouts
2167 self.assert_(timeout is None)
# Checks the job state once an opcode has started: exactly one queue update,
# queue lock not held, job status RUNNING, and cancelling is refused.
# NOTE(review): presumably the second callback passed to
# _FakeExecOpCodeForProc; that argument line is not visible in this listing.
2169 def _AfterStart(self, op, cbs):
2172 # Setting to "running" requires an update
2173 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2174 self.assertRaises(IndexError, self.queue.GetNextUpdate)
2176 self.assertFalse(self.queue.IsAcquired())
2177 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
2179 # Job is running, cancelling shouldn't be possible
2180 (success, _) = job.Cancel()
2181 self.assertFalse(success)
# Passed to the processor as "_nextop_fn" in testTimeout: advances the opcode
# counter and snapshots the new opcode's priority and status.
2183 def _NextOpcode(self):
2184 self.curop = self.opcounter.next()
2185 self.prev_prio = self.job.ops[self.curop].priority
2186 self.prev_status = self.job.ops[self.curop].status
# Passed to the processor as "_timeout_strategy_factory": builds a
# _FakeTimeoutStrategy whose timeout list and retry budget depend on the
# current opcode index.
# NOTE(review): the first branch header of the if/elif chain and the
# statement returning the strategy are not visible in this listing.
2188 def _NewTimeoutStrategy(self):
# A new strategy may only be requested once the previous retries are used up.
2191 self.assertEqual(self.retries, 0)
2193 if self.prev_tsop == self.curop:
2194 # Still on the same opcode, priority must've been increased
2195 self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
2199 timeouts = range(10, 31, 10)
2200 self.retries = len(timeouts) - 1
2202 elif self.curop == 2:
2203 # Let this run into a blocking acquire
2204 timeouts = range(11, 61, 12)
2205 self.retries = len(timeouts)
2207 elif self.curop == 3:
2208 # Wait for priority to increase, but give lock before blocking acquire
2209 timeouts = range(12, 100, 14)
2210 self.retries = len(timeouts)
2212 self.assertFalse(self.done_lock_before_blocking)
2214 elif self.curop == 4:
2215 self.assert_(self.done_lock_before_blocking)
2217 # Timeouts, but no need to retry
2218 timeouts = range(10, 31, 10)
2221 elif self.curop == 5:
2223 timeouts = range(19, 100, 11)
2224 self.retries = len(timeouts)
# Sanity checks on the test's own setup before creating the strategy.
2230 assert len(job.ops) == 10
2231 assert self.retries <= len(timeouts)
2233 ts = _FakeTimeoutStrategy(timeouts)
# Remember the strategy plus the opcode index and priority it was built for,
# so the next call can tell whether the priority changed on the same opcode.
2235 self.timeout_strategy = ts
2236 self.prev_tsop = self.curop
2237 self.prev_prio = job.ops[self.curop].priority
# Main scenario: calls the processor repeatedly and, after each call, checks
# queue updates, timestamps and status for either a completed opcode or a
# timed-out lock acquisition; finally checks the finished job.
# NOTE(review): the loop header, the job ID assignment and several
# branch/continuation lines are not visible in this listing.
2241 def testTimeout(self):
2242 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
2247 job = self._CreateJob(self.queue, job_id, ops)
2250 self.opcounter = itertools.count(0)
2252 opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
2254 tsf = self._NewTimeoutStrategy
2256 self.assertFalse(self.done_lock_before_blocking)
2259 proc = jqueue._JobProcessor(self.queue, opexec, job,
2260 _timeout_strategy_factory=tsf)
2262 self.assertRaises(IndexError, self.queue.GetNextUpdate)
2264 if self.curop is not None:
2265 self.prev_status = self.job.ops[self.curop].status
# Reset so the post-call checks only see what this call recorded.
2267 self.lock_acq_prio = None
2269 result = proc(_nextop_fn=self._NextOpcode)
2270 assert self.curop is not None
2272 # Input priority should never be set or modified
2273 self.assertFalse(compat.any(hasattr(op.input, "priority")
2276 if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
2277 # Got lock and/or job is done, result must've been written
2278 self.assertFalse(job.cur_opctx)
2279 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2280 self.assertRaises(IndexError, self.queue.GetNextUpdate)
2281 self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
2282 self.assert_(job.ops[self.curop].exec_timestamp)
2284 if result == jqueue._JobProcessor.FINISHED:
2285 self.assertFalse(job.cur_opctx)
2288 self.assertEqual(result, jqueue._JobProcessor.DEFER)
2291 self.assertEqual(job.ops[self.curop].start_timestamp,
2292 job.start_timestamp)
2295 # Opcode finished, but job not yet done
2296 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
# Lock acquisition timed out: the opcode context is kept, it still refers to
# the fake strategy, and the opcode has not been executed.
2299 self.assert_(job.cur_opctx)
2300 self.assertEqual(job.cur_opctx._timeout_strategy._fn,
2301 self.timeout_strategy.NextAttempt)
2302 self.assertFalse(job.ops[self.curop].exec_timestamp)
2303 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
2305 # If priority has changed since acquiring locks, the job must've been
2307 if self.lock_acq_prio != job.ops[self.curop].priority:
2308 self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2310 self.assertRaises(IndexError, self.queue.GetNextUpdate)
2312 self.assert_(job.start_timestamp)
2313 self.assertFalse(job.end_timestamp)
# Final state: every opcode was visited, the early-lock scenario happened,
# and the job reports success with each opcode's configured result.
2315 self.assertEqual(self.curop, len(job.ops) - 1)
2316 self.assertEqual(self.job, job)
2317 self.assertEqual(self.opcounter.next(), len(job.ops))
2318 self.assert_(self.done_lock_before_blocking)
2320 self.assertRaises(IndexError, self.queue.GetNextUpdate)
2321 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
2322 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
2323 self.assertEqual(job.GetInfo(["opresult"]),
2324 [[op.input.result for op in job.ops]])
2325 self.assertEqual(job.GetInfo(["opstatus"]),
2326 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
2327 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
2330 # Calling the processor on a finished job should be a no-op
2331 self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2332 jqueue._JobProcessor.FINISHED)
2333 self.assertRaises(IndexError, self.queue.GetNextUpdate)
# Checks that job.ChangePriority() applied between processor runs affects
# only the opcodes that have not been processed yet.
# NOTE(review): some lines of this class are not present in this listing
# (e.g. the "def" line the two attribute assignments below belong to, the
# job_id assignment and the first element of the expected ChangePriority
# results); the notes added here describe only the visible statements.
2336 class TestJobProcessorChangePriority(unittest.TestCase, _JobProcessorTestUtils):
2338 self.queue = _FakeQueueForProc()
# Priorities seen by _BeforeStart, in call order; the test pops and checks
# one entry after every processor run.
2339 self.opexecprio = []
# Callback handed to _FakeExecOpCodeForProc: asserts the queue lock is not
# held and records the priority it was called with.
2341 def _BeforeStart(self, timeout, priority):
2342 self.assertFalse(self.queue.IsAcquired())
2343 self.opexecprio.append(priority)
2345 def testChangePriorityWhileRunning(self):
2346 # Tests changing the priority on a job while it has finished opcodes
2347 # (successful) and more, unprocessed ones
2348 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
2353 job = self._CreateJob(self.queue, job_id, ops)
2355 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
# No after-start callback is needed here, hence None as the last argument.
2357 opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart, None)
# First processor run: one opcode is executed and the job is deferred.
2360 self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2361 jqueue._JobProcessor.DEFER)
2363 # Job goes back to queued
2364 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2365 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
2366 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2367 [[constants.OP_STATUS_SUCCESS,
2368 constants.OP_STATUS_QUEUED,
2369 constants.OP_STATUS_QUEUED],
2370 ["Res0", None, None]])
# The first opcode ran at the default priority, and only once.
2372 self.assertEqual(self.opexecprio.pop(0), constants.OP_PRIO_DEFAULT)
2373 self.assertRaises(IndexError, self.opexecprio.pop, 0)
# Change the priority of the remaining opcodes to -10.
2376 self.assertEqual(job.ChangePriority(-10),
2378 ("Priorities of pending opcodes for job 3499 have"
2379 " been changed to -10")))
2380 self.assertEqual(job.CalcPriority(), -10)
2382 # Process second opcode
2383 self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2384 jqueue._JobProcessor.DEFER)
2386 self.assertEqual(self.opexecprio.pop(0), -10)
2387 self.assertRaises(IndexError, self.opexecprio.pop, 0)
2390 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2391 self.assertEqual(job.CalcPriority(), -10)
2392 self.assertEqual(job.GetInfo(["id"]), [job_id])
2393 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
2394 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2395 [[constants.OP_STATUS_SUCCESS,
2396 constants.OP_STATUS_SUCCESS,
2397 constants.OP_STATUS_QUEUED],
2398 ["Res0", "Res1", None]])
2400 # Change priority once more
2401 self.assertEqual(job.ChangePriority(5),
2403 ("Priorities of pending opcodes for job 3499 have"
2404 " been changed to 5")))
2405 self.assertEqual(job.CalcPriority(), 5)
2407 # Process third opcode
2408 self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2409 jqueue._JobProcessor.FINISHED)
2411 self.assertEqual(self.opexecprio.pop(0), 5)
2412 self.assertRaises(IndexError, self.opexecprio.pop, 0)
# Once finished, CalcPriority() reports the default again, while the
# individual opcodes keep the priorities they were executed with.
2415 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
2416 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
2417 self.assertEqual(job.GetInfo(["id"]), [job_id])
2418 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
2419 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2420 [[constants.OP_STATUS_SUCCESS,
2421 constants.OP_STATUS_SUCCESS,
2422 constants.OP_STATUS_SUCCESS],
2423 ["Res0", "Res1", "Res2"]])
2424 self.assertEqual(map(operator.attrgetter("priority"), job.ops),
2425 [constants.OP_PRIO_DEFAULT, -10, 5])
2428 class _IdOnlyFakeJob:
2429 def __init__(self, job_id, priority=NotImplemented):
2430 self.id = str(job_id)
2431 self._priority = priority
2433 def CalcPriority(self):
2434 return self._priority
# Tests jqueue._JobDependencyManager using two callbacks defined on the test
# case: _GetStatus (scripted job statuses) and _Enqueue (records re-queued
# jobs).
# NOTE(review): a number of lines are not present in this listing -- e.g. the
# "def" line and list initialisations around the manager construction, the
# "job_id = ..." assignments inside the tests, the contents of the expected
# "_waiters" dictionaries and some loop headers. The notes added here describe
# only the statements that are visible.
2437 class TestJobDependencyManager(unittest.TestCase):
2441 self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
# Status callback: consumes the next scripted (job ID, status) pair and
# checks that the manager asked about the expected job.
# NOTE(review): the statement returning "result" is not visible here.
2443 def _GetStatus(self, job_id):
2444 (exp_job_id, result) = self._status.pop(0)
2445 self.assertEqual(exp_job_id, job_id)
# Enqueue callback: records the jobs handed over by the manager and asserts
# the manager's lock is not owned at that point.
2448 def _Enqueue(self, jobs):
2449 self.assertFalse(self.jdm._lock.is_owned(),
2450 msg=("Must not own manager lock while re-adding jobs"
2451 " (potential deadlock)"))
2452 self._queue.append(jobs)
# Dependency first RUNNING -> WAIT and the job is registered as a waiter;
# dependency then CANCELED (empty list of accepted statuses) -> CANCEL and
# the waiter is gone.
2454 def testNotFinalizedThenCancel(self):
2455 job = _IdOnlyFakeJob(17697)
2458 self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2459 (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2460 self.assertEqual(result, self.jdm.WAIT)
2461 self.assertFalse(self._status)
2462 self.assertFalse(self._queue)
2463 self.assertTrue(self.jdm.JobWaiting(job))
2464 self.assertEqual(self.jdm._waiters, {
2467 self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2468 ("job/28625", None, None, [("job", [job.id])])
2471 self._status.append((job_id, constants.JOB_STATUS_CANCELED))
2472 (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2473 self.assertEqual(result, self.jdm.CANCEL)
2474 self.assertFalse(self._status)
2475 self.assertFalse(self._queue)
2476 self.assertFalse(self.jdm.JobWaiting(job))
2477 self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
# Dependency reported as QUEUED or RUNNING -> WAIT in either case.
# NOTE(review): the loop/branch lines choosing between the two status
# appends below are not visible in this listing.
2479 def testNotFinalizedThenQueued(self):
2480 # This can happen on a queue shutdown
2481 job = _IdOnlyFakeJob(1320)
2486 self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2488 self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2489 (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2490 self.assertEqual(result, self.jdm.WAIT)
2491 self.assertFalse(self._status)
2492 self.assertFalse(self._queue)
2493 self.assertTrue(self.jdm.JobWaiting(job))
2494 self.assertEqual(self.jdm._waiters, {
2497 self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2498 ("job/22971", None, None, [("job", [job.id])])
# Only CANCELED is accepted: WAITING -> WAIT, then CANCELED -> CONTINUE.
2501 def testRequireCancel(self):
2502 job = _IdOnlyFakeJob(5278)
2504 dep_status = [constants.JOB_STATUS_CANCELED]
2506 self._status.append((job_id, constants.JOB_STATUS_WAITING))
2507 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2508 self.assertEqual(result, self.jdm.WAIT)
2509 self.assertFalse(self._status)
2510 self.assertFalse(self._queue)
2511 self.assertTrue(self.jdm.JobWaiting(job))
2512 self.assertEqual(self.jdm._waiters, {
2515 self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2516 ("job/9610", None, None, [("job", [job.id])])
2519 self._status.append((job_id, constants.JOB_STATUS_CANCELED))
2520 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2521 self.assertEqual(result, self.jdm.CONTINUE)
2522 self.assertFalse(self._status)
2523 self.assertFalse(self._queue)
2524 self.assertFalse(self.jdm.JobWaiting(job))
2525 self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
# Only ERROR is accepted: WAITING -> WAIT, then ERROR -> CONTINUE.
2527 def testRequireError(self):
2528 job = _IdOnlyFakeJob(21459)
2530 dep_status = [constants.JOB_STATUS_ERROR]
2532 self._status.append((job_id, constants.JOB_STATUS_WAITING))
2533 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2534 self.assertEqual(result, self.jdm.WAIT)
2535 self.assertFalse(self._status)
2536 self.assertFalse(self._queue)
2537 self.assertTrue(self.jdm.JobWaiting(job))
2538 self.assertEqual(self.jdm._waiters, {
2542 self._status.append((job_id, constants.JOB_STATUS_ERROR))
2543 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2544 self.assertEqual(result, self.jdm.CONTINUE)
2545 self.assertFalse(self._status)
2546 self.assertFalse(self._queue)
2547 self.assertFalse(self.jdm.JobWaiting(job))
2548 self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
# Every status in constants.JOBS_FINALIZED is accepted; for each of them in
# turn: WAITING -> WAIT, then that status -> CONTINUE.
2550 def testRequireMultiple(self):
2551 dep_status = list(constants.JOBS_FINALIZED)
2553 for end_status in dep_status:
2554 job = _IdOnlyFakeJob(21343)
2557 self._status.append((job_id, constants.JOB_STATUS_WAITING))
2558 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2559 self.assertEqual(result, self.jdm.WAIT)
2560 self.assertFalse(self._status)
2561 self.assertFalse(self._queue)
2562 self.assertTrue(self.jdm.JobWaiting(job))
2563 self.assertEqual(self.jdm._waiters, {
2566 self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2567 ("job/14609", None, None, [("job", [job.id])])
2570 self._status.append((job_id, end_status))
2571 (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2572 self.assertEqual(result, self.jdm.CONTINUE)
2573 self.assertFalse(self._status)
2574 self.assertFalse(self._queue)
2575 self.assertFalse(self.jdm.JobWaiting(job))
2576 self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
# After a WAIT registration, NotifyWaiters() for the dependency empties the
# waiter table and hands the waiting job (as a set) to the enqueue callback.
2578 def testNotify(self):
2579 job = _IdOnlyFakeJob(8227)
2582 self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2583 (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2584 self.assertEqual(result, self.jdm.WAIT)
2585 self.assertFalse(self._status)
2586 self.assertFalse(self._queue)
2587 self.assertTrue(self.jdm.JobWaiting(job))
2588 self.assertEqual(self.jdm._waiters, {
2592 self.jdm.NotifyWaiters(job_id)
2593 self.assertFalse(self._status)
2594 self.assertFalse(self.jdm._waiters)
2595 self.assertFalse(self.jdm.JobWaiting(job))
2596 self.assertEqual(self._queue, [set([job])])
# SUCCESS is required: QUEUED -> WAIT, then ERROR -> WRONGSTATUS and the job
# is no longer waiting.
2598 def testWrongStatus(self):
2599 job = _IdOnlyFakeJob(10102)
2602 self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2603 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2604 [constants.JOB_STATUS_SUCCESS])
2605 self.assertEqual(result, self.jdm.WAIT)
2606 self.assertFalse(self._status)
2607 self.assertFalse(self._queue)
2608 self.assertTrue(self.jdm.JobWaiting(job))
2609 self.assertEqual(self.jdm._waiters, {
2613 self._status.append((job_id, constants.JOB_STATUS_ERROR))
2614 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2615 [constants.JOB_STATUS_SUCCESS])
2616 self.assertEqual(result, self.jdm.WRONGSTATUS)
2617 self.assertFalse(self._status)
2618 self.assertFalse(self._queue)
2619 self.assertFalse(self.jdm.JobWaiting(job))
# SUCCESS is required: QUEUED -> WAIT, then SUCCESS -> CONTINUE.
2621 def testCorrectStatus(self):
2622 job = _IdOnlyFakeJob(24273)
2625 self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2626 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2627 [constants.JOB_STATUS_SUCCESS])
2628 self.assertEqual(result, self.jdm.WAIT)
2629 self.assertFalse(self._status)
2630 self.assertFalse(self._queue)
2631 self.assertTrue(self.jdm.JobWaiting(job))
2632 self.assertEqual(self.jdm._waiters, {
2636 self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2637 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2638 [constants.JOB_STATUS_SUCCESS])
2639 self.assertEqual(result, self.jdm.CONTINUE)
2640 self.assertFalse(self._status)
2641 self.assertFalse(self._queue)
2642 self.assertFalse(self.jdm.JobWaiting(job))
# Dependency already SUCCESS on the first check -> CONTINUE without the job
# ever waiting; a later NotifyWaiters("0") leaves the waiter table empty.
2644 def testFinalizedRightAway(self):
2645 job = _IdOnlyFakeJob(224)
2648 self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2649 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2650 [constants.JOB_STATUS_SUCCESS])
2651 self.assertEqual(result, self.jdm.CONTINUE)
2652 self.assertFalse(self._status)
2653 self.assertFalse(self._queue)
2654 self.assertFalse(self.jdm.JobWaiting(job))
2655 self.assertEqual(self.jdm._waiters, {
2660 self.jdm.NotifyWaiters("0")
2661 self.assertFalse(self.jdm._waiters)
2662 self.assertFalse(self._status)
2663 self.assertFalse(self._queue)
# Registers many fake jobs as waiters on randomly chosen dependency IDs
# (seeded generator, so the run is reproducible), then resolves the
# dependencies in random order and checks the bookkeeping.
# NOTE(review): parts of the expressions and some loop headers in this test
# are not visible in this listing.
2665 def testMultipleWaiting(self):
2666 # Use a deterministic random generator
2667 rnd = random.Random(21402)
2669 job_ids = map(str, rnd.sample(range(1, 10000), 150))
# Maps a dependency job ID (popped from job_ids) to a set of fake jobs; the
# visible generator clause draws between 1 and 20 entries per dependency.
2671 waiters = dict((job_ids.pop(),
2672 set(map(_IdOnlyFakeJob,
2674 for _ in range(rnd.randint(1, 20))])))
2677 # Ensure there are no duplicate job IDs
2678 assert not utils.FindDuplicates(waiters.keys() +
2680 for jobs in waiters.values()
2683 # Register all jobs as waiters
2684 for job_id, job in [(job_id, job)
2685 for (job_id, jobs) in waiters.items()
2687 self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2688 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2689 [constants.JOB_STATUS_SUCCESS])
2690 self.assertEqual(result, self.jdm.WAIT)
2691 self.assertFalse(self._status)
2692 self.assertFalse(self._queue)
2693 self.assertTrue(self.jdm.JobWaiting(job))
2695 self.assertEqual(self.jdm._waiters, waiters)
# Turns each pending list of a lock-info tuple into a set so the comparison
# below does not depend on ordering.
# NOTE(review): a tuple parameter in the signature is Python 2-only syntax.
2697 def _MakeSet((name, mode, owner_names, pending)):
2698 return (name, mode, owner_names,
2699 [(pendmode, set(pend)) for (pendmode, pend) in pending])
# Compares the manager's pending lock info with what "waiters" predicts.
2701 def _CheckLockInfo():
2702 info = self.jdm.GetLockInfo([query.LQ_PENDING])
2703 self.assertEqual(sorted(map(_MakeSet, info)), sorted([
2704 ("job/%s" % job_id, None, None,
2705 [("job", set([job.id for job in jobs]))])
2706 for job_id, jobs in waiters.items()
2712 # Notify in random order
2713 for job_id in rnd.sample(waiters, len(waiters)):
2714 # Remove from pending waiter list
2715 jobs = waiters.pop(job_id)
2717 self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2718 (result, _) = self.jdm.CheckAndRegister(job, job_id,
2719 [constants.JOB_STATUS_SUCCESS])
2720 self.assertEqual(result, self.jdm.CONTINUE)
2721 self.assertFalse(self._status)
2722 self.assertFalse(self._queue)
2723 self.assertFalse(self.jdm.JobWaiting(job))
2727 self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
# A job naming its own ID as the dependency must yield ERROR.
2731 def testSelfDependency(self):
2732 job = _IdOnlyFakeJob(18937)
2734 self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
2735 (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
2736 self.assertEqual(result, self.jdm.ERROR)
# Uses a separate manager whose status callback raises errors.JobLost; the
# result must be ERROR and nothing may be left waiting or pending.
# NOTE(review): the "def" line of the _FakeStatus helper that contains the
# raise below is not visible in this listing.
2738 def testJobDisappears(self):
2739 job = _IdOnlyFakeJob(30540)
2743 raise errors.JobLost("#msg#")
2745 jdm = jqueue._JobDependencyManager(_FakeStatus, None)
2746 (result, _) = jdm.CheckAndRegister(job, job_id, [])
2747 self.assertEqual(result, self.jdm.ERROR)
2748 self.assertFalse(jdm.JobWaiting(job))
2749 self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
# Script entry point: call testutils.GanetiTestProgram() only when this file
# is executed directly rather than imported.
2752 if __name__ == "__main__":
2753 testutils.GanetiTestProgram()