47 |
47 |
from ganeti import jstore
|
48 |
48 |
from ganeti import rpc
|
49 |
49 |
|
|
50 |
|
50 |
51 |
# NOTE(review): presumably the number of worker threads serving the job
# queue; the place where this constant is used is outside this excerpt, so
# confirm before relying on that reading.
JOBQUEUE_THREADS = 25
|
51 |
52 |
|
52 |
53 |
|
|
54 |
class CancelJob(Exception):
  """Special exception to cancel a job.

  Raised when an opcode about to start running is found in the
  C{OP_STATUS_CANCELING} state. The handler around the job execution
  catches it and marks the job as canceled through
  C{queue.CancelJobUnlocked}.

  It derives from C{Exception} so that it is a proper exception class
  rather than an old-style class; handlers that must let it through have
  to list C{except CancelJob} before any C{except Exception} clause.

  """
|
|
58 |
|
|
59 |
|
53 |
60 |
def TimeStampNow():
|
54 |
61 |
"""Returns the current timestamp.
|
55 |
62 |
|
... | ... | |
232 |
239 |
status will be the same
|
233 |
240 |
- otherwise, the last opcode with the status one of:
|
234 |
241 |
- waitlock
|
|
242 |
- canceling
|
235 |
243 |
- running
|
236 |
244 |
|
237 |
245 |
will determine the job status
|
... | ... | |
257 |
265 |
status = constants.JOB_STATUS_WAITLOCK
|
258 |
266 |
elif op.status == constants.OP_STATUS_RUNNING:
|
259 |
267 |
status = constants.JOB_STATUS_RUNNING
|
|
268 |
elif op.status == constants.OP_STATUS_CANCELING:
|
|
269 |
status = constants.JOB_STATUS_CANCELING
|
|
270 |
break
|
260 |
271 |
elif op.status == constants.OP_STATUS_ERROR:
|
261 |
272 |
status = constants.JOB_STATUS_ERROR
|
262 |
273 |
# The whole job fails if one opcode failed
|
... | ... | |
311 |
322 |
|
312 |
323 |
self.queue.acquire()
|
313 |
324 |
try:
|
|
325 |
assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
|
|
326 |
constants.OP_STATUS_CANCELING)
|
|
327 |
|
|
328 |
# Cancel here if we were asked to
|
|
329 |
if self.opcode.status == constants.OP_STATUS_CANCELING:
|
|
330 |
raise CancelJob()
|
|
331 |
|
314 |
332 |
self.opcode.status = constants.OP_STATUS_RUNNING
|
315 |
333 |
finally:
|
316 |
334 |
self.queue.release()
|
... | ... | |
338 |
356 |
|
339 |
357 |
queue.acquire()
|
340 |
358 |
try:
|
|
359 |
assert op.status == constants.OP_STATUS_QUEUED
|
341 |
360 |
job.run_op_index = idx
|
342 |
361 |
op.status = constants.OP_STATUS_WAITLOCK
|
343 |
362 |
op.result = None
|
... | ... | |
390 |
409 |
|
391 |
410 |
logging.debug("Op %s/%s: Successfully finished %s",
|
392 |
411 |
idx + 1, count, op)
|
|
412 |
except CancelJob:
|
|
413 |
# Will be handled further up
|
|
414 |
raise
|
393 |
415 |
except Exception, err:
|
394 |
416 |
queue.acquire()
|
395 |
417 |
try:
|
... | ... | |
404 |
426 |
queue.release()
|
405 |
427 |
raise
|
406 |
428 |
|
|
429 |
except CancelJob:
|
|
430 |
queue.acquire()
|
|
431 |
try:
|
|
432 |
queue.CancelJobUnlocked(job)
|
|
433 |
finally:
|
|
434 |
queue.release()
|
407 |
435 |
except errors.GenericError, err:
|
408 |
436 |
logging.exception("Ganeti exception")
|
409 |
437 |
except:
|
... | ... | |
535 |
563 |
self._wpool.AddTask(job)
|
536 |
564 |
|
537 |
565 |
elif status in (constants.JOB_STATUS_RUNNING,
|
538 |
|
constants.JOB_STATUS_WAITLOCK):
|
|
566 |
constants.JOB_STATUS_WAITLOCK,
|
|
567 |
constants.JOB_STATUS_CANCELING):
|
539 |
568 |
logging.warning("Unfinished job %s found: %s", job.id, job)
|
540 |
569 |
try:
|
541 |
570 |
for op in job.ops:
|
... | ... | |
1020 |
1049 |
@param job_id: job ID of job to be cancelled.
|
1021 |
1050 |
|
1022 |
1051 |
"""
|
1023 |
|
logging.debug("Cancelling job %s", job_id)
|
|
1052 |
logging.info("Cancelling job %s", job_id)
|
1024 |
1053 |
|
1025 |
1054 |
job = self._LoadJobUnlocked(job_id)
|
1026 |
1055 |
if not job:
|
1027 |
1056 |
logging.debug("Job %s not found", job_id)
|
1028 |
|
return
|
|
1057 |
return (False, "Job %s not found" % job_id)
|
|
1058 |
|
|
1059 |
job_status = job.CalcStatus()
|
1029 |
1060 |
|
1030 |
|
if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
|
|
1061 |
if job_status not in (constants.JOB_STATUS_QUEUED,
|
|
1062 |
constants.JOB_STATUS_WAITLOCK):
|
1031 |
1063 |
logging.debug("Job %s is no longer in the queue", job.id)
|
1032 |
|
return
|
|
1064 |
return (False, "Job %s is no longer in the queue" % job.id)
|
|
1065 |
|
|
1066 |
if job_status == constants.JOB_STATUS_QUEUED:
|
|
1067 |
self.CancelJobUnlocked(job)
|
|
1068 |
return (True, "Job %s canceled" % job.id)
|
1033 |
1069 |
|
|
1070 |
elif job_status == constants.JOB_STATUS_WAITLOCK:
|
|
1071 |
# The worker will notice the new status and cancel the job
|
|
1072 |
try:
|
|
1073 |
for op in job.ops:
|
|
1074 |
op.status = constants.OP_STATUS_CANCELING
|
|
1075 |
finally:
|
|
1076 |
self.UpdateJobUnlocked(job)
|
|
1077 |
return (True, "Job %s will be canceled" % job.id)
|
|
1078 |
|
|
1079 |
@_RequireOpenQueue
def CancelJobUnlocked(self, job):
  """Flags every opcode of a job as failed due to cancellation.

  Each opcode in C{job.ops} gets the status C{OP_STATUS_ERROR} and a
  result text saying the job was canceled by request. The job is then
  handed to L{UpdateJobUnlocked}; this happens in a C{finally} clause, so
  the update is attempted even if changing an opcode raises.

  @param job: the job whose opcodes should be marked as canceled; must
      provide an C{ops} sequence

  """
  try:
    for opcode in job.ops:
      # Status first, then the explanatory result, for every opcode
      opcode.status = constants.OP_STATUS_ERROR
      opcode.result = "Job canceled by request"
  finally:
    # Always pass the job on, whatever state the opcodes ended up in
    self.UpdateJobUnlocked(job)
|
1040 |
1090 |
|