Revision 39ed3a98

b/lib/jqueue.py
381 381
    @param result: the opcode result
382 382

  
383 383
    """
384
    not_marked = True
385
    for op in self.ops:
386
      if op.status in constants.OPS_FINALIZED:
387
        assert not_marked, "Finalized opcodes found after non-finalized ones"
388
        continue
389
      op.status = status
390
      op.result = result
391
      not_marked = False
384
    try:
385
      not_marked = True
386
      for op in self.ops:
387
        if op.status in constants.OPS_FINALIZED:
388
          assert not_marked, "Finalized opcodes found after non-finalized ones"
389
          continue
390
        op.status = status
391
        op.result = result
392
        not_marked = False
393
    finally:
394
      self.queue.UpdateJobUnlocked(self)
392 395

  
393 396

  
394 397
class _OpExecCallbacks(mcpu.OpExecCbBase):
......
670 673
      except CancelJob:
671 674
        queue.acquire()
672 675
        try:
673
          queue.CancelJobUnlocked(job)
676
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
677
                                "Job canceled by request")
674 678
        finally:
675 679
          queue.release()
676 680
      except errors.GenericError, err:
......
817 821
                          constants.JOB_STATUS_WAITLOCK,
818 822
                          constants.JOB_STATUS_CANCELING):
819 823
            logging.warning("Unfinished job %s found: %s", job.id, job)
820
            try:
821
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
822
                                    "Unclean master daemon shutdown")
823
            finally:
824
              self.UpdateJobUnlocked(job)
824
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
825
                                  "Unclean master daemon shutdown")
825 826

  
826 827
        logging.info("Job queue inspection finished")
827 828
      finally:
......
1344 1345
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1345 1346

  
1346 1347
    if job_status == constants.JOB_STATUS_QUEUED:
1347
      self.CancelJobUnlocked(job)
1348
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1349
                            "Job canceled by request")
1348 1350
      return (True, "Job %s canceled" % job.id)
1349 1351

  
1350 1352
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1351 1353
      # The worker will notice the new status and cancel the job
1352
      try:
1353
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1354
      finally:
1355
        self.UpdateJobUnlocked(job)
1354
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1356 1355
      return (True, "Job %s will be canceled" % job.id)
1357 1356

  
1358 1357
  @_RequireOpenQueue
1359
  def CancelJobUnlocked(self, job):
1360
    """Marks a job as canceled.
1361

  
1362
    """
1363
    try:
1364
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1365
                            "Job canceled by request")
1366
    finally:
1367
      self.UpdateJobUnlocked(job)
1368

  
1369
  @_RequireOpenQueue
1370 1358
  def _ArchiveJobsUnlocked(self, jobs):
1371 1359
    """Archives jobs.
1372 1360

  

Also available in: Unified diff