4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.jqueue"""
32 from ganeti import constants
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import jqueue
36 from ganeti import opcodes
37 from ganeti import compat
38 from ganeti import mcpu
44 def __init__(self, job_id, status):
49 def SetStatus(self, status):
def AddLogEntry(self, msg):
  """Append a message to this object's log.

  Every entry is stored as a (serial, msg) tuple; the serial is the number
  of entries that were already in the log when the message was added.

  """
  serial = len(self._log)
  entry = (serial, msg)
  self._log.append(entry)
58 def GetInfo(self, fields):
63 result.append(self._status)
65 raise Exception("Unknown field")
69 def GetLogEntries(self, newer_than):
70 assert newer_than is None or newer_than >= 0
72 if newer_than is None:
75 return self._log[newer_than:]
78 class TestJobChangesChecker(unittest.TestCase):
80 job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
81 checker = jqueue._JobChangesChecker(["status"], None, None)
82 self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
84 job.SetStatus(constants.JOB_STATUS_RUNNING)
85 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
87 job.SetStatus(constants.JOB_STATUS_SUCCESS)
88 self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
90 # job.id is used by checker
91 self.assertEqual(job.id, 9094)
def testStatusWithPrev(self):
  # The checker is given previous job information equal to the job's
  # current status, so the first call is expected to report nothing (None).
  job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
  checker = jqueue._JobChangesChecker(["status"],
                                      [constants.JOB_STATUS_QUEUED], None)
  # assertTrue instead of the deprecated assert_ alias, matching the
  # assertFalse/assertEqual spelling used elsewhere in this file
  self.assertTrue(checker(job) is None)

  # Once the status differs from the previous information, the new job
  # information must be returned together with an empty log entry list
  job.SetStatus(constants.JOB_STATUS_RUNNING)
  self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
def testFinalStatus(self):
  # For each finalized status, the checker receives previous information
  # identical to the job's current status and must still report the job.
  for final_status in constants.JOBS_FINALIZED:
    finished_job = _FakeJob(2178711, final_status)
    previous_info = [final_status]
    checker = jqueue._JobChangesChecker(["status"], previous_info, None)
    # There won't be any changes in this status, hence it should signal
    # a change immediately
    expected = ([final_status], [])
    self.assertEqual(checker(finished_job), expected)
111 job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
112 checker = jqueue._JobChangesChecker(["status"], None, None)
113 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
115 job.AddLogEntry("Hello World")
116 (job_info, log_entries) = checker(job)
117 self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
118 self.assertEqual(log_entries, [[0, "Hello World"]])
120 checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
121 self.assert_(checker2(job) is None)
123 job.AddLogEntry("Foo Bar")
124 job.SetStatus(constants.JOB_STATUS_ERROR)
126 (job_info, log_entries) = checker2(job)
127 self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
128 self.assertEqual(log_entries, [[1, "Foo Bar"]])
130 checker3 = jqueue._JobChangesChecker(["status"], None, None)
131 (job_info, log_entries) = checker3(job)
132 self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
133 self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
136 class TestJobChangesWaiter(unittest.TestCase):
138 self.tmpdir = tempfile.mkdtemp()
139 self.filename = utils.PathJoin(self.tmpdir, "job-1")
140 utils.WriteFile(self.filename, data="")
143 shutil.rmtree(self.tmpdir)
145 def _EnsureNotifierClosed(self, notifier):
147 os.fstat(notifier._fd)
148 except EnvironmentError, err:
149 self.assertEqual(err.errno, errno.EBADF)
151 self.fail("File descriptor wasn't closed")
154 for wait in [False, True]:
155 waiter = jqueue._JobFileChangesWaiter(self.filename)
162 # Ensure file descriptor was closed
163 self._EnsureNotifierClosed(waiter._notifier)
165 def testChangingFile(self):
166 waiter = jqueue._JobFileChangesWaiter(self.filename)
168 self.assertFalse(waiter.Wait(0.1))
169 utils.WriteFile(self.filename, data="changed")
170 self.assert_(waiter.Wait(60))
174 self._EnsureNotifierClosed(waiter._notifier)
176 def testChangingFile2(self):
177 waiter = jqueue._JobChangesWaiter(self.filename)
179 self.assertFalse(waiter._filewaiter)
180 self.assert_(waiter.Wait(0.1))
181 self.assert_(waiter._filewaiter)
183 # File waiter is now used, but there have been no changes
184 self.assertFalse(waiter.Wait(0.1))
185 utils.WriteFile(self.filename, data="changed")
186 self.assert_(waiter.Wait(60))
190 self._EnsureNotifierClosed(waiter._filewaiter._notifier)
193 class TestWaitForJobChangesHelper(unittest.TestCase):
195 self.tmpdir = tempfile.mkdtemp()
196 self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
197 utils.WriteFile(self.filename, data="")
200 shutil.rmtree(self.tmpdir)
def _LoadWaitingJob(self):
  # Job loader passed to the wait-for-changes helper in these tests: builds
  # a new fake job with status JOB_STATUS_WAITLOCK on every call
  waiting_status = constants.JOB_STATUS_WAITLOCK
  return _FakeJob(2614226563, waiting_status)
205 def _LoadLostJob(self):
208 def testNoChanges(self):
209 wfjc = jqueue._WaitForJobChangesHelper()
212 self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
213 [constants.JOB_STATUS_WAITLOCK], None, 0.1),
214 constants.JOB_NOTCHANGED)
216 # No previous information
217 self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
218 ["status"], None, None, 1.0),
219 ([constants.JOB_STATUS_WAITLOCK], []))
def testLostJob(self):
  # With _LoadLostJob as the job loader, the helper is expected to return
  # None instead of job information
  wfjc = jqueue._WaitForJobChangesHelper()
  result = wfjc(self.filename, self._LoadLostJob, ["status"], None, None, 1.0)
  # assertTrue instead of the deprecated assert_ alias, matching the
  # assertFalse/assertEqual spelling used elsewhere in this file
  self.assertTrue(result is None)
227 class TestEncodeOpError(unittest.TestCase):
229 encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
230 self.assert_(isinstance(encerr, tuple))
231 self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
233 encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
234 self.assert_(isinstance(encerr, tuple))
235 self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
237 encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
238 self.assert_(isinstance(encerr, tuple))
239 self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
241 encerr = jqueue._EncodeOpError("Hello World")
242 self.assert_(isinstance(encerr, tuple))
243 self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
246 class TestQueuedOpCode(unittest.TestCase):
247 def testDefaults(self):
249 self.assertFalse(hasattr(op.input, "dry_run"))
250 self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
251 self.assertFalse(op.log)
252 self.assert_(op.start_timestamp is None)
253 self.assert_(op.exec_timestamp is None)
254 self.assert_(op.end_timestamp is None)
255 self.assert_(op.result is None)
256 self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
258 op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
260 op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
262 self.assertEqual(op1.Serialize(), op2.Serialize())
264 def testPriority(self):
266 assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
267 "Default priority equals high priority; test can't work"
268 self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
269 self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
271 inpop = opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH)
272 op1 = jqueue._QueuedOpCode(inpop)
274 op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
276 self.assertEqual(op1.Serialize(), op2.Serialize())
279 class TestQueuedJob(unittest.TestCase):
281 self.assertRaises(errors.GenericError, jqueue._QueuedJob,
284 def testDefaults(self):
288 opcodes.OpTestDelay(),
292 self.assertEqual(job.id, job_id)
293 self.assertEqual(job.log_serial, 0)
294 self.assert_(job.received_timestamp)
295 self.assert_(job.start_timestamp is None)
296 self.assert_(job.end_timestamp is None)
297 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
298 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
299 self.assert_(repr(job).startswith("<"))
300 self.assertEqual(len(job.ops), len(ops))
301 self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
302 for (inp, op) in zip(ops, job.ops)))
303 self.assertRaises(errors.OpExecError, job.GetInfo,
305 self.assertEqual(job.GetInfo(["summary"]),
306 [[op.input.Summary() for op in job.ops]])
308 job1 = jqueue._QueuedJob(None, job_id, ops)
310 job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
312 self.assertEqual(job1.Serialize(), job2.Serialize())
314 def testPriority(self):
317 opcodes.OpGetTags(priority=constants.OP_PRIO_DEFAULT),
318 opcodes.OpTestDelay(),
322 self.assertEqual(job.id, job_id)
323 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
324 self.assert_(repr(job).startswith("<"))
326 job = jqueue._QueuedJob(None, job_id, ops)
328 self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
330 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
333 job.ops[0].priority -= 1
335 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
337 # Mark opcode as finished
338 job.ops[0].status = constants.OP_STATUS_SUCCESS
340 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
343 job.ops[1].priority -= 10
344 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
346 # Test increasing first
347 job.ops[0].status = constants.OP_STATUS_RUNNING
348 job.ops[0].priority -= 19
349 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
351 def testCalcStatus(self):
353 # The default status is "queued"
354 self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
358 ops[0].status = constants.OP_STATUS_WAITLOCK
361 ops[0].status = constants.OP_STATUS_SUCCESS
362 ops[1].status = constants.OP_STATUS_SUCCESS
363 ops[2].status = constants.OP_STATUS_WAITLOCK
366 ops[0].status = constants.OP_STATUS_SUCCESS
367 ops[1].status = constants.OP_STATUS_RUNNING
369 op.status = constants.OP_STATUS_QUEUED
371 def _Canceling1(ops):
372 ops[0].status = constants.OP_STATUS_SUCCESS
373 ops[1].status = constants.OP_STATUS_SUCCESS
375 op.status = constants.OP_STATUS_CANCELING
377 def _Canceling2(ops):
379 op.status = constants.OP_STATUS_CANCELING
383 op.status = constants.OP_STATUS_CANCELED
386 for idx, op in enumerate(ops):
388 op.status = constants.OP_STATUS_ERROR
390 op.status = constants.OP_STATUS_SUCCESS
394 op.status = constants.OP_STATUS_ERROR
398 op.status = constants.OP_STATUS_SUCCESS
401 constants.JOB_STATUS_QUEUED: [_Queued],
402 constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
403 constants.JOB_STATUS_RUNNING: [_Running],
404 constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
405 constants.JOB_STATUS_CANCELED: [_Canceled],
406 constants.JOB_STATUS_ERROR: [_Error1, _Error2],
407 constants.JOB_STATUS_SUCCESS: [_Success],
411 job = jqueue._QueuedJob(None, 1,
412 [opcodes.OpTestDelay() for _ in range(10)])
413 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
414 self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
418 for status in constants.JOB_STATUS_ALL:
419 sttests = tests[status]
424 self.assertEqual(job.CalcStatus(), status)
427 class _FakeQueueForProc:
429 self._acquired = False
def IsAcquired(self):
  """Return the current value of the fake queue's acquired flag.

  """
  acquired = self._acquired
  return acquired
434 def acquire(self, shared=0):
436 self._acquired = True
439 assert self._acquired
440 self._acquired = False
442 def UpdateJobUnlocked(self, job, replicate=None):
443 # TODO: Ensure job is updated at the correct places
447 class _FakeExecOpCodeForProc:
def __init__(self, before_start, after_start):
  """Store the two optional hooks used when the instance is called.

  before_start is later invoked with the timeout and after_start with the
  opcode and its callbacks; a false value disables the respective hook.

  """
  (self._before_start, self._after_start) = (before_start, after_start)
452 def __call__(self, op, cbs, timeout=None):
453 assert isinstance(op, opcodes.OpTestDummy)
455 if self._before_start:
456 self._before_start(timeout)
460 if self._after_start:
461 self._after_start(op, cbs)
464 raise errors.OpExecError("Error requested (%s)" % op.result)
469 class _JobProcessorTestUtils:
470 def _CreateJob(self, queue, job_id, ops):
471 job = jqueue._QueuedJob(queue, job_id, ops)
472 self.assertFalse(job.start_timestamp)
473 self.assertFalse(job.end_timestamp)
474 self.assertEqual(len(ops), len(job.ops))
475 self.assert_(compat.all(op.input == inp
476 for (op, inp) in zip(job.ops, ops)))
477 self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
481 class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
482 def _GenericCheckJob(self, job):
483 assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
486 self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
487 [[op.start_timestamp for op in job.ops],
488 [op.exec_timestamp for op in job.ops],
489 [op.end_timestamp for op in job.ops]])
490 self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
491 [job.received_timestamp,
494 self.assert_(job.start_timestamp)
495 self.assert_(job.end_timestamp)
496 self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
498 def testSuccess(self):
499 queue = _FakeQueueForProc()
501 for (job_id, opcount) in [(25351, 1), (6637, 3),
502 (24644, 10), (32207, 100)]:
503 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
504 for i in range(opcount)]
507 job = self._CreateJob(queue, job_id, ops)
510 self.assertFalse(queue.IsAcquired())
511 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
513 def _AfterStart(op, cbs):
514 self.assertFalse(queue.IsAcquired())
515 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
517 # Job is running, cancelling shouldn't be possible
518 (success, _) = job.Cancel()
519 self.assertFalse(success)
521 opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
523 for idx in range(len(ops)):
524 result = jqueue._JobProcessor(queue, opexec, job)()
525 if idx == len(ops) - 1:
529 self.assertFalse(result)
531 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
532 self.assert_(job.start_timestamp)
533 self.assertFalse(job.end_timestamp)
535 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
536 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
537 self.assertEqual(job.GetInfo(["opresult"]),
538 [[op.input.result for op in job.ops]])
539 self.assertEqual(job.GetInfo(["opstatus"]),
540 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
541 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
544 self._GenericCheckJob(job)
546 # Finished jobs can't be processed any further
547 self.assertRaises(errors.ProgrammerError,
548 jqueue._JobProcessor(queue, opexec, job))
550 def testOpcodeError(self):
551 queue = _FakeQueueForProc()
558 (23816, 100, 39, 45),
561 for (job_id, opcount, failfrom, failto) in testdata:
563 ops = [opcodes.OpTestDummy(result="Res%s" % i,
564 fail=(failfrom <= i and
566 for i in range(opcount)]
569 job = self._CreateJob(queue, job_id, ops)
571 opexec = _FakeExecOpCodeForProc(None, None)
573 for idx in range(len(ops)):
574 result = jqueue._JobProcessor(queue, opexec, job)()
576 if idx in (failfrom, len(ops) - 1):
581 self.assertFalse(result)
583 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
586 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
587 self.assertEqual(job.GetInfo(["id"]), [job_id])
588 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
590 # Check opcode status
592 job.GetInfo(["opstatus"])[0],
593 job.GetInfo(["opresult"])[0])
595 for idx, (op, opstatus, opresult) in enumerate(data):
597 assert not op.input.fail
598 self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
599 self.assertEqual(opresult, op.input.result)
602 self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
603 self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
605 assert not op.input.fail
606 self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
607 self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
609 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
610 for op in job.ops[:failfrom]))
612 self._GenericCheckJob(job)
614 # Finished jobs can't be processed any further
615 self.assertRaises(errors.ProgrammerError,
616 jqueue._JobProcessor(queue, opexec, job))
618 def testCancelWhileInQueue(self):
619 queue = _FakeQueueForProc()
621 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
626 job = self._CreateJob(queue, job_id, ops)
628 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
631 (success, _) = job.Cancel()
632 self.assert_(success)
634 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
637 opexec = _FakeExecOpCodeForProc(None, None)
638 jqueue._JobProcessor(queue, opexec, job)()
641 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
642 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
643 self.assertFalse(job.start_timestamp)
644 self.assert_(job.end_timestamp)
645 self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
647 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
648 [[constants.OP_STATUS_CANCELED for _ in job.ops],
649 ["Job canceled by request" for _ in job.ops]])
651 def testCancelWhileWaitlock(self):
652 queue = _FakeQueueForProc()
654 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
659 job = self._CreateJob(queue, job_id, ops)
661 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
664 self.assertFalse(queue.IsAcquired())
665 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
668 (success, _) = job.Cancel()
669 self.assert_(success)
671 self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
674 def _AfterStart(op, cbs):
675 self.assertFalse(queue.IsAcquired())
676 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
678 opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
680 jqueue._JobProcessor(queue, opexec, job)()
683 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
684 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
685 self.assert_(job.start_timestamp)
686 self.assert_(job.end_timestamp)
687 self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
689 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
690 [[constants.OP_STATUS_CANCELED for _ in job.ops],
691 ["Job canceled by request" for _ in job.ops]])
693 def testCancelWhileRunning(self):
694 # Tests canceling a job with finished opcodes and more, unprocessed ones
695 queue = _FakeQueueForProc()
697 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
702 job = self._CreateJob(queue, job_id, ops)
704 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
706 opexec = _FakeExecOpCodeForProc(None, None)
709 self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
711 # Job goes back to queued
712 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
713 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
714 [[constants.OP_STATUS_SUCCESS,
715 constants.OP_STATUS_QUEUED,
716 constants.OP_STATUS_QUEUED],
717 ["Res0", None, None]])
720 (success, _) = job.Cancel()
721 self.assert_(success)
723 # Try processing another opcode (this will actually cancel the job)
724 self.assert_(jqueue._JobProcessor(queue, opexec, job)())
727 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
728 self.assertEqual(job.GetInfo(["id"]), [job_id])
729 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
730 self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
731 [[constants.OP_STATUS_SUCCESS,
732 constants.OP_STATUS_CANCELED,
733 constants.OP_STATUS_CANCELED],
734 ["Res0", "Job canceled by request",
735 "Job canceled by request"]])
737 def testPartiallyRun(self):
738 # Tests calling the processor on a job that's been partially run before the
739 # program was restarted
740 queue = _FakeQueueForProc()
742 opexec = _FakeExecOpCodeForProc(None, None)
744 for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
745 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
749 job = self._CreateJob(queue, job_id, ops)
751 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
753 for _ in range(successcount):
754 self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
756 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
757 self.assertEqual(job.GetInfo(["opstatus"]),
758 [[constants.OP_STATUS_SUCCESS
759 for _ in range(successcount)] +
760 [constants.OP_STATUS_QUEUED
761 for _ in range(len(ops) - successcount)]])
763 self.assert_(job.ops_iter)
765 # Serialize and restore (simulates program restart)
766 newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
767 self.assertFalse(newjob.ops_iter)
768 self._TestPartial(newjob, successcount)
770 def _TestPartial(self, job, successcount):
771 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
772 self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
774 queue = _FakeQueueForProc()
775 opexec = _FakeExecOpCodeForProc(None, None)
777 for remaining in reversed(range(len(job.ops) - successcount)):
778 result = jqueue._JobProcessor(queue, opexec, job)()
785 self.assertFalse(result)
787 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
789 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
790 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
791 self.assertEqual(job.GetInfo(["opresult"]),
792 [[op.input.result for op in job.ops]])
793 self.assertEqual(job.GetInfo(["opstatus"]),
794 [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
795 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
798 self._GenericCheckJob(job)
800 # Finished jobs can't be processed any further
801 self.assertRaises(errors.ProgrammerError,
802 jqueue._JobProcessor(queue, opexec, job))
804 # ... also after being restored
805 job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
806 self.assertRaises(errors.ProgrammerError,
807 jqueue._JobProcessor(queue, opexec, job2))
809 def testProcessorOnRunningJob(self):
810 ops = [opcodes.OpTestDummy(result="result", fail=False)]
812 queue = _FakeQueueForProc()
813 opexec = _FakeExecOpCodeForProc(None, None)
816 job = self._CreateJob(queue, 9571, ops)
818 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
820 job.ops[0].status = constants.OP_STATUS_RUNNING
822 assert len(job.ops) == 1
824 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
826 # Calling on running job must fail
827 self.assertRaises(errors.ProgrammerError,
828 jqueue._JobProcessor(queue, opexec, job))
830 def testLogMessages(self):
831 # Tests the "Feedback" callback function
832 queue = _FakeQueueForProc()
838 (constants.ELOG_MESSAGE, "there"),
841 (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
842 (constants.ELOG_JQUEUE_TEST, ("other", "type")),
845 ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
846 messages=messages.get(i, []))
850 job = self._CreateJob(queue, 29386, ops)
853 self.assertFalse(queue.IsAcquired())
854 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
856 def _AfterStart(op, cbs):
857 self.assertFalse(queue.IsAcquired())
858 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
860 self.assertRaises(AssertionError, cbs.Feedback,
861 "too", "many", "arguments")
863 for (log_type, msg) in op.messages:
865 cbs.Feedback(log_type, msg)
869 opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
871 for remaining in reversed(range(len(job.ops))):
872 result = jqueue._JobProcessor(queue, opexec, job)()
879 self.assertFalse(result)
881 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
883 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
884 self.assertEqual(job.GetInfo(["opresult"]),
885 [[op.input.result for op in job.ops]])
887 logmsgcount = sum(len(m) for m in messages.values())
889 self._CheckLogMessages(job, logmsgcount)
891 # Serialize and restore (simulates program restart)
892 newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
893 self._CheckLogMessages(newjob, logmsgcount)
897 for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
898 for (serial, timestamp, log_type, msg) in oplog:
899 (exptype, expmsg) = messages.get(idx).pop(0)
901 self.assertEqual(log_type, exptype)
903 self.assertEqual(log_type, constants.ELOG_MESSAGE)
904 self.assertEqual(expmsg, msg)
905 self.assert_(serial > prevserial)
908 def _CheckLogMessages(self, job, count):
910 self.assertEqual(job.log_serial, count)
913 self.assertEqual(job.GetLogEntries(None),
914 [entry for entries in job.GetInfo(["oplog"])[0] if entries
915 for entry in entries])
919 self.assert_(job.GetLogEntries(3))
920 self.assertEqual(job.GetLogEntries(3),
921 [entry for entries in job.GetInfo(["oplog"])[0] if entries
922 for entry in entries][3:])
924 # No log message after highest serial
925 self.assertFalse(job.GetLogEntries(count))
926 self.assertFalse(job.GetLogEntries(count + 3))
929 class _FakeTimeoutStrategy:
930 def __init__(self, timeouts):
931 self.timeouts = timeouts
933 self.last_timeout = None
935 def NextAttempt(self):
938 timeout = self.timeouts.pop(0)
941 self.last_timeout = timeout
945 class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
947 self.queue = _FakeQueueForProc()
950 self.opcounter = None
951 self.timeout_strategy = None
953 self.prev_tsop = None
954 self.prev_prio = None
955 self.gave_lock = None
956 self.done_lock_before_blocking = False
958 def _BeforeStart(self, timeout):
961 self.assertFalse(self.queue.IsAcquired())
962 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
964 ts = self.timeout_strategy
966 self.assert_(timeout is None or isinstance(timeout, (int, float)))
967 self.assertEqual(timeout, ts.last_timeout)
969 self.gave_lock = True
971 if (self.curop == 3 and
972 job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
973 # Give locks before running into blocking acquire
974 assert self.retries == 7
976 self.done_lock_before_blocking = True
980 self.assert_(timeout is not None)
982 self.gave_lock = False
983 raise mcpu.LockAcquireTimeout()
985 if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
986 assert self.retries == 0, "Didn't exhaust all retries at highest priority"
987 assert not ts.timeouts
988 self.assert_(timeout is None)
990 def _AfterStart(self, op, cbs):
993 self.assertFalse(self.queue.IsAcquired())
994 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
996 # Job is running, cancelling shouldn't be possible
997 (success, _) = job.Cancel()
998 self.assertFalse(success)
def _NextOpcode(self):
  # Take the next index from the opcode counter, make it the current opcode
  # and remember the priority that opcode has right now
  next_index = self.opcounter.next()
  self.curop = next_index
  self.prev_prio = self.job.ops[next_index].priority
1004 def _NewTimeoutStrategy(self):
1007 self.assertEqual(self.retries, 0)
1009 if self.prev_tsop == self.curop:
1010 # Still on the same opcode, priority must've been increased
1011 self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
1015 timeouts = range(10, 31, 10)
1016 self.retries = len(timeouts) - 1
1018 elif self.curop == 2:
1019 # Let this run into a blocking acquire
1020 timeouts = range(11, 61, 12)
1021 self.retries = len(timeouts)
1023 elif self.curop == 3:
1024 # Wait for priority to increase, but give lock before blocking acquire
1025 timeouts = range(12, 100, 14)
1026 self.retries = len(timeouts)
1028 self.assertFalse(self.done_lock_before_blocking)
1030 elif self.curop == 4:
1031 self.assert_(self.done_lock_before_blocking)
1033 # Timeouts, but no need to retry
1034 timeouts = range(10, 31, 10)
1037 elif self.curop == 5:
1039 timeouts = range(19, 100, 11)
1040 self.retries = len(timeouts)
1046 assert len(job.ops) == 10
1047 assert self.retries <= len(timeouts)
1049 ts = _FakeTimeoutStrategy(timeouts)
1051 self.timeout_strategy = ts
1052 self.prev_tsop = self.curop
1053 self.prev_prio = job.ops[self.curop].priority
1057 def testTimeout(self):
1058 ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1063 job = self._CreateJob(self.queue, job_id, ops)
1066 self.opcounter = itertools.count(0)
1068 opexec = _FakeExecOpCodeForProc(self._BeforeStart, self._AfterStart)
1069 tsf = self._NewTimeoutStrategy
1071 self.assertFalse(self.done_lock_before_blocking)
1073 for i in itertools.count(0):
1074 proc = jqueue._JobProcessor(self.queue, opexec, job,
1075 _timeout_strategy_factory=tsf)
1077 result = proc(_nextop_fn=self._NextOpcode)
1079 self.assertFalse(job.cur_opctx)
1082 self.assertFalse(result)
1085 self.assertFalse(job.cur_opctx)
1087 self.assert_(job.cur_opctx)
1088 self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1089 self.timeout_strategy.NextAttempt)
1091 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1092 self.assert_(job.start_timestamp)
1093 self.assertFalse(job.end_timestamp)
1095 self.assertEqual(self.curop, len(job.ops) - 1)
1096 self.assertEqual(self.job, job)
1097 self.assertEqual(self.opcounter.next(), len(job.ops))
1098 self.assert_(self.done_lock_before_blocking)
1100 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1101 self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1102 self.assertEqual(job.GetInfo(["opresult"]),
1103 [[op.input.result for op in job.ops]])
1104 self.assertEqual(job.GetInfo(["opstatus"]),
1105 [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1106 self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1109 # Finished jobs can't be processed any further
1110 self.assertRaises(errors.ProgrammerError,
1111 jqueue._JobProcessor(self.queue, opexec, job))
# Only when executed directly as a script (not on import): hand control to
# testutils.GanetiTestProgram, presumably the project's unittest entry point
if __name__ == "__main__":
  testutils.GanetiTestProgram()