"""
-import os
import logging
import errno
import re
row.append(self.id)
elif fname == "status":
row.append(self.CalcStatus())
+ elif fname == "priority":
+ row.append(self.CalcPriority())
elif fname == "ops":
row.append([op.input.__getstate__() for op in self.ops])
elif fname == "opresult":
row.append([op.exec_timestamp for op in self.ops])
elif fname == "opend":
row.append([op.end_timestamp for op in self.ops])
+ elif fname == "oppriority":
+ row.append([op.priority for op in self.ops])
elif fname == "received_ts":
row.append(self.received_timestamp)
elif fname == "start_ts":
"""
status = self.CalcStatus()
- if status not in (constants.JOB_STATUS_QUEUED,
- constants.JOB_STATUS_WAITLOCK):
- logging.debug("Job %s is no longer waiting in the queue", self.id)
- return (False, "Job %s is no longer waiting in the queue" % self.id)
-
if status == constants.JOB_STATUS_QUEUED:
self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
"Job canceled by request")
- msg = "Job %s canceled" % self.id
+ return (True, "Job %s canceled" % self.id)
elif status == constants.JOB_STATUS_WAITLOCK:
# The worker will notice the new status and cancel the job
self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
- msg = "Job %s will be canceled" % self.id
+ return (True, "Job %s will be canceled" % self.id)
- return (True, msg)
+ else:
+ logging.debug("Job %s is no longer waiting in the queue", self.id)
+ return (False, "Job %s is no longer waiting in the queue" % self.id)
class _OpExecCallbacks(mcpu.OpExecCbBase):
# Cancel here if we were asked to
self._CheckCancel()
+  def SubmitManyJobs(self, jobs):
+    """Submits jobs for processing.
+
+    See L{JobQueue.SubmitManyJobs}.
+
+    @param jobs: job definitions to submit; presumably one entry per job
+      (each being that job's opcode list) -- confirm against
+      L{JobQueue.SubmitManyJobs}
+    @return: the result of L{JobQueue.SubmitManyJobs}
+
+    """
+    # Locking is done in job queue
+    return self._queue.SubmitManyJobs(jobs)
+
class _JobChangesChecker(object):
def __init__(self, fields, prev_job_info, prev_log_serial):
@type job: L{_QueuedJob}
@param job: Job object
- @type job: L{_QueuedOpCode}
- @param job: Opcode object
+ @type op: L{_QueuedOpCode}
+ @param op: Opcode object
"""
assert op in job.ops
+ assert op.status in (constants.OP_STATUS_QUEUED,
+ constants.OP_STATUS_WAITLOCK)
+
+ update = False
- op.status = constants.OP_STATUS_WAITLOCK
op.result = None
- op.start_timestamp = TimeStampNow()
+
+ if op.status == constants.OP_STATUS_QUEUED:
+ op.status = constants.OP_STATUS_WAITLOCK
+ update = True
+
+ if op.start_timestamp is None:
+ op.start_timestamp = TimeStampNow()
+ update = True
if job.start_timestamp is None:
job.start_timestamp = op.start_timestamp
+ update = True
+
+ assert op.status == constants.OP_STATUS_WAITLOCK
+
+ return update
def _ExecOpCodeUnlocked(self, opctx):
"""Processes one opcode and returns the result.
except mcpu.LockAcquireTimeout:
assert timeout is not None, "Received timeout for blocking acquire"
logging.debug("Couldn't acquire locks in %0.6fs", timeout)
- assert op.status == constants.OP_STATUS_WAITLOCK
- return (constants.OP_STATUS_QUEUED, None)
+
+ assert op.status in (constants.OP_STATUS_WAITLOCK,
+ constants.OP_STATUS_CANCELING)
+
+ # Was job cancelled while we were waiting for the lock?
+ if op.status == constants.OP_STATUS_CANCELING:
+ return (constants.OP_STATUS_CANCELING, None)
+
+ # Stay in waitlock while trying to re-acquire lock
+ return (constants.OP_STATUS_WAITLOCK, None)
except CancelJob:
logging.exception("%s: Canceling job", opctx.log_prefix)
assert op.status == constants.OP_STATUS_CANCELING
# Is a previous opcode still pending?
if job.cur_opctx:
opctx = job.cur_opctx
+ job.cur_opctx = None
else:
if __debug__ and _nextop_fn:
_nextop_fn()
# Consistency check
assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
+ constants.OP_STATUS_CANCELING,
constants.OP_STATUS_CANCELED)
- for i in job.ops[opctx.index:])
+ for i in job.ops[opctx.index + 1:])
assert op.status in (constants.OP_STATUS_QUEUED,
constants.OP_STATUS_WAITLOCK,
+ constants.OP_STATUS_CANCELING,
constants.OP_STATUS_CANCELED)
assert (op.priority <= constants.OP_PRIO_LOWEST and
op.priority >= constants.OP_PRIO_HIGHEST)
- if op.status != constants.OP_STATUS_CANCELED:
+ if op.status not in (constants.OP_STATUS_CANCELING,
+ constants.OP_STATUS_CANCELED):
+ assert op.status in (constants.OP_STATUS_QUEUED,
+ constants.OP_STATUS_WAITLOCK)
+
# Prepare to start opcode
- self._MarkWaitlock(job, op)
+ if self._MarkWaitlock(job, op):
+ # Write to disk
+ queue.UpdateJobUnlocked(job)
assert op.status == constants.OP_STATUS_WAITLOCK
assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
-
- # Write to disk
- queue.UpdateJobUnlocked(job)
+ assert job.start_timestamp and op.start_timestamp
logging.info("%s: opcode %s waiting for locks",
opctx.log_prefix, opctx.summary)
op.status = op_status
op.result = op_result
- if op.status == constants.OP_STATUS_QUEUED:
+ if op.status == constants.OP_STATUS_WAITLOCK:
# Couldn't get locks in time
assert not op.end_timestamp
else:
else:
assert op.status in constants.OPS_FINALIZED
- if op.status == constants.OP_STATUS_QUEUED:
+ if op.status == constants.OP_STATUS_WAITLOCK:
finalize = False
- opctx.CheckPriorityIncrease()
+ if opctx.CheckPriorityIncrease():
+ # Priority was changed, need to update on-disk file
+ queue.UpdateJobUnlocked(job)
# Keep around for another round
job.cur_opctx = opctx
op.priority >= constants.OP_PRIO_HIGHEST)
# In no case must the status be finalized here
- assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
-
- queue.UpdateJobUnlocked(job)
+ assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
else:
# Ensure all opcodes so far have been successful
queue = job.queue
assert queue == self.pool.queue
- self.SetTaskName("Job%s" % job.id)
+ setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
+ setname_fn(None)
proc = mcpu.Processor(queue.context, job.id)
- if not _JobProcessor(queue, proc.ExecOpCode, job)():
+ # Create wrapper for setting thread name
+ wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
+ proc.ExecOpCode)
+
+ if not _JobProcessor(queue, wrap_execop_fn, job)():
# Schedule again
raise workerpool.DeferTask(priority=job.CalcPriority())
+  @staticmethod
+  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
+    """Updates the worker thread name to include a short summary of the opcode.
+
+    @param setname_fn: Callable setting worker thread name
+    @param execop_fn: Callable for executing opcode (usually
+      L{mcpu.Processor.ExecOpCode})
+    @param op: Opcode being executed; passed to C{setname_fn} for the thread
+      name and then through to C{execop_fn}
+    @return: Result of C{execop_fn(op, *args, **kwargs)}
+
+    """
+    setname_fn(op)
+    try:
+      return execop_fn(op, *args, **kwargs)
+    finally:
+      # Reset the name even if the opcode raises
+      setname_fn(None)
+
+  @staticmethod
+  def _GetWorkerName(job, op):
+    """Returns the worker thread name for a job and, optionally, an opcode.
+
+    @type job: L{_QueuedJob}
+    @type op: L{opcodes.OpCode}
+    @param op: May be C{None}, in which case only the job ID is used
+    @rtype: string
+    @return: C{"Job<id>"}, or C{"Job<id>/<opcode summary>"} when C{op} is
+      given
+
+    """
+    parts = ["Job%s" % job.id]
+
+    if op:
+      parts.append(op.TinySummary())
+
+    return "/".join(parts)
+
class _JobQueueWorkerPool(workerpool.WorkerPool):
"""Simple class implementing a job-processing workerpool.
"""
  def __init__(self, queue):
