Revision fbf0262f lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
47 | 47 |
from ganeti import jstore |
48 | 48 |
from ganeti import rpc |
49 | 49 |
|
50 |
|
|
50 | 51 |
JOBQUEUE_THREADS = 25 |
51 | 52 |
|
52 | 53 |
|
54 |
class CancelJob: |
|
55 |
"""Special exception to cancel a job. |
|
56 |
|
|
57 |
""" |
|
58 |
|
|
59 |
|
|
53 | 60 |
def TimeStampNow(): |
54 | 61 |
"""Returns the current timestamp. |
55 | 62 |
|
... | ... | |
232 | 239 |
status will be the same |
233 | 240 |
- otherwise, the last opcode with the status one of: |
234 | 241 |
- waitlock |
242 |
- canceling |
|
235 | 243 |
- running |
236 | 244 |
|
237 | 245 |
will determine the job status |
... | ... | |
257 | 265 |
status = constants.JOB_STATUS_WAITLOCK |
258 | 266 |
elif op.status == constants.OP_STATUS_RUNNING: |
259 | 267 |
status = constants.JOB_STATUS_RUNNING |
268 |
elif op.status == constants.OP_STATUS_CANCELING: |
|
269 |
status = constants.JOB_STATUS_CANCELING |
|
270 |
break |
|
260 | 271 |
elif op.status == constants.OP_STATUS_ERROR: |
261 | 272 |
status = constants.JOB_STATUS_ERROR |
262 | 273 |
# The whole job fails if one opcode failed |
... | ... | |
311 | 322 |
|
312 | 323 |
self.queue.acquire() |
313 | 324 |
try: |
325 |
assert self.opcode.status in (constants.OP_STATUS_WAITLOCK, |
|
326 |
constants.OP_STATUS_CANCELING) |
|
327 |
|
|
328 |
# Cancel here if we were asked to |
|
329 |
if self.opcode.status == constants.OP_STATUS_CANCELING: |
|
330 |
raise CancelJob() |
|
331 |
|
|
314 | 332 |
self.opcode.status = constants.OP_STATUS_RUNNING |
315 | 333 |
finally: |
316 | 334 |
self.queue.release() |
... | ... | |
338 | 356 |
|
339 | 357 |
queue.acquire() |
340 | 358 |
try: |
359 |
assert op.status == constants.OP_STATUS_QUEUED |
|
341 | 360 |
job.run_op_index = idx |
342 | 361 |
op.status = constants.OP_STATUS_WAITLOCK |
343 | 362 |
op.result = None |
... | ... | |
390 | 409 |
|
391 | 410 |
logging.debug("Op %s/%s: Successfully finished %s", |
392 | 411 |
idx + 1, count, op) |
412 |
except CancelJob: |
|
413 |
# Will be handled further up |
|
414 |
raise |
|
393 | 415 |
except Exception, err: |
394 | 416 |
queue.acquire() |
395 | 417 |
try: |
... | ... | |
404 | 426 |
queue.release() |
405 | 427 |
raise |
406 | 428 |
|
429 |
except CancelJob: |
|
430 |
queue.acquire() |
|
431 |
try: |
|
432 |
queue.CancelJobUnlocked(job) |
|
433 |
finally: |
|
434 |
queue.release() |
|
407 | 435 |
except errors.GenericError, err: |
408 | 436 |
logging.exception("Ganeti exception") |
409 | 437 |
except: |
... | ... | |
535 | 563 |
self._wpool.AddTask(job) |
536 | 564 |
|
537 | 565 |
elif status in (constants.JOB_STATUS_RUNNING, |
538 |
constants.JOB_STATUS_WAITLOCK): |
|
566 |
constants.JOB_STATUS_WAITLOCK, |
|
567 |
constants.JOB_STATUS_CANCELING): |
|
539 | 568 |
logging.warning("Unfinished job %s found: %s", job.id, job) |
540 | 569 |
try: |
541 | 570 |
for op in job.ops: |
... | ... | |
1020 | 1049 |
@param job_id: job ID of job to be cancelled. |
1021 | 1050 |
|
1022 | 1051 |
""" |
1023 |
logging.debug("Cancelling job %s", job_id)
|
|
1052 |
logging.info("Cancelling job %s", job_id)
|
|
1024 | 1053 |
|
1025 | 1054 |
job = self._LoadJobUnlocked(job_id) |
1026 | 1055 |
if not job: |
1027 | 1056 |
logging.debug("Job %s not found", job_id) |
1028 |
return |
|
1057 |
return (False, "Job %s not found" % job_id) |
|
1058 |
|
|
1059 |
job_status = job.CalcStatus() |
|
1029 | 1060 |
|
1030 |
if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,): |
|
1061 |
if job_status not in (constants.JOB_STATUS_QUEUED, |
|
1062 |
constants.JOB_STATUS_WAITLOCK): |
|
1031 | 1063 |
logging.debug("Job %s is no longer in the queue", job.id) |
1032 |
return |
|
1064 |
return (False, "Job %s is no longer in the queue" % job.id) |
|
1065 |
|
|
1066 |
if job_status == constants.JOB_STATUS_QUEUED: |
|
1067 |
self.CancelJobUnlocked(job) |
|
1068 |
return (True, "Job %s canceled" % job.id) |
|
1033 | 1069 |
|
1070 |
elif job_status == constants.JOB_STATUS_WAITLOCK: |
|
1071 |
# The worker will notice the new status and cancel the job |
|
1072 |
try: |
|
1073 |
for op in job.ops: |
|
1074 |
op.status = constants.OP_STATUS_CANCELING |
|
1075 |
finally: |
|
1076 |
self.UpdateJobUnlocked(job) |
|
1077 |
return (True, "Job %s will be canceled" % job.id) |
|
1078 |
|
|
1079 |
@_RequireOpenQueue |
|
1080 |
def CancelJobUnlocked(self, job): |
|
1081 |
"""Marks a job as canceled. |
|
1082 |
|
|
1083 |
""" |
|
1034 | 1084 |
try: |
1035 | 1085 |
for op in job.ops: |
1036 | 1086 |
op.status = constants.OP_STATUS_ERROR |
1037 |
op.result = "Job cancelled by request"
|
|
1087 |
op.result = "Job canceled by request" |
|
1038 | 1088 |
finally: |
1039 | 1089 |
self.UpdateJobUnlocked(job) |
1040 | 1090 |
|
Also available in: Unified diff