from ganeti import jstore
from ganeti import rpc
-from ganeti.rpc import RpcRunner
JOBQUEUE_THREADS = 25
+class CancelJob(Exception):
+ """Special exception to cancel a job.
+
+ Raised when an opcode is found in the canceling state right before it
+ would start running; the job execution code catches it and marks the
+ job as canceled.
+
+ It derives from C{Exception} so that it is a regular exception class
+ (classes not derived from it cannot be raised on newer Python versions).
+ Handlers for it must therefore come before any generic
+ C{except Exception} clause, otherwise the latter would swallow it.
+
+ """
+
+
def TimeStampNow():
"""Returns the current timestamp.
status will be the same
- otherwise, the last opcode with the status one of:
- waitlock
+ - canceling
- running
will determine the job status
status = constants.JOB_STATUS_WAITLOCK
elif op.status == constants.OP_STATUS_RUNNING:
status = constants.JOB_STATUS_RUNNING
+ elif op.status == constants.OP_STATUS_CANCELING:
+ status = constants.JOB_STATUS_CANCELING
+ break
elif op.status == constants.OP_STATUS_ERROR:
status = constants.JOB_STATUS_ERROR
# The whole job fails if one opcode failed
self.queue.acquire()
try:
+ assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
+ constants.OP_STATUS_CANCELING)
+
+ # Cancel here if we were asked to
+ if self.opcode.status == constants.OP_STATUS_CANCELING:
+ raise CancelJob()
+
self.opcode.status = constants.OP_STATUS_RUNNING
finally:
self.queue.release()
queue.acquire()
try:
+ assert op.status == constants.OP_STATUS_QUEUED
job.run_op_index = idx
op.status = constants.OP_STATUS_WAITLOCK
op.result = None
logging.debug("Op %s/%s: Successfully finished %s",
idx + 1, count, op)
+ except CancelJob:
+ # Will be handled further up
+ raise
except Exception, err:
queue.acquire()
try:
queue.release()
raise
+ except CancelJob:
+ queue.acquire()
+ try:
+ queue.CancelJobUnlocked(job)
+ finally:
+ queue.release()
except errors.GenericError, err:
logging.exception("Ganeti exception")
except:
# Get initial list of nodes
self._nodes = dict((n.name, n.primary_ip)
- for n in self.context.cfg.GetAllNodesInfo().values())
+ for n in self.context.cfg.GetAllNodesInfo().values()
+ if n.master_candidate)
# Remove master node
try:
del self._nodes[self._my_hostname]
- except ValueError:
+ except KeyError:
pass
# TODO: Check consistency across nodes
# Setup worker pool
self._wpool = _JobQueueWorkerPool(self)
-
- # We need to lock here because WorkerPool.AddTask() may start a job while
- # we're still doing our work.
- self.acquire()
try:
- for job in self._GetJobsUnlocked(None):
- # a failure in loading the job can cause 'None' to be returned
- if job is None:
- continue
+ # We need to lock here because WorkerPool.AddTask() may start a job while
+ # we're still doing our work.
+ self.acquire()
+ try:
+ logging.info("Inspecting job queue")
- status = job.CalcStatus()
+ all_job_ids = self._GetJobIDsUnlocked()
+ jobs_count = len(all_job_ids)
+ lastinfo = time.time()
+ for idx, job_id in enumerate(all_job_ids):
+ # Give an update every 1000 jobs or 10 seconds
+ if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
+ idx == (jobs_count - 1)):
+ logging.info("Job queue inspection: %d/%d (%0.1f %%)",
+ idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
+ lastinfo = time.time()
- if status in (constants.JOB_STATUS_QUEUED, ):
- self._wpool.AddTask(job)
+ job = self._LoadJobUnlocked(job_id)
- elif status in (constants.JOB_STATUS_RUNNING,
- constants.JOB_STATUS_WAITLOCK):
- logging.warning("Unfinished job %s found: %s", job.id, job)
- try:
- for op in job.ops:
- op.status = constants.OP_STATUS_ERROR
- op.result = "Unclean master daemon shutdown"
- finally:
- self.UpdateJobUnlocked(job)
- finally:
- self.release()
+ # a failure in loading the job can cause 'None' to be returned
+ if job is None:
+ continue
+
+ status = job.CalcStatus()
+
+ if status in (constants.JOB_STATUS_QUEUED, ):
+ self._wpool.AddTask(job)
+
+ elif status in (constants.JOB_STATUS_RUNNING,
+ constants.JOB_STATUS_WAITLOCK,
+ constants.JOB_STATUS_CANCELING):
+ logging.warning("Unfinished job %s found: %s", job.id, job)
+ try:
+ for op in job.ops:
+ op.status = constants.OP_STATUS_ERROR
+ op.result = "Unclean master daemon shutdown"
+ finally:
+ self.UpdateJobUnlocked(job)
+
+ logging.info("Job queue inspection finished")
+ finally:
+ self.release()
+ except:
+ self._wpool.TerminateWorkers()
+ raise
@utils.LockedMethod
@_RequireOpenQueue
assert node_name != self._my_hostname
# Clean queue directory on added node
- RpcRunner.call_jobqueue_purge(node_name)
+ rpc.RpcRunner.call_jobqueue_purge(node_name)
+
+ if not node.master_candidate:
+ # remove if existing, ignoring errors
+ self._nodes.pop(node_name, None)
+ # and skip the replication of the job ids
+ return
# Upload the whole queue excluding archived jobs
files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
finally:
fd.close()
- result = RpcRunner.call_jobqueue_update([node_name], [node.primary_ip],
- file_name, content)
+ result = rpc.RpcRunner.call_jobqueue_update([node_name],
+ [node.primary_ip],
+ file_name, content)
if not result[node_name]:
logging.error("Failed to upload %s to %s", file_name, node_name)
utils.WriteFile(file_name, data=data)
names, addrs = self._GetNodeIp()
- result = RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
+ result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
self._CheckRpcResult(result, self._nodes,
"Updating %s" % file_name)
os.rename(old, new)
names, addrs = self._GetNodeIp()
- result = RpcRunner.call_jobqueue_rename(names, addrs, old, new)
+ result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
self._CheckRpcResult(result, self._nodes,
"Moving %s to %s" % (old, new))
@param job_id: job ID of job to be cancelled.
"""
- logging.debug("Cancelling job %s", job_id)
+ logging.info("Cancelling job %s", job_id)
job = self._LoadJobUnlocked(job_id)
if not job:
logging.debug("Job %s not found", job_id)
- return
+ return (False, "Job %s not found" % job_id)
- if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
+ job_status = job.CalcStatus()
+
+ if job_status not in (constants.JOB_STATUS_QUEUED,
+ constants.JOB_STATUS_WAITLOCK):
logging.debug("Job %s is no longer in the queue", job.id)
- return
+ return (False, "Job %s is no longer in the queue" % job.id)
+
+ if job_status == constants.JOB_STATUS_QUEUED:
+ self.CancelJobUnlocked(job)
+ return (True, "Job %s canceled" % job.id)
+ elif job_status == constants.JOB_STATUS_WAITLOCK:
+ # The worker will notice the new status and cancel the job
+ try:
+ for op in job.ops:
+ op.status = constants.OP_STATUS_CANCELING
+ finally:
+ self.UpdateJobUnlocked(job)
+ return (True, "Job %s will be canceled" % job.id)
+
+ @_RequireOpenQueue
+ def CancelJobUnlocked(self, job):
+ """Marks a job as canceled.
+
+ Every opcode of the job is set to the error status with a
+ "Job canceled by request" result. The job is then written back through
+ L{UpdateJobUnlocked}; this happens even if updating the opcodes raised
+ an exception.
+
+ NOTE(review): presumably the queue lock must already be held by the
+ caller (the worker code acquires it before calling this) -- confirm.
+
+ @param job: job object whose C{ops} are updated
+
+ """
try:
for op in job.ops:
op.status = constants.OP_STATUS_ERROR
- op.result = "Job cancelled by request"
+ op.result = "Job canceled by request"
finally:
self.UpdateJobUnlocked(job)