"""
-import os
import logging
import errno
import re
"""
assert op in job.ops
+ assert op.status in (constants.OP_STATUS_QUEUED,
+ constants.OP_STATUS_WAITLOCK)
+
+ update = False
- op.status = constants.OP_STATUS_WAITLOCK
op.result = None
- op.start_timestamp = TimeStampNow()
+
+ if op.status == constants.OP_STATUS_QUEUED:
+ op.status = constants.OP_STATUS_WAITLOCK
+ update = True
+
+ if op.start_timestamp is None:
+ op.start_timestamp = TimeStampNow()
+ update = True
if job.start_timestamp is None:
job.start_timestamp = op.start_timestamp
+ update = True
+
+ assert op.status == constants.OP_STATUS_WAITLOCK
+
+ return update
def _ExecOpCodeUnlocked(self, opctx):
"""Processes one opcode and returns the result.
if op.status == constants.OP_STATUS_CANCELING:
return (constants.OP_STATUS_CANCELING, None)
- return (constants.OP_STATUS_QUEUED, None)
+ # Stay in waitlock while trying to re-acquire lock
+ return (constants.OP_STATUS_WAITLOCK, None)
except CancelJob:
logging.exception("%s: Canceling job", opctx.log_prefix)
assert op.status == constants.OP_STATUS_CANCELING
# Is a previous opcode still pending?
if job.cur_opctx:
opctx = job.cur_opctx
+ job.cur_opctx = None
else:
if __debug__ and _nextop_fn:
_nextop_fn()
# Consistency check
assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
+ constants.OP_STATUS_CANCELING,
constants.OP_STATUS_CANCELED)
- for i in job.ops[opctx.index:])
+ for i in job.ops[opctx.index + 1:])
assert op.status in (constants.OP_STATUS_QUEUED,
constants.OP_STATUS_WAITLOCK,
+ constants.OP_STATUS_CANCELING,
constants.OP_STATUS_CANCELED)
assert (op.priority <= constants.OP_PRIO_LOWEST and
op.priority >= constants.OP_PRIO_HIGHEST)
- if op.status != constants.OP_STATUS_CANCELED:
+ if op.status not in (constants.OP_STATUS_CANCELING,
+ constants.OP_STATUS_CANCELED):
+ assert op.status in (constants.OP_STATUS_QUEUED,
+ constants.OP_STATUS_WAITLOCK)
+
# Prepare to start opcode
- self._MarkWaitlock(job, op)
+ if self._MarkWaitlock(job, op):
+ # Write to disk
+ queue.UpdateJobUnlocked(job)
assert op.status == constants.OP_STATUS_WAITLOCK
assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
-
- # Write to disk
- queue.UpdateJobUnlocked(job)
+ assert job.start_timestamp and op.start_timestamp
logging.info("%s: opcode %s waiting for locks",
opctx.log_prefix, opctx.summary)
op.status = op_status
op.result = op_result
- if op.status == constants.OP_STATUS_QUEUED:
+ if op.status == constants.OP_STATUS_WAITLOCK:
# Couldn't get locks in time
assert not op.end_timestamp
else:
else:
assert op.status in constants.OPS_FINALIZED
- if op.status == constants.OP_STATUS_QUEUED:
+ if op.status == constants.OP_STATUS_WAITLOCK:
finalize = False
- opctx.CheckPriorityIncrease()
+ if opctx.CheckPriorityIncrease():
+ # Priority was changed, need to update on-disk file
+ queue.UpdateJobUnlocked(job)
# Keep around for another round
job.cur_opctx = opctx
op.priority >= constants.OP_PRIO_HIGHEST)
# In no case must the status be finalized here
- assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
-
- queue.UpdateJobUnlocked(job)
+ assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
else:
# Ensure all opcodes so far have been successful
self._queue_size = 0
self._UpdateQueueSizeUnlocked()
- self._drained = self._IsQueueMarkedDrain()
+ self._drained = jstore.CheckDrainFlag()
# Setup worker pool
self._wpool = _JobQueueWorkerPool(self)
logging.exception("Can't load/parse job %s", job_id)
return None
- @staticmethod
- def _IsQueueMarkedDrain():
- """Check if the queue is marked from drain.
-
- This currently uses the queue drain file, which makes it a
- per-node flag. In the future this can be moved to the config file.
-
- @rtype: boolean
- @return: True of the job queue is marked for draining
-
- """
- return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
-
def _UpdateQueueSizeUnlocked(self):
"""Update the queue size.
@param drain_flag: Whether to set or unset the drain flag
"""
- getents = runtime.GetEnts()
-
- if drain_flag:
- utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
- uid=getents.masterd_uid, gid=getents.masterd_gid)
- else:
- utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
+ jstore.SetDrainFlag(drain_flag)
self._drained = drain_flag