Revision fbf0262f lib/jqueue.py

b/lib/jqueue.py
47 47
from ganeti import jstore
48 48
from ganeti import rpc
49 49

  
50

  
50 51
JOBQUEUE_THREADS = 25
51 52

  
52 53

  
54
class CancelJob:
55
  """Special exception to cancel a job.
56

  
57
  """
58

  
59

  
53 60
def TimeStampNow():
54 61
  """Returns the current timestamp.
55 62

  
......
232 239
        status will be the same
233 240
      - otherwise, the last opcode with the status one of:
234 241
          - waitlock
242
          - canceling
235 243
          - running
236 244

  
237 245
        will determine the job status
......
257 265
        status = constants.JOB_STATUS_WAITLOCK
258 266
      elif op.status == constants.OP_STATUS_RUNNING:
259 267
        status = constants.JOB_STATUS_RUNNING
268
      elif op.status == constants.OP_STATUS_CANCELING:
269
        status = constants.JOB_STATUS_CANCELING
270
        break
260 271
      elif op.status == constants.OP_STATUS_ERROR:
261 272
        status = constants.JOB_STATUS_ERROR
262 273
        # The whole job fails if one opcode failed
......
311 322

  
312 323
    self.queue.acquire()
313 324
    try:
325
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
326
                                    constants.OP_STATUS_CANCELING)
327

  
328
      # Cancel here if we were asked to
329
      if self.opcode.status == constants.OP_STATUS_CANCELING:
330
        raise CancelJob()
331

  
314 332
      self.opcode.status = constants.OP_STATUS_RUNNING
315 333
    finally:
316 334
      self.queue.release()
......
338 356

  
339 357
            queue.acquire()
340 358
            try:
359
              assert op.status == constants.OP_STATUS_QUEUED
341 360
              job.run_op_index = idx
342 361
              op.status = constants.OP_STATUS_WAITLOCK
343 362
              op.result = None
......
390 409

  
391 410
            logging.debug("Op %s/%s: Successfully finished %s",
392 411
                          idx + 1, count, op)
412
          except CancelJob:
413
            # Will be handled further up
414
            raise
393 415
          except Exception, err:
394 416
            queue.acquire()
395 417
            try:
......
404 426
              queue.release()
405 427
            raise
406 428

  
429
      except CancelJob:
430
        queue.acquire()
431
        try:
432
          queue.CancelJobUnlocked(job)
433
        finally:
434
          queue.release()
407 435
      except errors.GenericError, err:
408 436
        logging.exception("Ganeti exception")
409 437
      except:
......
535 563
            self._wpool.AddTask(job)
536 564

  
537 565
          elif status in (constants.JOB_STATUS_RUNNING,
538
                          constants.JOB_STATUS_WAITLOCK):
566
                          constants.JOB_STATUS_WAITLOCK,
567
                          constants.JOB_STATUS_CANCELING):
539 568
            logging.warning("Unfinished job %s found: %s", job.id, job)
540 569
            try:
541 570
              for op in job.ops:
......
1020 1049
    @param job_id: job ID of job to be cancelled.
1021 1050

  
1022 1051
    """
1023
    logging.debug("Cancelling job %s", job_id)
1052
    logging.info("Cancelling job %s", job_id)
1024 1053

  
1025 1054
    job = self._LoadJobUnlocked(job_id)
1026 1055
    if not job:
1027 1056
      logging.debug("Job %s not found", job_id)
1028
      return
1057
      return (False, "Job %s not found" % job_id)
1058

  
1059
    job_status = job.CalcStatus()
1029 1060

  
1030
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
1061
    if job_status not in (constants.JOB_STATUS_QUEUED,
1062
                          constants.JOB_STATUS_WAITLOCK):
1031 1063
      logging.debug("Job %s is no longer in the queue", job.id)
1032
      return
1064
      return (False, "Job %s is no longer in the queue" % job.id)
1065

  
1066
    if job_status == constants.JOB_STATUS_QUEUED:
1067
      self.CancelJobUnlocked(job)
1068
      return (True, "Job %s canceled" % job.id)
1033 1069

  
1070
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1071
      # The worker will notice the new status and cancel the job
1072
      try:
1073
        for op in job.ops:
1074
          op.status = constants.OP_STATUS_CANCELING
1075
      finally:
1076
        self.UpdateJobUnlocked(job)
1077
      return (True, "Job %s will be canceled" % job.id)
1078

  
1079
  @_RequireOpenQueue
1080
  def CancelJobUnlocked(self, job):
1081
    """Marks a job as canceled.
1082

  
1083
    """
1034 1084
    try:
1035 1085
      for op in job.ops:
1036 1086
        op.status = constants.OP_STATUS_ERROR
1037
        op.result = "Job cancelled by request"
1087
        op.result = "Job canceled by request"
1038 1088
    finally:
1039 1089
      self.UpdateJobUnlocked(job)
1040 1090

  

Also available in: Unified diff