-    super(_JobQueueWorkerPool, self).__init__("JobQueue",
+    # NOTE(review): pool name shortened from "JobQueue" to "Jq" --
+    # presumably to keep derived worker/thread names short; confirm
+    # rationale against the worker-naming code
+    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue
self._queue_size = 0
self._UpdateQueueSizeUnlocked()
- self._drained = self._IsQueueMarkedDrain()
+ self._drained = jstore.CheckDrainFlag()
# Setup worker pool
self._wpool = _JobQueueWorkerPool(self)
status = job.CalcStatus()
- if status in (constants.JOB_STATUS_QUEUED, ):
+ if status == constants.JOB_STATUS_QUEUED:
restartjobs.append(job)
elif status in (constants.JOB_STATUS_RUNNING,
constants.JOB_STATUS_WAITLOCK,
constants.JOB_STATUS_CANCELING):
logging.warning("Unfinished job %s found: %s", job.id, job)
- job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
- "Unclean master daemon shutdown")
+
+ if status == constants.JOB_STATUS_WAITLOCK:
+ # Restart job
+ job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
+ restartjobs.append(job)
+ else:
+ job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+ "Unclean master daemon shutdown")
+
self.UpdateJobUnlocked(job)
if restartjobs:
logging.exception("Can't load/parse job %s", job_id)
return None
- @staticmethod
- def _IsQueueMarkedDrain():
- """Check if the queue is marked from drain.
-
- This currently uses the queue drain file, which makes it a
- per-node flag. In the future this can be moved to the config file.
-
- @rtype: boolean
- @return: True of the job queue is marked for draining
-
- """
- return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
-
def _UpdateQueueSizeUnlocked(self):
"""Update the queue size.
@param drain_flag: Whether to set or unset the drain flag
"""
- getents = runtime.GetEnts()
-
- if drain_flag:
- utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
- uid=getents.masterd_uid, gid=getents.masterd_gid)
- else:
- utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
+ jstore.SetDrainFlag(drain_flag)
self._drained = drain_flag
status = True
data = job_id
except errors.GenericError, err:
- data = str(err)
+ data = ("%s; opcodes %s" %
+ (err, utils.CommaJoin(op.Summary() for op in ops)))
status = False
results.append((status, data))