29 |
29 |
import errno
|
30 |
30 |
import itertools
|
31 |
31 |
import random
|
|
32 |
import operator
|
32 |
33 |
|
33 |
34 |
from ganeti import constants
|
34 |
35 |
from ganeti import utils
|
... | ... | |
281 |
282 |
|
282 |
283 |
|
283 |
284 |
class TestQueuedJob(unittest.TestCase):
|
284 |
|
def test(self):
|
|
285 |
def testNoOpCodes(self):
|
285 |
286 |
self.assertRaises(errors.GenericError, jqueue._QueuedJob,
|
286 |
287 |
None, 1, [], False)
|
287 |
288 |
|
... | ... | |
371 |
372 |
job.ops[0].priority -= 19
|
372 |
373 |
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
|
373 |
374 |
|
|
375 |
def _JobForPriority(self, job_id):
|
|
376 |
ops = [
|
|
377 |
opcodes.OpTagsGet(),
|
|
378 |
opcodes.OpTestDelay(),
|
|
379 |
opcodes.OpTagsGet(),
|
|
380 |
opcodes.OpTestDelay(),
|
|
381 |
]
|
|
382 |
|
|
383 |
job = jqueue._QueuedJob(None, job_id, ops, True)
|
|
384 |
|
|
385 |
self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
|
|
386 |
for op in job.ops))
|
|
387 |
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
|
|
388 |
self.assertFalse(compat.any(hasattr(op.input, "priority")
|
|
389 |
for op in job.ops))
|
|
390 |
|
|
391 |
return job
|
|
392 |
|
|
393 |
def testChangePriorityAllQueued(self):
|
|
394 |
job = self._JobForPriority(24984)
|
|
395 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
|
|
396 |
self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
|
|
397 |
for op in job.ops))
|
|
398 |
result = job.ChangePriority(-10)
|
|
399 |
self.assertEqual(job.CalcPriority(), -10)
|
|
400 |
self.assertTrue(compat.all(op.priority == -10 for op in job.ops))
|
|
401 |
self.assertTrue(compat.all(op.input.priority == -10 for op in job.ops))
|
|
402 |
self.assertEqual(result,
|
|
403 |
(True, ("Priorities of pending opcodes for job 24984 have"
|
|
404 |
" been changed to -10")))
|
|
405 |
|
|
406 |
def testChangePriorityAllFinished(self):
|
|
407 |
job = self._JobForPriority(16405)
|
|
408 |
|
|
409 |
for (idx, op) in enumerate(job.ops):
|
|
410 |
if idx > 2:
|
|
411 |
op.status = constants.OP_STATUS_ERROR
|
|
412 |
else:
|
|
413 |
op.status = constants.OP_STATUS_SUCCESS
|
|
414 |
|
|
415 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
|
|
416 |
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
|
|
417 |
result = job.ChangePriority(-10)
|
|
418 |
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
|
|
419 |
self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
|
|
420 |
for op in job.ops))
|
|
421 |
self.assertFalse(compat.any(hasattr(op.input, "priority")
|
|
422 |
for op in job.ops))
|
|
423 |
self.assertEqual(map(operator.attrgetter("status"), job.ops), [
|
|
424 |
constants.OP_STATUS_SUCCESS,
|
|
425 |
constants.OP_STATUS_SUCCESS,
|
|
426 |
constants.OP_STATUS_SUCCESS,
|
|
427 |
constants.OP_STATUS_ERROR,
|
|
428 |
])
|
|
429 |
self.assertEqual(result, (False, "Job 16405 is finished"))
|
|
430 |
|
|
431 |
def testChangePriorityCancelling(self):
|
|
432 |
job = self._JobForPriority(31572)
|
|
433 |
|
|
434 |
for (idx, op) in enumerate(job.ops):
|
|
435 |
if idx > 1:
|
|
436 |
op.status = constants.OP_STATUS_CANCELING
|
|
437 |
else:
|
|
438 |
op.status = constants.OP_STATUS_SUCCESS
|
|
439 |
|
|
440 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELING)
|
|
441 |
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
|
|
442 |
result = job.ChangePriority(5)
|
|
443 |
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
|
|
444 |
self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
|
|
445 |
for op in job.ops))
|
|
446 |
self.assertFalse(compat.any(hasattr(op.input, "priority")
|
|
447 |
for op in job.ops))
|
|
448 |
self.assertEqual(map(operator.attrgetter("status"), job.ops), [
|
|
449 |
constants.OP_STATUS_SUCCESS,
|
|
450 |
constants.OP_STATUS_SUCCESS,
|
|
451 |
constants.OP_STATUS_CANCELING,
|
|
452 |
constants.OP_STATUS_CANCELING,
|
|
453 |
])
|
|
454 |
self.assertEqual(result, (False, "Job 31572 is cancelling"))
|
|
455 |
|
|
456 |
def testChangePriorityFirstRunning(self):
|
|
457 |
job = self._JobForPriority(1716215889)
|
|
458 |
|
|
459 |
for (idx, op) in enumerate(job.ops):
|
|
460 |
if idx == 0:
|
|
461 |
op.status = constants.OP_STATUS_RUNNING
|
|
462 |
else:
|
|
463 |
op.status = constants.OP_STATUS_QUEUED
|
|
464 |
|
|
465 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
|
|
466 |
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
|
|
467 |
result = job.ChangePriority(7)
|
|
468 |
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
|
|
469 |
self.assertEqual(map(operator.attrgetter("priority"), job.ops),
|
|
470 |
[constants.OP_PRIO_DEFAULT, 7, 7, 7])
|
|
471 |
self.assertEqual([getattr(op.input, "priority", None) for op in job.ops],
|
|
472 |
[None, 7, 7, 7])
|
|
473 |
self.assertEqual(map(operator.attrgetter("status"), job.ops), [
|
|
474 |
constants.OP_STATUS_RUNNING,
|
|
475 |
constants.OP_STATUS_QUEUED,
|
|
476 |
constants.OP_STATUS_QUEUED,
|
|
477 |
constants.OP_STATUS_QUEUED,
|
|
478 |
])
|
|
479 |
self.assertEqual(result,
|
|
480 |
(True, ("Priorities of pending opcodes for job"
|
|
481 |
" 1716215889 have been changed to 7")))
|
|
482 |
|
|
483 |
def testChangePriorityLastRunning(self):
|
|
484 |
job = self._JobForPriority(1308)
|
|
485 |
|
|
486 |
for (idx, op) in enumerate(job.ops):
|
|
487 |
if idx == (len(job.ops) - 1):
|
|
488 |
op.status = constants.OP_STATUS_RUNNING
|
|
489 |
else:
|
|
490 |
op.status = constants.OP_STATUS_SUCCESS
|
|
491 |
|
|
492 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
|
|
493 |
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
|
|
494 |
result = job.ChangePriority(-3)
|
|
495 |
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
|
|
496 |
self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
|
|
497 |
for op in job.ops))
|
|
498 |
self.assertFalse(compat.any(hasattr(op.input, "priority")
|
|
499 |
for op in job.ops))
|
|
500 |
self.assertEqual(map(operator.attrgetter("status"), job.ops), [
|
|
501 |
constants.OP_STATUS_SUCCESS,
|
|
502 |
constants.OP_STATUS_SUCCESS,
|
|
503 |
constants.OP_STATUS_SUCCESS,
|
|
504 |
constants.OP_STATUS_RUNNING,
|
|
505 |
])
|
|
506 |
self.assertEqual(result, (False, "Job 1308 had no pending opcodes"))
|
|
507 |
|
|
508 |
def testChangePrioritySecondOpcodeRunning(self):
|
|
509 |
job = self._JobForPriority(27701)
|
|
510 |
|
|
511 |
self.assertEqual(len(job.ops), 4)
|
|
512 |
job.ops[0].status = constants.OP_STATUS_SUCCESS
|
|
513 |
job.ops[1].status = constants.OP_STATUS_RUNNING
|
|
514 |
job.ops[2].status = constants.OP_STATUS_QUEUED
|
|
515 |
job.ops[3].status = constants.OP_STATUS_QUEUED
|
|
516 |
|
|
517 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
|
|
518 |
result = job.ChangePriority(-19)
|
|
519 |
self.assertEqual(job.CalcPriority(), -19)
|
|
520 |
self.assertEqual(map(operator.attrgetter("priority"), job.ops),
|
|
521 |
[constants.OP_PRIO_DEFAULT, constants.OP_PRIO_DEFAULT,
|
|
522 |
-19, -19])
|
|
523 |
self.assertEqual([getattr(op.input, "priority", None) for op in job.ops],
|
|
524 |
[None, None, -19, -19])
|
|
525 |
self.assertEqual(map(operator.attrgetter("status"), job.ops), [
|
|
526 |
constants.OP_STATUS_SUCCESS,
|
|
527 |
constants.OP_STATUS_RUNNING,
|
|
528 |
constants.OP_STATUS_QUEUED,
|
|
529 |
constants.OP_STATUS_QUEUED,
|
|
530 |
])
|
|
531 |
self.assertEqual(result,
|
|
532 |
(True, ("Priorities of pending opcodes for job"
|
|
533 |
" 27701 have been changed to -19")))
|
|
534 |
|
|
535 |
def testChangePriorityWithInconsistentJob(self):
|
|
536 |
job = self._JobForPriority(30097)
|
|
537 |
|
|
538 |
self.assertEqual(len(job.ops), 4)
|
|
539 |
|
|
540 |
# This job is invalid (as it has two opcodes marked as running) and make
|
|
541 |
# the call fail because an unprocessed opcode precedes a running one (which
|
|
542 |
# should never happen in reality)
|
|
543 |
job.ops[0].status = constants.OP_STATUS_SUCCESS
|
|
544 |
job.ops[1].status = constants.OP_STATUS_RUNNING
|
|
545 |
job.ops[2].status = constants.OP_STATUS_QUEUED
|
|
546 |
job.ops[3].status = constants.OP_STATUS_RUNNING
|
|
547 |
|
|
548 |
self.assertRaises(AssertionError, job.ChangePriority, 19)
|
|
549 |
|
374 |
550 |
def testCalcStatus(self):
|
375 |
551 |
def _Queued(ops):
|
376 |
552 |
# The default status is "queued"
|
... | ... | |
2105 |
2281 |
self.assertRaises(IndexError, self.queue.GetNextUpdate)
|
2106 |
2282 |
|
2107 |
2283 |
|
|
2284 |
class TestJobProcessorChangePriority(unittest.TestCase, _JobProcessorTestUtils):
|
|
2285 |
def setUp(self):
|
|
2286 |
self.queue = _FakeQueueForProc()
|
|
2287 |
self.opexecprio = []
|
|
2288 |
|
|
2289 |
def _BeforeStart(self, timeout, priority):
|
|
2290 |
self.assertFalse(self.queue.IsAcquired())
|
|
2291 |
self.opexecprio.append(priority)
|
|
2292 |
|
|
2293 |
def testChangePriorityWhileRunning(self):
|
|
2294 |
# Tests changing the priority on a job while it has finished opcodes
|
|
2295 |
# (successful) and more, unprocessed ones
|
|
2296 |
ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
|
|
2297 |
for i in range(3)]
|
|
2298 |
|
|
2299 |
# Create job
|
|
2300 |
job_id = 3499
|
|
2301 |
job = self._CreateJob(self.queue, job_id, ops)
|
|
2302 |
|
|
2303 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
|
|
2304 |
|
|
2305 |
opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart, None)
|
|
2306 |
|
|
2307 |
# Run first opcode
|
|
2308 |
self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
|
|
2309 |
jqueue._JobProcessor.DEFER)
|
|
2310 |
|
|
2311 |
# Job goes back to queued
|
|
2312 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
|
|
2313 |
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
|
|
2314 |
self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
|
|
2315 |
[[constants.OP_STATUS_SUCCESS,
|
|
2316 |
constants.OP_STATUS_QUEUED,
|
|
2317 |
constants.OP_STATUS_QUEUED],
|
|
2318 |
["Res0", None, None]])
|
|
2319 |
|
|
2320 |
self.assertEqual(self.opexecprio.pop(0), constants.OP_PRIO_DEFAULT)
|
|
2321 |
self.assertRaises(IndexError, self.opexecprio.pop, 0)
|
|
2322 |
|
|
2323 |
# Change priority
|
|
2324 |
self.assertEqual(job.ChangePriority(-10),
|
|
2325 |
(True,
|
|
2326 |
("Priorities of pending opcodes for job 3499 have"
|
|
2327 |
" been changed to -10")))
|
|
2328 |
self.assertEqual(job.CalcPriority(), -10)
|
|
2329 |
|
|
2330 |
# Process second opcode
|
|
2331 |
self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
|
|
2332 |
jqueue._JobProcessor.DEFER)
|
|
2333 |
|
|
2334 |
self.assertEqual(self.opexecprio.pop(0), -10)
|
|
2335 |
self.assertRaises(IndexError, self.opexecprio.pop, 0)
|
|
2336 |
|
|
2337 |
# Check status
|
|
2338 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
|
|
2339 |
self.assertEqual(job.CalcPriority(), -10)
|
|
2340 |
self.assertEqual(job.GetInfo(["id"]), [job_id])
|
|
2341 |
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
|
|
2342 |
self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
|
|
2343 |
[[constants.OP_STATUS_SUCCESS,
|
|
2344 |
constants.OP_STATUS_SUCCESS,
|
|
2345 |
constants.OP_STATUS_QUEUED],
|
|
2346 |
["Res0", "Res1", None]])
|
|
2347 |
|
|
2348 |
# Change priority once more
|
|
2349 |
self.assertEqual(job.ChangePriority(5),
|
|
2350 |
(True,
|
|
2351 |
("Priorities of pending opcodes for job 3499 have"
|
|
2352 |
" been changed to 5")))
|
|
2353 |
self.assertEqual(job.CalcPriority(), 5)
|
|
2354 |
|
|
2355 |
# Process third opcode
|
|
2356 |
self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
|
|
2357 |
jqueue._JobProcessor.FINISHED)
|
|
2358 |
|
|
2359 |
self.assertEqual(self.opexecprio.pop(0), 5)
|
|
2360 |
self.assertRaises(IndexError, self.opexecprio.pop, 0)
|
|
2361 |
|
|
2362 |
# Check status
|
|
2363 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
|
|
2364 |
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
|
|
2365 |
self.assertEqual(job.GetInfo(["id"]), [job_id])
|
|
2366 |
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
|
|
2367 |
self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
|
|
2368 |
[[constants.OP_STATUS_SUCCESS,
|
|
2369 |
constants.OP_STATUS_SUCCESS,
|
|
2370 |
constants.OP_STATUS_SUCCESS],
|
|
2371 |
["Res0", "Res1", "Res2"]])
|
|
2372 |
self.assertEqual(map(operator.attrgetter("priority"), job.ops),
|
|
2373 |
[constants.OP_PRIO_DEFAULT, -10, 5])
|
|
2374 |
|
|
2375 |
|
2108 |
2376 |
class _IdOnlyFakeJob:
|
2109 |
2377 |
def __init__(self, job_id, priority=NotImplemented):
|
2110 |
2378 |
self.id = str(job_id)
|