Revision 4679547e

b/lib/jqueue.py
482 482
      logging.debug("Job %s is no longer waiting in the queue", self.id)
483 483
      return (False, "Job %s is no longer waiting in the queue" % self.id)
484 484

  
485
  def ChangePriority(self, priority):
486
    """Changes the job priority.
487

  
488
    @type priority: int
489
    @param priority: New priority
490
    @rtype: tuple; (bool, string)
491
    @return: Boolean describing whether job's priority was successfully changed
492
      and a text message
493

  
494
    """
495
    status = self.CalcStatus()
496

  
497
    if status in constants.JOBS_FINALIZED:
498
      return (False, "Job %s is finished" % self.id)
499
    elif status == constants.JOB_STATUS_CANCELING:
500
      return (False, "Job %s is cancelling" % self.id)
501
    else:
502
      assert status in (constants.JOB_STATUS_QUEUED,
503
                        constants.JOB_STATUS_WAITING,
504
                        constants.JOB_STATUS_RUNNING)
505

  
506
      changed = False
507
      for op in self.ops:
508
        if (op.status == constants.OP_STATUS_RUNNING or
509
            op.status in constants.OPS_FINALIZED):
510
          assert not changed, \
511
            ("Found opcode for which priority should not be changed after"
512
             " priority has been changed for previous opcodes")
513
          continue
514

  
515
        assert op.status in (constants.OP_STATUS_QUEUED,
516
                             constants.OP_STATUS_WAITING)
517

  
518
        changed = True
519

  
520
        # Note: this also changes the on-disk priority ("op.priority" is only in
521
        # memory)
522
        op.input.priority = priority
523
        op.priority = priority
524

  
525
      if changed:
526
        return (True, ("Priorities of pending opcodes for job %s have been"
527
                       " changed to %s" % (self.id, priority)))
528
      else:
529
        return (False, "Job %s had no pending opcodes" % self.id)
530

  
485 531

  
486 532
class _OpExecCallbacks(mcpu.OpExecCbBase):
487 533
  def __init__(self, queue, job, op):
......
2356 2402

  
2357 2403
    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2358 2404

  
2405
  @locking.ssynchronized(_LOCK)
2406
  @_RequireOpenQueue
2407
  def ChangeJobPriority(self, job_id, priority):
2408
    """Changes a job's priority.
2409

  
2410
    @type job_id: int
2411
    @param job_id: ID of the job whose priority should be changed
2412
    @type priority: int
2413
    @param priority: New priority
2414

  
2415
    """
2416
    logging.info("Changing priority of job %s to %s", job_id, priority)
2417

  
2418
    if priority not in constants.OP_PRIO_SUBMIT_VALID:
2419
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2420
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
2421
                                (priority, allowed))
2422

  
2423
    def fn(job):
2424
      (success, msg) = job.ChangePriority(priority)
2425

  
2426
      if success:
2427
        try:
2428
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
2429
        except workerpool.NoSuchTask:
2430
          logging.debug("Job %s is not in workerpool at this time", job.id)
2431

  
2432
      return (success, msg)
2433

  
2434
    return self._ModifyJobUnlocked(job_id, fn)
2435

  
2359 2436
  def _ModifyJobUnlocked(self, job_id, mod_fn):
