"""
-import os
import logging
import errno
import re
row.append(self.id)
elif fname == "status":
row.append(self.CalcStatus())
+ elif fname == "priority":
+ row.append(self.CalcPriority())
elif fname == "ops":
row.append([op.input.__getstate__() for op in self.ops])
elif fname == "opresult":
row.append([op.exec_timestamp for op in self.ops])
elif fname == "opend":
row.append([op.end_timestamp for op in self.ops])
+ elif fname == "oppriority":
+ row.append([op.priority for op in self.ops])
elif fname == "received_ts":
row.append(self.received_timestamp)
elif fname == "start_ts":
"""
assert op in job.ops
+ assert op.status in (constants.OP_STATUS_QUEUED,
+ constants.OP_STATUS_WAITLOCK)
+
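+ # Track whether anything changed, so the caller knows whether the job
+ # file needs to be written back to disk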
+ update = False
- op.status = constants.OP_STATUS_WAITLOCK
op.result = None
- op.start_timestamp = TimeStampNow()
+
+ if op.status == constants.OP_STATUS_QUEUED:
+ op.status = constants.OP_STATUS_WAITLOCK
+ update = True
+
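+ # Set the start timestamp only once, so it survives the opcode staying
+ # in waitlock after a lock acquire timeout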
+ if op.start_timestamp is None:
+ op.start_timestamp = TimeStampNow()
+ update = True
if job.start_timestamp is None:
job.start_timestamp = op.start_timestamp
+ update = True
+
+ assert op.status == constants.OP_STATUS_WAITLOCK
+
+ return update
def _ExecOpCodeUnlocked(self, opctx):
"""Processes one opcode and returns the result.
except mcpu.LockAcquireTimeout:
assert timeout is not None, "Received timeout for blocking acquire"
logging.debug("Couldn't acquire locks in %0.6fs", timeout)
- assert op.status == constants.OP_STATUS_WAITLOCK
- return (constants.OP_STATUS_QUEUED, None)
+
+ assert op.status in (constants.OP_STATUS_WAITLOCK,
+ constants.OP_STATUS_CANCELING)
+
+ # Was job cancelled while we were waiting for the lock?
+ if op.status == constants.OP_STATUS_CANCELING:
+ return (constants.OP_STATUS_CANCELING, None)
+
+ # Stay in waitlock while trying to re-acquire lock
+ return (constants.OP_STATUS_WAITLOCK, None)
except CancelJob:
logging.exception("%s: Canceling job", opctx.log_prefix)
assert op.status == constants.OP_STATUS_CANCELING
# Is a previous opcode still pending?
if job.cur_opctx:
opctx = job.cur_opctx
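+ # Consume the saved context; it is stored again below if the opcode
+ # has to keep waiting for locks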
+ job.cur_opctx = None
else:
if __debug__ and _nextop_fn:
_nextop_fn()
# Consistency check
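+ # (only opcodes after the current one are checked, as the current
+ # opcode may already be in waitlock)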
assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
+ constants.OP_STATUS_CANCELING,
constants.OP_STATUS_CANCELED)
- for i in job.ops[opctx.index:])
+ for i in job.ops[opctx.index + 1:])
assert op.status in (constants.OP_STATUS_QUEUED,
constants.OP_STATUS_WAITLOCK,
+ constants.OP_STATUS_CANCELING,
constants.OP_STATUS_CANCELED)
assert (op.priority <= constants.OP_PRIO_LOWEST and
op.priority >= constants.OP_PRIO_HIGHEST)
- if op.status != constants.OP_STATUS_CANCELED:
+ if op.status not in (constants.OP_STATUS_CANCELING,
+ constants.OP_STATUS_CANCELED):
+ assert op.status in (constants.OP_STATUS_QUEUED,
+ constants.OP_STATUS_WAITLOCK)
+
# Prepare to start opcode
- self._MarkWaitlock(job, op)
+ if self._MarkWaitlock(job, op):
+ # Write to disk
+ queue.UpdateJobUnlocked(job)
assert op.status == constants.OP_STATUS_WAITLOCK
assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
-
- # Write to disk
- queue.UpdateJobUnlocked(job)
+ assert job.start_timestamp and op.start_timestamp
logging.info("%s: opcode %s waiting for locks",
opctx.log_prefix, opctx.summary)
op.status = op_status
op.result = op_result
- if op.status == constants.OP_STATUS_QUEUED:
+ if op.status == constants.OP_STATUS_WAITLOCK:
# Couldn't get locks in time
assert not op.end_timestamp
else:
else:
assert op.status in constants.OPS_FINALIZED
- if op.status == constants.OP_STATUS_QUEUED:
+ if op.status == constants.OP_STATUS_WAITLOCK:
finalize = False
- opctx.CheckPriorityIncrease()
+ if opctx.CheckPriorityIncrease():
+ # Priority was changed, need to update on-disk file
+ queue.UpdateJobUnlocked(job)
# Keep around for another round
job.cur_opctx = opctx
op.priority >= constants.OP_PRIO_HIGHEST)
# In no case must the status be finalized here
- assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
-
- queue.UpdateJobUnlocked(job)
+ assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
else:
# Ensure all opcodes so far have been successful
self._queue_size = 0
self._UpdateQueueSizeUnlocked()
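+ # Checking the drain flag is delegated to the jstore module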
- self._drained = self._IsQueueMarkedDrain()
+ self._drained = jstore.CheckDrainFlag()
# Setup worker pool
self._wpool = _JobQueueWorkerPool(self)
status = job.CalcStatus()
- if status in (constants.JOB_STATUS_QUEUED, ):
+ if status == constants.JOB_STATUS_QUEUED:
restartjobs.append(job)
elif status in (constants.JOB_STATUS_RUNNING,
constants.JOB_STATUS_WAITLOCK,
constants.JOB_STATUS_CANCELING):
logging.warning("Unfinished job %s found: %s", job.id, job)
- job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
- "Unclean master daemon shutdown")
+
+ if status == constants.JOB_STATUS_WAITLOCK:
+ # Opcodes waiting for locks haven't started running yet, so the job
+ # can safely be restarted
+ job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
+ restartjobs.append(job)
+ else:
+ job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+ "Unclean master daemon shutdown")
+
self.UpdateJobUnlocked(job)
if restartjobs:
logging.exception("Can't load/parse job %s", job_id)
return None
- @staticmethod
- def _IsQueueMarkedDrain():
- """Check if the queue is marked from drain.
-
- This currently uses the queue drain file, which makes it a
- per-node flag. In the future this can be moved to the config file.
-
- @rtype: boolean
- @return: True of the job queue is marked for draining
-
- """
- return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
-
def _UpdateQueueSizeUnlocked(self):
"""Update the queue size.
@param drain_flag: Whether to set or unset the drain flag
"""
- getents = runtime.GetEnts()
-
- if drain_flag:
- utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
- uid=getents.masterd_uid, gid=getents.masterd_gid)
- else:
- utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
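+ # Setting the drain flag is likewise delegated to the jstore module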
+ jstore.SetDrainFlag(drain_flag)
self._drained = drain_flag