X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/5278185a96d2d829b60d723e563ff063f5d05c4e..b13dfb92ab93b63c3feb238d27e8b5cd2a495fce:/lib/jqueue.py

diff --git a/lib/jqueue.py b/lib/jqueue.py
index dcdce1e..2c2345b 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -69,7 +69,7 @@ def TimeStampNow():
 class _QueuedOpCode(object):
-  """Encasulates an opcode object.
+  """Encapsulates an opcode object.
 
   @ivar log: holds the execution log and consists of tuples
   of the form C{(log_serial, timestamp, level, message)}
@@ -80,6 +80,10 @@ class _QueuedOpCode(object):
   @ivar stop_timestamp: timestamp for the end of the execution
 
   """
+  __slots__ = ["input", "status", "result", "log",
+               "start_timestamp", "end_timestamp",
+               "__weakref__"]
+
   def __init__(self, op):
     """Constructor for the _QueuedOpCode.
 
@@ -141,17 +145,21 @@ class _QueuedJob(object):
   @ivar id: the job ID
   @type ops: list
   @ivar ops: the list of _QueuedOpCode that constitute the job
-  @type run_op_index: int
-  @ivar run_op_index: the currently executing opcode, or -1 if
-      we didn't yet start executing
   @type log_serial: int
   @ivar log_serial: holds the index for the next log entry
   @ivar received_timestamp: the timestamp for when the job was received
   @ivar start_timestamp: the timestamp for start of execution
   @ivar end_timestamp: the timestamp for end of execution
+  @ivar lock_status: In-memory locking information for debugging
   @ivar change: a Condition variable we use for waiting for job changes
 
   """
+  # pylint: disable-msg=W0212
+  __slots__ = ["queue", "id", "ops", "log_serial",
+               "received_timestamp", "start_timestamp", "end_timestamp",
+               "lock_status", "change",
+               "__weakref__"]
+
   def __init__(self, queue, job_id, ops):
     """Constructor for the _QueuedJob.
 
@@ -171,12 +179,14 @@ class _QueuedJob(object):
     self.queue = queue
     self.id = job_id
     self.ops = [_QueuedOpCode(op) for op in ops]
-    self.run_op_index = -1
     self.log_serial = 0
     self.received_timestamp = TimeStampNow()
     self.start_timestamp = None
     self.end_timestamp = None
 
+    # In-memory attributes
+    self.lock_status = None
+
     # Condition to wait for changes
     self.change = threading.Condition(self.queue._lock)
 
@@ -195,11 +205,13 @@ class _QueuedJob(object):
     obj = _QueuedJob.__new__(cls)
     obj.queue = queue
     obj.id = state["id"]
-    obj.run_op_index = state["run_op_index"]
     obj.received_timestamp = state.get("received_timestamp", None)
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
 
+    # In-memory attributes
+    obj.lock_status = None
+
     obj.ops = []
     obj.log_serial = 0
     for op_state in state["ops"]:
@@ -223,7 +235,6 @@ class _QueuedJob(object):
     return {
       "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
-      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
@@ -286,7 +297,7 @@ class _QueuedJob(object):
     """Selectively returns the log entries.
 
     @type newer_than: None or int
-    @param newer_than: if this is None, return all log enties,
+    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
     @rtype: list
@@ -304,37 +315,112 @@ class _QueuedJob(object):
 
     return entries
 
+  def MarkUnfinishedOps(self, status, result):
+    """Mark unfinished opcodes with a given status and result.
 
-class _JobQueueWorker(workerpool.BaseWorker):
-  """The actual job workers.
+    This is a utility function for marking all running or waiting to
+    be run opcodes with a given status. Opcodes which are already
+    finalised are not changed.
 
-  """
-  def _NotifyStart(self):
-    """Mark the opcode as running, not lock-waiting.
+    @param status: a given opcode status
+    @param result: the opcode result
+
+    """
+    not_marked = True
+    for op in self.ops:
+      if op.status in constants.OPS_FINALIZED:
+        assert not_marked, "Finalized opcodes found after non-finalized ones"
+        continue
+      op.status = status
+      op.result = result
+      not_marked = False
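The new MarkUnfinishedOps helper relies on the invariant that finalised
opcodes can only appear before unfinished ones in job.ops. A minimal
standalone sketch of that behaviour (toy classes and status strings, not
the real _QueuedJob or constants module)::

    OPS_FINALIZED = frozenset(["success", "error", "canceled"])

    class ToyOp(object):
      def __init__(self, status):
        self.status = status
        self.result = None

    class ToyJob(object):
      def __init__(self, ops):
        self.ops = ops

      def MarkUnfinishedOps(self, status, result):
        not_marked = True
        for op in self.ops:
          if op.status in OPS_FINALIZED:
            # finalised opcodes may only precede unfinished ones
            assert not_marked, "Finalized opcodes found after non-finalized"
            continue
          op.status = status
          op.result = result
          not_marked = False

    job = ToyJob([ToyOp("success"), ToyOp("running"), ToyOp("queued")])
    job.MarkUnfinishedOps("error", "Unclean master daemon shutdown")
    print([op.status for op in job.ops])  # ['success', 'error', 'error']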
+
+
+class _OpExecCallbacks(mcpu.OpExecCbBase):
+  def __init__(self, queue, job, op):
+    """Initializes this class.
 
-    This is called from the mcpu code as a notifier function, when the
-    LU is finally about to start the Exec() method. Of course, to have
-    end-user visible results, the opcode must be initially (before
-    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
+    @type queue: L{JobQueue}
+    @param queue: Job queue
+    @type job: L{_QueuedJob}
+    @param job: Job object
+    @type op: L{_QueuedOpCode}
+    @param op: OpCode
 
     """
-    assert self.queue, "Queue attribute is missing"
-    assert self.opcode, "Opcode attribute is missing"
+    assert queue, "Queue is missing"
+    assert job, "Job is missing"
+    assert op, "Opcode is missing"
 
-    self.queue.acquire()
+    self._queue = queue
+    self._job = job
+    self._op = op
+
+  def NotifyStart(self):
+    """Mark the opcode as running, not lock-waiting.
+
+    This is called from the mcpu code as a notifier function, when the LU is
+    finally about to start the Exec() method. Of course, to have end-user
+    visible results, the opcode must be initially (before calling into
+    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
+
+    """
+    self._queue.acquire()
     try:
-      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
-                                    constants.OP_STATUS_CANCELING)
+      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
+                                 constants.OP_STATUS_CANCELING)
+
+      # All locks are acquired by now
+      self._job.lock_status = None
 
       # Cancel here if we were asked to
-      if self.opcode.status == constants.OP_STATUS_CANCELING:
+      if self._op.status == constants.OP_STATUS_CANCELING:
         raise CancelJob()
 
-      self.opcode.status = constants.OP_STATUS_RUNNING
+      self._op.status = constants.OP_STATUS_RUNNING
     finally:
-      self.queue.release()
+      self._queue.release()
+
+  def Feedback(self, *args):
+    """Append a log entry.
+
+    """
+    assert len(args) < 3
+
+    if len(args) == 1:
+      log_type = constants.ELOG_MESSAGE
+      log_msg = args[0]
+    else:
+      (log_type, log_msg) = args
+
+    # The time is split to make serialization easier and not lose
+    # precision.
+    timestamp = utils.SplitTime(time.time())
 
-  def RunTask(self, job):
+    self._queue.acquire()
+    try:
+      self._job.log_serial += 1
+      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
+
+      self._job.change.notifyAll()
+    finally:
+      self._queue.release()
+
+  def ReportLocks(self, msg):
+    """Write locking information to the job.
+
+    Called whenever the LU processor is waiting for a lock or has acquired one.
+
+    """
+    # Not getting the queue lock because this is a single assignment
+    self._job.lock_status = msg
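The Feedback() callback accepts either a bare message or an explicit
(type, message) pair, and timestamps are split into integer parts before
serialization. A rough sketch of both conventions (split_time only
approximates what utils.SplitTime is used for here; all names are
illustrative, not the real Ganeti utils API)::

    import time

    ELOG_MESSAGE = "message"  # stand-in for constants.ELOG_MESSAGE

    def split_time(value):
      """Split float seconds into a (seconds, microseconds) tuple."""
      seconds = int(value)
      return (seconds, int((value - seconds) * 1000000))

    def feedback(*args):
      assert len(args) < 3
      if len(args) == 1:
        (log_type, log_msg) = (ELOG_MESSAGE, args[0])
      else:
        (log_type, log_msg) = args
      return (split_time(time.time()), log_type, log_msg)

    print(feedback("plain message, default type"))
    print(feedback("custom-type", "explicit type and message"))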
+
+
+class _JobQueueWorker(workerpool.BaseWorker):
+  """The actual job workers.
+
+  """
+  def RunTask(self, job): # pylint: disable-msg=W0221
     """Job executor.
 
     This function processes a job. It is closely tied to the _QueuedJob and
@@ -346,21 +432,31 @@ class _JobQueueWorker(workerpool.BaseWorker):
     """
     logging.info("Worker %s processing job %s", self.worker_id, job.id)
-    proc = mcpu.Processor(self.pool.queue.context)
-    self.queue = queue = job.queue
+    proc = mcpu.Processor(self.pool.queue.context, job.id)
+    queue = job.queue
     try:
       try:
         count = len(job.ops)
         for idx, op in enumerate(job.ops):
           op_summary = op.input.Summary()
+          if op.status == constants.OP_STATUS_SUCCESS:
+            # this is a job that was partially completed before master
+            # daemon shutdown, so it can be expected that some opcodes
+            # are already completed successfully (if any did error
+            # out, then the whole job should have been aborted and not
+            # resubmitted for processing)
+            logging.info("Op %s/%s: opcode %s already processed, skipping",
+                         idx + 1, count, op_summary)
+            continue
           try:
             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                          op_summary)
 
             queue.acquire()
             try:
+              if op.status == constants.OP_STATUS_CANCELED:
+                raise CancelJob()
               assert op.status == constants.OP_STATUS_QUEUED
-              job.run_op_index = idx
               op.status = constants.OP_STATUS_WAITLOCK
               op.result = None
               op.start_timestamp = TimeStampNow()
@@ -372,34 +468,9 @@ class _JobQueueWorker(workerpool.BaseWorker):
             finally:
               queue.release()
 
-            def _Log(*args):
-              """Append a log entry.
-
-              """
-              assert len(args) < 3
-
-              if len(args) == 1:
-                log_type = constants.ELOG_MESSAGE
-                log_msg = args[0]
-              else:
-                log_type, log_msg = args
-
-              # The time is split to make serialization easier and not lose
-              # precision.
-              timestamp = utils.SplitTime(time.time())
-
-              queue.acquire()
-              try:
-                job.log_serial += 1
-                op.log.append((job.log_serial, timestamp, log_type, log_msg))
-
-                job.change.notifyAll()
-              finally:
-                queue.release()
-
-            # Make sure not to hold lock while _Log is called
-            self.opcode = op
-            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
+            # Make sure not to hold queue lock while calling ExecOpCode
+            result = proc.ExecOpCode(input_opcode,
+                                     _OpExecCallbacks(queue, job, op))
 
             queue.acquire()
             try:
@@ -420,10 +491,13 @@ class _JobQueueWorker(workerpool.BaseWorker):
             try:
               try:
                 op.status = constants.OP_STATUS_ERROR
-                op.result = str(err)
+                if isinstance(err, errors.GenericError):
+                  op.result = errors.EncodeException(err)
+                else:
+                  op.result = str(err)
                 op.end_timestamp = TimeStampNow()
-                logging.info("Op %s/%s: Error in opcode %s", idx + 1, count,
-                             op_summary)
+                logging.info("Op %s/%s: Error in opcode %s: %s",
+                             idx + 1, count, op_summary, err)
               finally:
                 queue.UpdateJobUnlocked(job)
             finally:
@@ -444,7 +518,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
       queue.acquire()
       try:
         try:
-          job.run_op_idx = -1
+          job.lock_status = None
           job.end_timestamp = TimeStampNow()
           queue.UpdateJobUnlocked(job)
         finally:
@@ -452,6 +526,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
         status = job.CalcStatus()
       finally:
         queue.release()
+
       logging.info("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)
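The error path above now keeps exception type information for
GenericError subclasses instead of flattening everything to str(err).
Why that matters can be shown with a toy encoder (stand-ins only, not
the real ganeti.errors.EncodeException)::

    class GenericError(Exception):
      pass

    class JobQueueFull(GenericError):
      pass

    def encode_exception(err):
      # a (class name, args) tuple survives serialization and can be
      # re-raised with full type information on the client side
      return (err.__class__.__name__, err.args)

    try:
      raise JobQueueFull("2000 jobs in the queue")
    except GenericError as err:
      print(encode_exception(err))  # ('JobQueueFull', ('2000 jobs...',))
      print(str(err))               # the exception type is lost here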
@@ -466,33 +541,38 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
     self.queue = queue
 
 
-class JobQueue(object):
-  """Quue used to manaage the jobs.
+def _RequireOpenQueue(fn):
+  """Decorator for "public" functions.
 
-  @cvar _RE_JOB_FILE: regex matching the valid job file names
+  This function should be used for all 'public' functions. That is,
+  functions usually called from other classes. Note that this should
+  be applied only to methods (not plain functions), since it expects
+  that the decorated function is called with a first argument that has
+  a '_queue_lock' attribute.
 
-  """
-  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
+  @warning: Use this decorator only after utils.LockedMethod!
 
-  def _RequireOpenQueue(fn):
-    """Decorator for "public" functions.
+  Example::
+    @utils.LockedMethod
+    @_RequireOpenQueue
+    def Example(self):
+      pass
 
-    This function should be used for all 'public' functions. That is,
-    functions usually called from other classes.
+  """
+  def wrapper(self, *args, **kwargs):
+    # pylint: disable-msg=W0212
+    assert self._queue_lock is not None, "Queue should be open"
+    return fn(self, *args, **kwargs)
+  return wrapper
 
-    @warning: Use this decorator only after utils.LockedMethod!
 
-    Example::
-      @utils.LockedMethod
-      @_RequireOpenQueue
-      def Example(self):
-        pass
+class JobQueue(object):
+  """Queue used to manage the jobs.
 
-    """
-    def wrapper(self, *args, **kwargs):
-      assert self._queue_lock is not None, "Queue should be open"
-      return fn(self, *args, **kwargs)
-    return wrapper
+  @cvar _RE_JOB_FILE: regex matching the valid job file names
+
+  """
+  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
 
   def __init__(self, context):
     """Constructor for JobQueue.
@@ -573,9 +653,8 @@ class JobQueue(object):
                          constants.JOB_STATUS_CANCELING):
             logging.warning("Unfinished job %s found: %s", job.id, job)
             try:
-              for op in job.ops:
-                op.status = constants.OP_STATUS_ERROR
-                op.result = "Unclean master daemon shutdown"
+              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                    "Unclean master daemon shutdown")
             finally:
               self.UpdateJobUnlocked(job)
 
@@ -599,7 +678,11 @@ class JobQueue(object):
     assert node_name != self._my_hostname
 
     # Clean queue directory on added node
-    rpc.RpcRunner.call_jobqueue_purge(node_name)
+    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
+    msg = result.fail_msg
+    if msg:
+      logging.warning("Cannot cleanup queue directory on node %s: %s",
+                      node_name, msg)
 
     if not node.master_candidate:
       # remove if existing, ignoring errors
@@ -615,17 +698,15 @@ class JobQueue(object):
 
     for file_name in files:
       # Read file content
-      fd = open(file_name, "r")
-      try:
-        content = fd.read()
-      finally:
-        fd.close()
+      content = utils.ReadFile(file_name)
 
       result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                   [node.primary_ip],
                                                   file_name, content)
-      if not result[node_name]:
-        logging.error("Failed to upload %s to %s", file_name, node_name)
+      msg = result[node_name].fail_msg
+      if msg:
+        logging.error("Failed to upload file %s to node %s: %s",
+                      file_name, node_name, msg)
 
     self._nodes[node_name] = node.primary_ip
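Moving _RequireOpenQueue to module level makes the guard-decorator
pattern explicit: the wrapper asserts on an attribute of self, which is
why it only fits methods. A self-contained sketch of the same pattern
(toy class, not the real JobQueue)::

    def require_open(fn):
      def wrapper(self, *args, **kwargs):
        # fail fast if a "public" method is called on a closed object
        assert self._queue_lock is not None, "Queue should be open"
        return fn(self, *args, **kwargs)
      return wrapper

    class ToyQueue(object):
      def __init__(self):
        self._queue_lock = object()  # stand-in for the real lock

      def Close(self):
        self._queue_lock = None

      @require_open
      def ListJobs(self):
        return []

    q = ToyQueue()
    print(q.ListJobs())  # []
    q.Close()            # after this, q.ListJobs() raises AssertionError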
@@ -644,12 +725,13 @@
     except KeyError:
       pass
 
-  def _CheckRpcResult(self, result, nodes, failmsg):
+  @staticmethod
+  def _CheckRpcResult(result, nodes, failmsg):
     """Verifies the status of an RPC call.
 
     Since we aim to keep consistency should this node (the current
     master) fail, we will log errors if our rpc calls fail, and especially
-    log the case when more than half of the nodes failes.
+    log the case when more than half of the nodes fail.
 
     @param result: the data as returned from the rpc call
     @type nodes: list
     @param nodes: the list of nodes we made the call to
     @type failmsg: str
     @param failmsg: the identifier to be used for logging
 
@@ -662,13 +744,13 @@
     success = []
 
     for node in nodes:
-      if result[node]:
-        success.append(node)
-      else:
+      msg = result[node].fail_msg
+      if msg:
         failed.append(node)
-
-    if failed:
-      logging.error("%s failed on %s", failmsg, ", ".join(failed))
+        logging.error("RPC call %s (%s) failed on node %s: %s",
+                      result[node].call, failmsg, node, msg)
+      else:
+        success.append(node)
 
     # +1 for the master node
     if (len(success) + 1) < len(failed):
@@ -725,7 +807,8 @@
     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
 
-  def _FormatJobID(self, job_id):
+  @staticmethod
+  def _FormatJobID(job_id):
     """Convert a job ID to string format.
 
     Currently this just does C{str(job_id)} after performing some
@@ -757,26 +840,31 @@
     """
     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
 
-  def _NewSerialUnlocked(self):
+  def _NewSerialsUnlocked(self, count):
     """Generates a new job identifier.
 
     Job identifiers are unique during the lifetime of a cluster.
 
+    @type count: integer
+    @param count: how many serials to return
-    @rtype: str
-    @return: a string representing the job identifier.
+    @rtype: list of str
+    @return: a list of job identifiers
 
     """
+    assert count > 0
     # New number
-    serial = self._last_serial + 1
+    serial = self._last_serial + count
 
     # Write to file
     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                         "%s\n" % serial)
 
+    result = [self._FormatJobID(v)
+              for v in range(self._last_serial + 1, serial + 1)]
     # Keep it only if we were able to write the file
     self._last_serial = serial
 
-    return self._FormatJobID(serial)
+    return result
 
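_NewSerialsUnlocked reserves a whole batch of job IDs with a single
serial-file write: bump the last serial by count, then hand out exactly
the numbers in between. The arithmetic, minus the replication step (a
simplified sketch, not the real method)::

    def new_serials(last_serial, count):
      assert count > 0
      serial = last_serial + count
      # IDs last_serial+1 .. serial are the freshly reserved ones
      return serial, [str(v) for v in range(last_serial + 1, serial + 1)]

    last, ids = new_serials(41, 3)
    print(last, ids)  # 44 ['42', '43', '44']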
   @staticmethod
   def _GetJobPath(job_id):
@@ -835,6 +923,7 @@
     @return: the list of job IDs
 
     """
+    # pylint: disable-msg=W0613
     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
     jlist = utils.NiceSort(jlist)
     return jlist
@@ -869,19 +958,17 @@
     filepath = self._GetJobPath(job_id)
     logging.debug("Loading job from %s", filepath)
     try:
-      fd = open(filepath, "r")
+      raw_data = utils.ReadFile(filepath)
     except IOError, err:
       if err.errno in (errno.ENOENT, ):
         return None
       raise
-    try:
-      data = serializer.LoadJson(fd.read())
-    finally:
-      fd.close()
+
+    data = serializer.LoadJson(raw_data)
 
     try:
       job = _QueuedJob.Restore(self, data)
-    except Exception, err:
+    except Exception, err: # pylint: disable-msg=W0703
       new_path = self._GetArchivedJobPath(job_id)
       if filepath == new_path:
         # job already archived (future case)
@@ -932,7 +1019,7 @@
     and in the future we might merge them.
 
     @type drain_flag: boolean
-    @param drain_flag: wheter to set or unset the drain flag
+    @param drain_flag: Whether to set or unset the drain flag
 
     """
     if drain_flag:
@@ -941,14 +1028,15 @@
       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
     return True
 
-  @utils.LockedMethod
   @_RequireOpenQueue
-  def SubmitJob(self, ops):
+  def _SubmitJobUnlocked(self, job_id, ops):
     """Create and store a new job.
 
     This enters the job into our job queue and also puts it on the new
     queue, in order for it to be picked up by the queue processors.
 
+    @type job_id: job ID
+    @param job_id: the job ID for the new job
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
     @rtype: job ID
     @return: the job ID of the newly created job
 
@@ -957,7 +1045,7 @@
     """
     if self._IsQueueMarkedDrain():
-      raise errors.JobQueueDrainError()
+      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
 
     # Check job queue size
     size = len(self._ListJobFiles())
@@ -970,8 +1058,6 @@
     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
       raise errors.JobQueueFull()
 
-    # Get job identifier
-    job_id = self._NewSerialUnlocked()
     job = _QueuedJob(self, job_id, ops)
 
     # Write to disk
@@ -985,6 +1071,38 @@
 
     return job.id
 
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SubmitJob(self, ops):
+    """Create and store a new job.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    job_id = self._NewSerialsUnlocked(1)[0]
+    return self._SubmitJobUnlocked(job_id, ops)
+
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SubmitManyJobs(self, jobs):
+    """Create and store multiple jobs.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    results = []
+    all_job_ids = self._NewSerialsUnlocked(len(jobs))
+    for job_id, ops in zip(all_job_ids, jobs):
+      try:
+        data = self._SubmitJobUnlocked(job_id, ops)
+        status = True
+      except errors.GenericError, err:
+        data = str(err)
+        status = False
+      results.append((status, data))
+
+    return results
+
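SubmitManyJobs reports one (status, data) pair per job, so a single bad
job cannot abort the whole batch; data is a job ID on success and an
error message on failure. A toy illustration of the convention (fake
submit function, not the real API)::

    def submit_many(jobs, submit_one):
      results = []
      for ops in jobs:
        try:
          results.append((True, submit_one(ops)))
        except ValueError as err:  # stand-in for errors.GenericError
          results.append((False, str(err)))
      return results

    def fake_submit(ops):
      if not ops:
        raise ValueError("empty job")
      return "job-%d" % len(ops)

    print(submit_many([["op1"], [], ["op1", "op2"]], fake_submit))
    # [(True, 'job-1'), (False, 'empty job'), (True, 'job-2')]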
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job):
     """Update a job's on disk storage.
@@ -1031,17 +1149,13 @@
         as such by the clients
 
     """
-    logging.debug("Waiting for changes in job %s", job_id)
-    end_time = time.time() + timeout
-    while True:
-      delta_time = end_time - time.time()
-      if delta_time < 0:
-        return constants.JOB_NOTCHANGED
+    job = self._LoadJobUnlocked(job_id)
+    if not job:
+      logging.debug("Job %s not found", job_id)
+      return None
 
-      job = self._LoadJobUnlocked(job_id)
-      if not job:
-        logging.debug("Job %s not found", job_id)
-        break
+    def _CheckForChanges():
+      logging.debug("Waiting for changes in job %s", job_id)
 
       status = job.CalcStatus()
       job_info = self._GetJobInfoUnlocked(job, fields)
@@ -1055,25 +1169,24 @@
       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
 
-      if status not in (constants.JOB_STATUS_QUEUED,
-                        constants.JOB_STATUS_RUNNING,
-                        constants.JOB_STATUS_WAITLOCK):
-        # Don't even try to wait if the job is no longer running, there will be
-        # no changes.
-        break
-
-      if (prev_job_info != job_info or
+      # Don't even try to wait if the job is no longer running, there will be
+      # no changes.
+      if (status not in (constants.JOB_STATUS_QUEUED,
+                         constants.JOB_STATUS_RUNNING,
+                         constants.JOB_STATUS_WAITLOCK) or
+          prev_job_info != job_info or
           (log_entries and prev_log_serial != log_entries[0][0])):
-        break
-
-      logging.debug("Waiting again")
+        logging.debug("Job %s changed", job_id)
+        return (job_info, log_entries)
 
-      # Release the queue lock while waiting
-      job.change.wait(delta_time)
+      raise utils.RetryAgain()
 
-    logging.debug("Job %s changed", job_id)
-
-    return (job_info, log_entries)
+    try:
+      # Setting wait function to release the queue lock while waiting
+      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
+                         wait_fn=job.change.wait)
+    except utils.RetryTimeout:
+      return constants.JOB_NOTCHANGED
 
   @utils.LockedMethod
   @_RequireOpenQueue
@@ -1097,8 +1210,8 @@
 
     if job_status not in (constants.JOB_STATUS_QUEUED,
                           constants.JOB_STATUS_WAITLOCK):
-      logging.debug("Job %s is no longer in the queue", job.id)
-      return (False, "Job %s is no longer in the queue" % job.id)
+      logging.debug("Job %s is no longer waiting in the queue", job.id)
+      return (False, "Job %s is no longer waiting in the queue" % job.id)
 
     if job_status == constants.JOB_STATUS_QUEUED:
       self.CancelJobUnlocked(job)
@@ -1107,8 +1220,7 @@
     elif job_status == constants.JOB_STATUS_WAITLOCK:
       # The worker will notice the new status and cancel the job
       try:
-        for op in job.ops:
-          op.status = constants.OP_STATUS_CANCELING
+        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
       finally:
         self.UpdateJobUnlocked(job)
       return (True, "Job %s will be canceled" % job.id)
@@ -1119,9 +1231,8 @@
 
     """
     try:
-      for op in job.ops:
-        op.status = constants.OP_STATUS_ERROR
-        op.result = "Job canceled by request"
+      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+                            "Job canceled by request")
     finally:
       self.UpdateJobUnlocked(job)
 
@@ -1154,7 +1265,7 @@
       self._RenameFilesUnlocked(rename_files)
 
     logging.debug("Successfully archived job(s) %s",
-                  ", ".join(job.id for job in archive_jobs))
+                  utils.CommaJoin(job.id for job in archive_jobs))
 
     return len(archive_jobs)
 
@@ -1236,7 +1347,8 @@
 
     return (archived_count, len(all_job_ids) - last_touched - 1)
 
-  def _GetJobInfoUnlocked(self, job, fields):
+  @staticmethod
+  def _GetJobInfoUnlocked(job, fields):
     """Returns information about a job.
 
     @type job: L{_QueuedJob}
@@ -1273,6 +1385,8 @@
         row.append(job.start_timestamp)
       elif fname == "end_ts":
         row.append(job.end_timestamp)
+      elif fname == "lock_status":
+        row.append(job.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
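The rewritten WaitForJobChanges delegates its polling loop to
utils.Retry, passing job.change.wait as the wait function so the queue
lock is released while blocking. The control flow, approximated with a
hypothetical retry helper (not the real utils.Retry signature)::

    import time

    class RetryAgain(Exception):
      pass

    class RetryTimeout(Exception):
      pass

    def retry(fn, timeout, wait_fn):
      end_time = time.time() + timeout
      while True:
        try:
          return fn()
        except RetryAgain:
          remaining = end_time - time.time()
          if remaining <= 0:
            raise RetryTimeout()
          wait_fn(min(remaining, 1.0))

    calls = []

    def check_for_changes():
      calls.append(None)
      if len(calls) < 3:
        raise RetryAgain()  # pretend nothing has changed yet
      return "job changed"

    print(retry(check_for_changes, 5.0, time.sleep))  # 'job changed'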