2360 2437
    """Modifies a job.
2361 2438

  
b/test/ganeti.jqueue_unittest.py
29 29
import errno
30 30
import itertools
31 31
import random
32
import operator
32 33

  
33 34
from ganeti import constants
34 35
from ganeti import utils
......
281 282

  
282 283

  
283 284
class TestQueuedJob(unittest.TestCase):
284
  def test(self):
285
  def testNoOpCodes(self):
285 286
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
286 287
                      None, 1, [], False)
287 288

  
......
371 372
    job.ops[0].priority -= 19
372 373
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
373 374

  
375
  def _JobForPriority(self, job_id):
376
    ops = [
377
      opcodes.OpTagsGet(),
378
      opcodes.OpTestDelay(),
379
      opcodes.OpTagsGet(),
380
      opcodes.OpTestDelay(),
381
      ]
382

  
383
    job = jqueue._QueuedJob(None, job_id, ops, True)
384

  
385
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
386
                               for op in job.ops))
387
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
388
    self.assertFalse(compat.any(hasattr(op.input, "priority")
389
                                for op in job.ops))
390

  
391
    return job
392

  
393
  def testChangePriorityAllQueued(self):
394
    job = self._JobForPriority(24984)
395
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
396
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
397
                               for op in job.ops))
398
    result = job.ChangePriority(-10)
399
    self.assertEqual(job.CalcPriority(), -10)
400
    self.assertTrue(compat.all(op.priority == -10 for op in job.ops))
401
    self.assertTrue(compat.all(op.input.priority == -10 for op in job.ops))
402
    self.assertEqual(result,
403
                     (True, ("Priorities of pending opcodes for job 24984 have"
404
                             " been changed to -10")))
405

  
406
  def testChangePriorityAllFinished(self):
407
    job = self._JobForPriority(16405)
408

  
409
    for (idx, op) in enumerate(job.ops):
410
      if idx > 2:
411
        op.status = constants.OP_STATUS_ERROR
412
      else:
413
        op.status = constants.OP_STATUS_SUCCESS
414

  
415
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
416
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
417
    result = job.ChangePriority(-10)
418
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
419
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
420
                               for op in job.ops))
421
    self.assertFalse(compat.any(hasattr(op.input, "priority")
422
                                for op in job.ops))
423
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
424
      constants.OP_STATUS_SUCCESS,
425
      constants.OP_STATUS_SUCCESS,
426
      constants.OP_STATUS_SUCCESS,
427
      constants.OP_STATUS_ERROR,
428
      ])
429
    self.assertEqual(result, (False, "Job 16405 is finished"))
430

  
431
  def testChangePriorityCancelling(self):
432
    job = self._JobForPriority(31572)
433

  
434
    for (idx, op) in enumerate(job.ops):
435
      if idx > 1:
436
        op.status = constants.OP_STATUS_CANCELING
437
      else:
438
        op.status = constants.OP_STATUS_SUCCESS
439

  
440
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELING)
441
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
442
    result = job.ChangePriority(5)
443
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
444
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
445
                               for op in job.ops))
446
    self.assertFalse(compat.any(hasattr(op.input, "priority")
447
                                for op in job.ops))
448
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
449
      constants.OP_STATUS_SUCCESS,
450
      constants.OP_STATUS_SUCCESS,
451
      constants.OP_STATUS_CANCELING,
452
      constants.OP_STATUS_CANCELING,
453
      ])
454
    self.assertEqual(result, (False, "Job 31572 is cancelling"))
455

  
456
  def testChangePriorityFirstRunning(self):
457
    job = self._JobForPriority(1716215889)
458

  
459
    for (idx, op) in enumerate(job.ops):
460
      if idx == 0:
461
        op.status = constants.OP_STATUS_RUNNING
462
      else:
463
        op.status = constants.OP_STATUS_QUEUED
464

  
465
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
466
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
467
    result = job.ChangePriority(7)
468
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
469
    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
470
                     [constants.OP_PRIO_DEFAULT, 7, 7, 7])
471
    self.assertEqual([getattr(op.input, "priority", None) for op in job.ops],
472
                     [None, 7, 7, 7])
473
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
474
      constants.OP_STATUS_RUNNING,
475
      constants.OP_STATUS_QUEUED,
476
      constants.OP_STATUS_QUEUED,
477
      constants.OP_STATUS_QUEUED,
478
      ])
479
    self.assertEqual(result,
480
                     (True, ("Priorities of pending opcodes for job"
481
                             " 1716215889 have been changed to 7")))
482

  
483
  def testChangePriorityLastRunning(self):
484
    job = self._JobForPriority(1308)
485

  
486
    for (idx, op) in enumerate(job.ops):
487
      if idx == (len(job.ops) - 1):
488
        op.status = constants.OP_STATUS_RUNNING
489
      else:
490
        op.status = constants.OP_STATUS_SUCCESS
491

  
492
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
493
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
494
    result = job.ChangePriority(-3)
495
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
496
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
497
                               for op in job.ops))
498
    self.assertFalse(compat.any(hasattr(op.input, "priority")
499
                                for op in job.ops))
500
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
501
      constants.OP_STATUS_SUCCESS,
502
      constants.OP_STATUS_SUCCESS,
503
      constants.OP_STATUS_SUCCESS,
504
      constants.OP_STATUS_RUNNING,
505
      ])
506
    self.assertEqual(result, (False, "Job 1308 had no pending opcodes"))
507

  
508
  def testChangePrioritySecondOpcodeRunning(self):
509
    job = self._JobForPriority(27701)
510

  
511
    self.assertEqual(len(job.ops), 4)
512
    job.ops[0].status = constants.OP_STATUS_SUCCESS
513
    job.ops[1].status = constants.OP_STATUS_RUNNING
514
    job.ops[2].status = constants.OP_STATUS_QUEUED
515
    job.ops[3].status = constants.OP_STATUS_QUEUED
516

  
517
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
518
    result = job.ChangePriority(-19)
519
    self.assertEqual(job.CalcPriority(), -19)
520
    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
521
                     [constants.OP_PRIO_DEFAULT, constants.OP_PRIO_DEFAULT,
522
                      -19, -19])
523
    self.assertEqual([getattr(op.input, "priority", None) for op in job.ops],
524
                     [None, None, -19, -19])
525
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
526
      constants.OP_STATUS_SUCCESS,
527
      constants.OP_STATUS_RUNNING,
528
      constants.OP_STATUS_QUEUED,
529
      constants.OP_STATUS_QUEUED,
530
      ])
531
    self.assertEqual(result,
532
                     (True, ("Priorities of pending opcodes for job"
533
                             " 27701 have been changed to -19")))
534

  
535
  def testChangePriorityWithInconsistentJob(self):
536
    job = self._JobForPriority(30097)
537

  
538
    self.assertEqual(len(job.ops), 4)
539

  
540
    # This job is invalid (as it has two opcodes marked as running) and make
541
    # the call fail because an unprocessed opcode precedes a running one (which
542
    # should never happen in reality)
543
    job.ops[0].status = constants.OP_STATUS_SUCCESS
544
    job.ops[1].status = constants.OP_STATUS_RUNNING
545
    job.ops[2].status = constants.OP_STATUS_QUEUED
546
    job.ops[3].status = constants.OP_STATUS_RUNNING
547

  
548
    self.assertRaises(AssertionError, job.ChangePriority, 19)
549

  
374 550
  def testCalcStatus(self):
375 551
    def _Queued(ops):
376 552
      # The default status is "queued"
......
2105 2281
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2106 2282

  
2107 2283

  
2284
class TestJobProcessorChangePriority(unittest.TestCase, _JobProcessorTestUtils):
2285
  def setUp(self):
2286
    self.queue = _FakeQueueForProc()
2287
    self.opexecprio = []
2288

  
2289
  def _BeforeStart(self, timeout, priority):
2290
    self.assertFalse(self.queue.IsAcquired())
2291
    self.opexecprio.append(priority)
2292

  
2293
  def testChangePriorityWhileRunning(self):
2294
    # Tests changing the priority on a job while it has finished opcodes
2295
    # (successful) and more, unprocessed ones
2296
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
2297
           for i in range(3)]
2298

  
2299
    # Create job
2300
    job_id = 3499
2301
    job = self._CreateJob(self.queue, job_id, ops)
2302

  
2303
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2304

  
2305
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart, None)
2306

  
2307
    # Run first opcode
2308
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2309
                     jqueue._JobProcessor.DEFER)
2310

  
2311
    # Job goes back to queued
2312
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2313
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
2314
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2315
                     [[constants.OP_STATUS_SUCCESS,
2316
                       constants.OP_STATUS_QUEUED,
2317
                       constants.OP_STATUS_QUEUED],
2318
                      ["Res0", None, None]])
2319

  
2320
    self.assertEqual(self.opexecprio.pop(0), constants.OP_PRIO_DEFAULT)
2321
    self.assertRaises(IndexError, self.opexecprio.pop, 0)
2322

  
2323
    # Change priority
2324
    self.assertEqual(job.ChangePriority(-10),
2325
                     (True,
2326
                      ("Priorities of pending opcodes for job 3499 have"
2327
                       " been changed to -10")))
2328
    self.assertEqual(job.CalcPriority(), -10)
2329

  
2330
    # Process second opcode
2331
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2332
                     jqueue._JobProcessor.DEFER)
2333

  
2334
    self.assertEqual(self.opexecprio.pop(0), -10)
2335
    self.assertRaises(IndexError, self.opexecprio.pop, 0)
2336

  
2337
    # Check status
2338
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2339
    self.assertEqual(job.CalcPriority(), -10)
2340
    self.assertEqual(job.GetInfo(["id"]), [job_id])
2341
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
2342
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2343
                     [[constants.OP_STATUS_SUCCESS,
2344
                       constants.OP_STATUS_SUCCESS,
2345
                       constants.OP_STATUS_QUEUED],
2346
                      ["Res0", "Res1", None]])
2347

  
2348
    # Change priority once more
2349
    self.assertEqual(job.ChangePriority(5),
2350
                     (True,
2351
                      ("Priorities of pending opcodes for job 3499 have"
2352
                       " been changed to 5")))
2353
    self.assertEqual(job.CalcPriority(), 5)
2354

  
2355
    # Process third opcode
2356
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2357
                     jqueue._JobProcessor.FINISHED)
2358

  
2359
    self.assertEqual(self.opexecprio.pop(0), 5)
2360
    self.assertRaises(IndexError, self.opexecprio.pop, 0)
2361

  
2362
    # Check status
2363
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
2364
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
2365
    self.assertEqual(job.GetInfo(["id"]), [job_id])
2366
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
2367
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2368
                     [[constants.OP_STATUS_SUCCESS,
2369
                       constants.OP_STATUS_SUCCESS,
2370
                       constants.OP_STATUS_SUCCESS],
2371
                      ["Res0", "Res1", "Res2"]])
2372
    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
2373
                     [constants.OP_PRIO_DEFAULT, -10, 5])
2374

  
2375

  
2108 2376
class _IdOnlyFakeJob:
2109 2377
  def __init__(self, job_id, priority=NotImplemented):
2110 2378
    self.id = str(job_id)

Also available in: Unified diff