X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/0cb941052557db55e91e5bed86a1aa60b2cd2140..572e52bf4b0da1776e7f9019d39cfc2fbdf34d22:/lib/jqueue.py diff --git a/lib/jqueue.py b/lib/jqueue.py index ed81ff7..3364a93 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007 Google Inc. +# Copyright (C) 2006, 2007, 2008 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -19,7 +19,15 @@ # 02110-1301, USA. -"""Module implementing the job queue handling.""" +"""Module implementing the job queue handling. + +Locking: there's a single, large lock in the L{JobQueue} class. It's +used by all other classes in this module. + +@var JOBQUEUE_THREADS: the number of worker threads we start for + processing jobs + +""" import os import logging @@ -27,6 +35,7 @@ import threading import errno import re import time +import weakref from ganeti import constants from ganeti import serializer @@ -35,393 +44,784 @@ from ganeti import opcodes from ganeti import errors from ganeti import mcpu from ganeti import utils +from ganeti import jstore from ganeti import rpc -JOBQUEUE_THREADS = 5 - - -class _QueuedOpCode(object): - """Encasulates an opcode object. +JOBQUEUE_THREADS = 25 +JOBS_PER_ARCHIVE_DIRECTORY = 10000 - Access is synchronized by the '_lock' attribute. - The 'log' attribute holds the execution log and consists of tuples - of the form (timestamp, level, message). +class CancelJob(Exception): + """Special exception to cancel a job. """ - def __init__(self, op): - self.__Setup(op, constants.OP_STATUS_QUEUED, None, []) - def __Setup(self, input_, status, result, log): - self._lock = threading.Lock() - self.input = input_ - self.status = status - self.result = result - self.log = log - @classmethod - def Restore(cls, state): - obj = object.__new__(cls) - obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]), - state["status"], state["result"], state["log"]) - return obj +def TimeStampNow(): + """Returns the current timestamp. - @utils.LockedMethod - def Serialize(self): - return { - "input": self.input.__getstate__(), - "status": self.status, - "result": self.result, - "log": self.log, - } + @rtype: tuple + @return: the current time in the (seconds, microseconds) format - @utils.LockedMethod - def GetInput(self): - """Returns the original opcode. - - """ - return self.input + """ + return utils.SplitTime(time.time()) - @utils.LockedMethod - def SetStatus(self, status, result): - """Update the opcode status and result. - """ - self.status = status - self.result = result +class _QueuedOpCode(object): + """Encasulates an opcode object. - @utils.LockedMethod - def GetStatus(self): - """Get the opcode status. + @ivar log: holds the execution log and consists of tuples + of the form C{(log_serial, timestamp, level, message)} + @ivar input: the OpCode we encapsulate + @ivar status: the current status + @ivar result: the result of the LU execution + @ivar start_timestamp: timestamp for the start of the execution + @ivar stop_timestamp: timestamp for the end of the execution - """ - return self.status + """ + def __init__(self, op): + """Constructor for the _QuededOpCode. - @utils.LockedMethod - def GetResult(self): - """Get the opcode result. 
+ @type op: L{opcodes.OpCode} + @param op: the opcode we encapsulate """ - return self.result + self.input = op + self.status = constants.OP_STATUS_QUEUED + self.result = None + self.log = [] + self.start_timestamp = None + self.end_timestamp = None - @utils.LockedMethod - def Log(self, *args): - """Append a log entry. + @classmethod + def Restore(cls, state): + """Restore the _QueuedOpCode from the serialized form. + + @type state: dict + @param state: the serialized state + @rtype: _QueuedOpCode + @return: a new _QueuedOpCode instance """ - assert len(args) < 2 + obj = _QueuedOpCode.__new__(cls) + obj.input = opcodes.OpCode.LoadOpCode(state["input"]) + obj.status = state["status"] + obj.result = state["result"] + obj.log = state["log"] + obj.start_timestamp = state.get("start_timestamp", None) + obj.end_timestamp = state.get("end_timestamp", None) + return obj - if len(args) == 1: - log_type = constants.ELOG_MESSAGE - log_msg = args[0] - else: - log_type, log_msg = args - self.log.append((time.time(), log_type, log_msg)) + def Serialize(self): + """Serializes this _QueuedOpCode. - @utils.LockedMethod - def RetrieveLog(self, start_at=0): - """Retrieve (a part of) the execution log. + @rtype: dict + @return: the dictionary holding the serialized state """ - return self.log[start_at:] + return { + "input": self.input.__getstate__(), + "status": self.status, + "result": self.result, + "log": self.log, + "start_timestamp": self.start_timestamp, + "end_timestamp": self.end_timestamp, + } class _QueuedJob(object): """In-memory job representation. - This is what we use to track the user-submitted jobs. + This is what we use to track the user-submitted jobs. Locking must + be taken care of by users of this class. + + @type queue: L{JobQueue} + @ivar queue: the parent queue + @ivar id: the job ID + @type ops: list + @ivar ops: the list of _QueuedOpCode that constitute the job + @type run_op_index: int + @ivar run_op_index: the currently executing opcode, or -1 if + we didn't yet start executing + @type log_serial: int + @ivar log_serial: holds the index for the next log entry + @ivar received_timestamp: the timestamp for when the job was received + @ivar start_timestmap: the timestamp for start of execution + @ivar end_timestamp: the timestamp for end of execution + @ivar change: a Condition variable we use for waiting for job changes """ - def __init__(self, storage, job_id, ops): + def __init__(self, queue, job_id, ops): + """Constructor for the _QueuedJob. 
+ + @type queue: L{JobQueue} + @param queue: our parent queue + @type job_id: job_id + @param job_id: our job id + @type ops: list + @param ops: the list of opcodes we hold, which will be encapsulated + in _QueuedOpCodes + + """ if not ops: - # TODO + # TODO: use a better exception raise Exception("No opcodes") - self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1) - - def __Setup(self, storage, job_id, ops, run_op_index): - self._lock = threading.Lock() - self.storage = storage + self.queue = queue self.id = job_id - self._ops = ops - self.run_op_index = run_op_index + self.ops = [_QueuedOpCode(op) for op in ops] + self.run_op_index = -1 + self.log_serial = 0 + self.received_timestamp = TimeStampNow() + self.start_timestamp = None + self.end_timestamp = None + + # Condition to wait for changes + self.change = threading.Condition(self.queue._lock) @classmethod - def Restore(cls, storage, state): - obj = object.__new__(cls) - op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]] - obj.__Setup(storage, state["id"], op_list, state["run_op_index"]) + def Restore(cls, queue, state): + """Restore a _QueuedJob from serialized state: + + @type queue: L{JobQueue} + @param queue: to which queue the restored job belongs + @type state: dict + @param state: the serialized state + @rtype: _JobQueue + @return: the restored _JobQueue instance + + """ + obj = _QueuedJob.__new__(cls) + obj.queue = queue + obj.id = state["id"] + obj.run_op_index = state["run_op_index"] + obj.received_timestamp = state.get("received_timestamp", None) + obj.start_timestamp = state.get("start_timestamp", None) + obj.end_timestamp = state.get("end_timestamp", None) + + obj.ops = [] + obj.log_serial = 0 + for op_state in state["ops"]: + op = _QueuedOpCode.Restore(op_state) + for log_entry in op.log: + obj.log_serial = max(obj.log_serial, log_entry[0]) + obj.ops.append(op) + + # Condition to wait for changes + obj.change = threading.Condition(obj.queue._lock) + return obj def Serialize(self): + """Serialize the _JobQueue instance. + + @rtype: dict + @return: the serialized state + + """ return { "id": self.id, - "ops": [op.Serialize() for op in self._ops], + "ops": [op.Serialize() for op in self.ops], "run_op_index": self.run_op_index, + "start_timestamp": self.start_timestamp, + "end_timestamp": self.end_timestamp, + "received_timestamp": self.received_timestamp, } - def SetUnclean(self, msg): - try: - for op in self._ops: - op.SetStatus(constants.OP_STATUS_ERROR, msg) - finally: - self.storage.UpdateJob(self) + def CalcStatus(self): + """Compute the status of this job. + + This function iterates over all the _QueuedOpCodes in the job and + based on their status, computes the job status. 
- def GetStatus(self): + The algorithm is: + - if we find a cancelled, or finished with error, the job + status will be the same + - otherwise, the last opcode with the status one of: + - waitlock + - canceling + - running + + will determine the job status + + - otherwise, it means either all opcodes are queued, or success, + and the job status will be the same + + @return: the job status + + """ status = constants.JOB_STATUS_QUEUED all_success = True - for op in self._ops: - op_status = op.GetStatus() - if op_status == constants.OP_STATUS_SUCCESS: + for op in self.ops: + if op.status == constants.OP_STATUS_SUCCESS: continue all_success = False - if op_status == constants.OP_STATUS_QUEUED: + if op.status == constants.OP_STATUS_QUEUED: pass - elif op_status == constants.OP_STATUS_RUNNING: + elif op.status == constants.OP_STATUS_WAITLOCK: + status = constants.JOB_STATUS_WAITLOCK + elif op.status == constants.OP_STATUS_RUNNING: status = constants.JOB_STATUS_RUNNING - elif op_status == constants.OP_STATUS_ERROR: + elif op.status == constants.OP_STATUS_CANCELING: + status = constants.JOB_STATUS_CANCELING + break + elif op.status == constants.OP_STATUS_ERROR: status = constants.JOB_STATUS_ERROR # The whole job fails if one opcode failed break + elif op.status == constants.OP_STATUS_CANCELED: + status = constants.OP_STATUS_CANCELED + break if all_success: status = constants.JOB_STATUS_SUCCESS return status - @utils.LockedMethod - def GetRunOpIndex(self): - return self.run_op_index + def GetLogEntries(self, newer_than): + """Selectively returns the log entries. - def Run(self, proc): - """Job executor. + @type newer_than: None or int + @param newer_than: if this is None, return all log enties, + otherwise return only the log entries with serial higher + than this value + @rtype: list + @return: the list of the log entries selected + + """ + if newer_than is None: + serial = -1 + else: + serial = newer_than + + entries = [] + for op in self.ops: + entries.extend(filter(lambda entry: entry[0] > serial, op.log)) + + return entries - This functions processes a this job in the context of given processor - instance. - Args: - - proc: Ganeti Processor to run the job with +class _JobQueueWorker(workerpool.BaseWorker): + """The actual job workers. + + """ + def _NotifyStart(self): + """Mark the opcode as running, not lock-waiting. + + This is called from the mcpu code as a notifier function, when the + LU is finally about to start the Exec() method. Of course, to have + end-user visible results, the opcode must be initially (before + calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK. """ + assert self.queue, "Queue attribute is missing" + assert self.opcode, "Opcode attribute is missing" + + self.queue.acquire() try: - count = len(self._ops) - for idx, op in enumerate(self._ops): - try: - logging.debug("Op %s/%s: Starting %s", idx + 1, count, op) + assert self.opcode.status in (constants.OP_STATUS_WAITLOCK, + constants.OP_STATUS_CANCELING) - self._lock.acquire() - try: - self.run_op_index = idx - finally: - self._lock.release() + # Cancel here if we were asked to + if self.opcode.status == constants.OP_STATUS_CANCELING: + raise CancelJob() - op.SetStatus(constants.OP_STATUS_RUNNING, None) - self.storage.UpdateJob(self) + self.opcode.status = constants.OP_STATUS_RUNNING + finally: + self.queue.release() - result = proc.ExecOpCode(op.input, op.Log) + def RunTask(self, job): + """Job executor. 
- op.SetStatus(constants.OP_STATUS_SUCCESS, result) - self.storage.UpdateJob(self) - logging.debug("Op %s/%s: Successfully finished %s", - idx + 1, count, op) - except Exception, err: - try: - op.SetStatus(constants.OP_STATUS_ERROR, str(err)) - logging.debug("Op %s/%s: Error in %s", idx + 1, count, op) - finally: - self.storage.UpdateJob(self) - raise - - except errors.GenericError, err: - logging.error("ganeti exception %s", exc_info=err) - except Exception, err: - logging.error("unhandled exception %s", exc_info=err) - except: - logging.error("unhandled unknown exception %s", exc_info=err) + This functions processes a job. It is closely tied to the _QueuedJob and + _QueuedOpCode classes. + @type job: L{_QueuedJob} + @param job: the job to be processed -class _JobQueueWorker(workerpool.BaseWorker): - def RunTask(self, job): - logging.debug("Worker %s processing job %s", + """ + logging.info("Worker %s processing job %s", self.worker_id, job.id) - # TODO: feedback function - proc = mcpu.Processor(self.pool.context) + proc = mcpu.Processor(self.pool.queue.context) + self.queue = queue = job.queue try: - job.Run(proc) + try: + count = len(job.ops) + for idx, op in enumerate(job.ops): + op_summary = op.input.Summary() + try: + logging.info("Op %s/%s: Starting opcode %s", idx + 1, count, + op_summary) + + queue.acquire() + try: + if op.status == constants.OP_STATUS_CANCELED: + raise CancelJob() + assert op.status == constants.OP_STATUS_QUEUED + job.run_op_index = idx + op.status = constants.OP_STATUS_WAITLOCK + op.result = None + op.start_timestamp = TimeStampNow() + if idx == 0: # first opcode + job.start_timestamp = op.start_timestamp + queue.UpdateJobUnlocked(job) + + input_opcode = op.input + finally: + queue.release() + + def _Log(*args): + """Append a log entry. + + """ + assert len(args) < 3 + + if len(args) == 1: + log_type = constants.ELOG_MESSAGE + log_msg = args[0] + else: + log_type, log_msg = args + + # The time is split to make serialization easier and not lose + # precision. 
+ timestamp = utils.SplitTime(time.time()) + + queue.acquire() + try: + job.log_serial += 1 + op.log.append((job.log_serial, timestamp, log_type, log_msg)) + + job.change.notifyAll() + finally: + queue.release() + + # Make sure not to hold lock while _Log is called + self.opcode = op + result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart) + + queue.acquire() + try: + op.status = constants.OP_STATUS_SUCCESS + op.result = result + op.end_timestamp = TimeStampNow() + queue.UpdateJobUnlocked(job) + finally: + queue.release() + + logging.info("Op %s/%s: Successfully finished opcode %s", + idx + 1, count, op_summary) + except CancelJob: + # Will be handled further up + raise + except Exception, err: + queue.acquire() + try: + try: + op.status = constants.OP_STATUS_ERROR + op.result = str(err) + op.end_timestamp = TimeStampNow() + logging.info("Op %s/%s: Error in opcode %s: %s", + idx + 1, count, op_summary, err) + finally: + queue.UpdateJobUnlocked(job) + finally: + queue.release() + raise + + except CancelJob: + queue.acquire() + try: + queue.CancelJobUnlocked(job) + finally: + queue.release() + except errors.GenericError, err: + logging.exception("Ganeti exception") + except: + logging.exception("Unhandled exception") finally: - logging.debug("Worker %s finished job %s, status = %s", - self.worker_id, job.id, job.GetStatus()) + queue.acquire() + try: + try: + job.run_op_idx = -1 + job.end_timestamp = TimeStampNow() + queue.UpdateJobUnlocked(job) + finally: + job_id = job.id + status = job.CalcStatus() + finally: + queue.release() + logging.info("Worker %s finished job %s, status = %s", + self.worker_id, job_id, status) class _JobQueueWorkerPool(workerpool.WorkerPool): - def __init__(self, context): + """Simple class implementing a job-processing workerpool. + + """ + def __init__(self, queue): super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS, _JobQueueWorker) - self.context = context + self.queue = queue -class JobStorageBase(object): - def __init__(self, id_prefix): - self.id_prefix = id_prefix +class JobQueue(object): + """Quue used to manaage the jobs. - if id_prefix: - prefix_pattern = re.escape("%s-" % id_prefix) - else: - prefix_pattern = "" + @cvar _RE_JOB_FILE: regex matching the valid job file names - # Apart from the prefix, all job IDs are numeric - self._re_job_id = re.compile(r"^%s\d+$" % prefix_pattern) + """ + _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE) - def OwnsJobId(self, job_id): - return self._re_job_id.match(job_id) + def _RequireOpenQueue(fn): + """Decorator for "public" functions. - def FormatJobID(self, job_id): - if not isinstance(job_id, (int, long)): - raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id) - if job_id < 0: - raise errors.ProgrammerError("Job ID %s is negative" % job_id) + This function should be used for all 'public' functions. That is, + functions usually called from other classes. - if self.id_prefix: - prefix = "%s-" % self.id_prefix - else: - prefix = "" + @warning: Use this decorator only after utils.LockedMethod! - return "%s%010d" % (prefix, job_id) + Example:: + @utils.LockedMethod + @_RequireOpenQueue + def Example(self): + pass + """ + def wrapper(self, *args, **kwargs): + assert self._queue_lock is not None, "Queue should be open" + return fn(self, *args, **kwargs) + return wrapper -class DiskJobStorage(JobStorageBase): - _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE) + def __init__(self, context): + """Constructor for JobQueue. 
- def __init__(self, id_prefix): - JobStorageBase.__init__(self, id_prefix) + The constructor will initialize the job queue object and then + start loading the current jobs from disk, either for starting them + (if they were queue) or for aborting them (if they were already + running). - self._lock = threading.Lock() - self._memcache = {} + @type context: GanetiContext + @param context: the context object for access to the configuration + data and other ganeti objects + + """ + self.context = context + self._memcache = weakref.WeakValueDictionary() self._my_hostname = utils.HostInfo().name - # Make sure our directories exists - for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR): - try: - os.mkdir(path, 0700) - except OSError, err: - if err.errno not in (errno.EEXIST, ): - raise + # Locking + self._lock = threading.Lock() + self.acquire = self._lock.acquire + self.release = self._lock.release + + # Initialize + self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True) + + # Read serial file + self._last_serial = jstore.ReadSerial() + assert self._last_serial is not None, ("Serial file was modified between" + " check in jstore and here") + + # Get initial list of nodes + self._nodes = dict((n.name, n.primary_ip) + for n in self.context.cfg.GetAllNodesInfo().values() + if n.master_candidate) + + # Remove master node + try: + del self._nodes[self._my_hostname] + except KeyError: + pass + + # TODO: Check consistency across nodes - # Get queue lock - self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w") + # Setup worker pool + self._wpool = _JobQueueWorkerPool(self) try: - utils.LockFile(self.lock_fd) + # We need to lock here because WorkerPool.AddTask() may start a job while + # we're still doing our work. + self.acquire() + try: + logging.info("Inspecting job queue") + + all_job_ids = self._GetJobIDsUnlocked() + jobs_count = len(all_job_ids) + lastinfo = time.time() + for idx, job_id in enumerate(all_job_ids): + # Give an update every 1000 jobs or 10 seconds + if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or + idx == (jobs_count - 1)): + logging.info("Job queue inspection: %d/%d (%0.1f %%)", + idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count) + lastinfo = time.time() + + job = self._LoadJobUnlocked(job_id) + + # a failure in loading the job can cause 'None' to be returned + if job is None: + continue + + status = job.CalcStatus() + + if status in (constants.JOB_STATUS_QUEUED, ): + self._wpool.AddTask(job) + + elif status in (constants.JOB_STATUS_RUNNING, + constants.JOB_STATUS_WAITLOCK, + constants.JOB_STATUS_CANCELING): + logging.warning("Unfinished job %s found: %s", job.id, job) + try: + for op in job.ops: + op.status = constants.OP_STATUS_ERROR + op.result = "Unclean master daemon shutdown" + finally: + self.UpdateJobUnlocked(job) + + logging.info("Job queue inspection finished") + finally: + self.release() except: - self.lock_fd.close() + self._wpool.TerminateWorkers() raise - # Read version - try: - version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r") - except IOError, err: - if err.errno not in (errno.ENOENT, ): - raise + @utils.LockedMethod + @_RequireOpenQueue + def AddNode(self, node): + """Register a new node with the queue. 
- # Setup a new queue - self._InitQueueUnlocked() + @type node: L{objects.Node} + @param node: the node object to be added - # Try to open again - version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r") + """ + node_name = node.name + assert node_name != self._my_hostname - try: - # Try to read version - version = int(version_fd.read(128)) + # Clean queue directory on added node + rpc.RpcRunner.call_jobqueue_purge(node_name) - # Verify version - if version != constants.JOB_QUEUE_VERSION: - raise errors.JobQueueError("Found version %s, expected %s", - version, constants.JOB_QUEUE_VERSION) - finally: - version_fd.close() + if not node.master_candidate: + # remove if existing, ignoring errors + self._nodes.pop(node_name, None) + # and skip the replication of the job ids + return - self._last_serial = self._ReadSerial() - if self._last_serial is None: - raise errors.ConfigurationError("Can't read/parse the job queue serial" - " file") + # Upload the whole queue excluding archived jobs + files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()] - @staticmethod - def _ReadSerial(): - """Try to read the job serial file. + # Upload current serial file + files.append(constants.JOB_QUEUE_SERIAL_FILE) + + for file_name in files: + # Read file content + fd = open(file_name, "r") + try: + content = fd.read() + finally: + fd.close() + + result = rpc.RpcRunner.call_jobqueue_update([node_name], + [node.primary_ip], + file_name, content) + if not result[node_name]: + logging.error("Failed to upload %s to %s", file_name, node_name) - @rtype: None or int - @return: If the serial can be read, then it is returned. Otherwise None - is returned. + self._nodes[node_name] = node.primary_ip + + @utils.LockedMethod + @_RequireOpenQueue + def RemoveNode(self, node_name): + """Callback called when removing nodes from the cluster. + + @type node_name: str + @param node_name: the name of the node to remove """ try: - serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r") - try: - # Read last serial - serial = int(serial_fd.read(1024).strip()) - finally: - serial_fd.close() - except (ValueError, EnvironmentError): - serial = None + # The queue is removed by the "leave node" RPC call. + del self._nodes[node_name] + except KeyError: + pass + + def _CheckRpcResult(self, result, nodes, failmsg): + """Verifies the status of an RPC call. + + Since we aim to keep consistency should this node (the current + master) fail, we will log errors if our rpc fail, and especially + log the case when more than half of the nodes failes. + + @param result: the data as returned from the rpc call + @type nodes: list + @param nodes: the list of nodes we made the call to + @type failmsg: str + @param failmsg: the identifier to be used for logging - return serial + """ + failed = [] + success = [] + + for node in nodes: + if result[node]: + success.append(node) + else: + failed.append(node) - def Close(self): - assert self.lock_fd, "Queue should be open" + if failed: + logging.error("%s failed on %s", failmsg, ", ".join(failed)) - self.lock_fd.close() - self.lock_fd = None + # +1 for the master node + if (len(success) + 1) < len(failed): + # TODO: Handle failing nodes + logging.error("More than half of the nodes failed") - def _InitQueueUnlocked(self): - assert self.lock_fd, "Queue should be open" + def _GetNodeIp(self): + """Helper for returning the node name/ip list. 
- utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE, - data="%s\n" % constants.JOB_QUEUE_VERSION) - if self._ReadSerial() is None: - utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE, - data="%s\n" % 0) + @rtype: (list, list) + @return: a tuple of two lists, the first one with the node + names and the second one with the node addresses - def _NewSerialUnlocked(self, nodes): + """ + name_list = self._nodes.keys() + addr_list = [self._nodes[name] for name in name_list] + return name_list, addr_list + + def _WriteAndReplicateFileUnlocked(self, file_name, data): + """Writes a file locally and then replicates it to all nodes. + + This function will replace the contents of a file on the local + node and then replicate it to all the other nodes we have. + + @type file_name: str + @param file_name: the path of the file to be replicated + @type data: str + @param data: the new contents of the file + + """ + utils.WriteFile(file_name, data=data) + + names, addrs = self._GetNodeIp() + result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data) + self._CheckRpcResult(result, self._nodes, + "Updating %s" % file_name) + + def _RenameFilesUnlocked(self, rename): + """Renames a file locally and then replicate the change. + + This function will rename a file in the local queue directory + and then replicate this rename to all the other nodes we have. + + @type rename: list of (old, new) + @param rename: List containing tuples mapping old to new names + + """ + # Rename them locally + for old, new in rename: + utils.RenameFile(old, new, mkdir=True) + + # ... and on all nodes + names, addrs = self._GetNodeIp() + result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename) + self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename) + + def _FormatJobID(self, job_id): + """Convert a job ID to string format. + + Currently this just does C{str(job_id)} after performing some + checks, but if we want to change the job id format this will + abstract this change. + + @type job_id: int or long + @param job_id: the numeric job id + @rtype: str + @return: the formatted job id + + """ + if not isinstance(job_id, (int, long)): + raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id) + if job_id < 0: + raise errors.ProgrammerError("Job ID %s is negative" % job_id) + + return str(job_id) + + @classmethod + def _GetArchiveDirectory(cls, job_id): + """Returns the archive directory for a job. + + @type job_id: str + @param job_id: Job identifier + @rtype: str + @return: Directory name + + """ + return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY) + + def _NewSerialUnlocked(self): """Generates a new job identifier. Job identifiers are unique during the lifetime of a cluster. - Returns: A string representing the job identifier. + @rtype: str + @return: a string representing the job identifier. 
""" - assert self.lock_fd, "Queue should be open" - # New number serial = self._last_serial + 1 # Write to file - utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE, - data="%s\n" % serial) + self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE, + "%s\n" % serial) # Keep it only if we were able to write the file self._last_serial = serial - # Distribute the serial to the other nodes - try: - nodes.remove(self._my_hostname) - except ValueError: - pass + return self._FormatJobID(serial) - result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE) - for node in nodes: - if not result[node]: - logging.error("copy of job queue file to node %s failed", node) + @staticmethod + def _GetJobPath(job_id): + """Returns the job file for a given job id. - return self.FormatJobID(serial) + @type job_id: str + @param job_id: the job identifier + @rtype: str + @return: the path to the job file - def _GetJobPath(self, job_id): + """ return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id) - def _GetArchivedJobPath(self, job_id): - return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id) + @classmethod + def _GetArchivedJobPath(cls, job_id): + """Returns the archived job file for a give job id. + + @type job_id: str + @param job_id: the job identifier + @rtype: str + @return: the path to the archived job file + + """ + path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id) + return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path) + + @classmethod + def _ExtractJobID(cls, name): + """Extract the job id from a filename. + + @type name: str + @param name: the job filename + @rtype: job id or None + @return: the job id corresponding to the given filename, + or None if the filename does not represent a valid + job file + + """ + m = cls._RE_JOB_FILE.match(name) + if m: + return m.group(1) + else: + return None def _GetJobIDsUnlocked(self, archived=False): """Return all known job IDs. @@ -433,25 +833,40 @@ class DiskJobStorage(JobStorageBase): jobs are present on disk (so in the _memcache we don't have any extra IDs). + @rtype: list + @return: the list of job IDs + """ - jfiles = self._ListJobFiles() - jlist = [m.group(1) for m in - [self._RE_JOB_FILE.match(name) for name in jfiles]] - jlist.sort() + jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()] + jlist = utils.NiceSort(jlist) return jlist def _ListJobFiles(self): - assert self.lock_fd, "Queue should be open" + """Returns the list of current job files. + + @rtype: list + @return: the list of job file names + """ return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR) if self._RE_JOB_FILE.match(name)] def _LoadJobUnlocked(self, job_id): - assert self.lock_fd, "Queue should be open" + """Loads a job from the disk or memory. + + Given a job id, this will return the cached job object if + existing, or try to load the job from the disk. If loading from + disk, it will also add the job to the cache. 
- if job_id in self._memcache: + @param job_id: the job id + @rtype: L{_QueuedJob} or None + @return: either None or the job object + + """ + job = self._memcache.get(job_id, None) + if job: logging.debug("Found job %s in memcache", job_id) - return self._memcache[job_id] + return job filepath = self._GetJobPath(job_id) logging.debug("Loading job from %s", filepath) @@ -466,182 +881,442 @@ class DiskJobStorage(JobStorageBase): finally: fd.close() - job = _QueuedJob.Restore(self, data) + try: + job = _QueuedJob.Restore(self, data) + except Exception, err: + new_path = self._GetArchivedJobPath(job_id) + if filepath == new_path: + # job already archived (future case) + logging.exception("Can't parse job %s", job_id) + else: + # non-archived case + logging.exception("Can't parse job %s, will archive.", job_id) + self._RenameFilesUnlocked([(filepath, new_path)]) + return None + self._memcache[job_id] = job logging.debug("Added job %s to the cache", job_id) return job def _GetJobsUnlocked(self, job_ids): + """Return a list of jobs based on their IDs. + + @type job_ids: list + @param job_ids: either an empty list (meaning all jobs), + or a list of job IDs + @rtype: list + @return: the list of job objects + + """ if not job_ids: job_ids = self._GetJobIDsUnlocked() return [self._LoadJobUnlocked(job_id) for job_id in job_ids] - @utils.LockedMethod - def GetJobs(self, job_ids): - return self._GetJobsUnlocked(job_ids) + @staticmethod + def _IsQueueMarkedDrain(): + """Check if the queue is marked from drain. + + This currently uses the queue drain file, which makes it a + per-node flag. In the future this can be moved to the config file. + + @rtype: boolean + @return: True of the job queue is marked for draining + + """ + return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE) + + @staticmethod + def SetDrainFlag(drain_flag): + """Sets the drain flag for the queue. + + This is similar to the function L{backend.JobQueueSetDrainFlag}, + and in the future we might merge them. + + @type drain_flag: boolean + @param drain_flag: wheter to set or unset the drain flag + + """ + if drain_flag: + utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True) + else: + utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE) + return True @utils.LockedMethod - def AddJob(self, ops, nodes): - """Create and store on disk a new job. + @_RequireOpenQueue + def SubmitJob(self, ops): + """Create and store a new job. + + This enters the job into our job queue and also puts it on the new + queue, in order for it to be picked up by the queue processors. @type ops: list @param ops: The list of OpCodes that will become the new job. - @type nodes: list - @param nodes: The list of nodes to which the new job serial will be - distributed. + @rtype: job ID + @return: the job ID of the newly created job + @raise errors.JobQueueDrainError: if the job is marked for draining """ - assert self.lock_fd, "Queue should be open" + if self._IsQueueMarkedDrain(): + raise errors.JobQueueDrainError() + + # Check job queue size + size = len(self._ListJobFiles()) + if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT: + # TODO: Autoarchive jobs. Make sure it's not done on every job + # submission, though. + #size = ... 
+ pass + + if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT: + raise errors.JobQueueFull() # Get job identifier - job_id = self._NewSerialUnlocked(nodes) + job_id = self._NewSerialUnlocked() job = _QueuedJob(self, job_id, ops) # Write to disk - self._UpdateJobUnlocked(job) + self.UpdateJobUnlocked(job) - logging.debug("Added new job %s to the cache", job_id) + logging.debug("Adding new job %s to the cache", job_id) self._memcache[job_id] = job - return job + # Add to worker pool + self._wpool.AddTask(job) + + return job.id + + @_RequireOpenQueue + def UpdateJobUnlocked(self, job): + """Update a job's on disk storage. - def _UpdateJobUnlocked(self, job): - assert self.lock_fd, "Queue should be open" + After a job has been modified, this function needs to be called in + order to write the changes to disk and replicate them to the other + nodes. + @type job: L{_QueuedJob} + @param job: the changed job + + """ filename = self._GetJobPath(job.id) + data = serializer.DumpJson(job.Serialize(), indent=False) logging.debug("Writing job %s to %s", job.id, filename) - utils.WriteFile(filename, - data=serializer.DumpJson(job.Serialize(), indent=False)) - self._CleanCacheUnlocked([job.id]) + self._WriteAndReplicateFileUnlocked(filename, data) - def _CleanCacheUnlocked(self, exclude): - """Clean the memory cache. + # Notify waiters about potential changes + job.change.notifyAll() - The exceptions argument contains job IDs that should not be - cleaned. + @utils.LockedMethod + @_RequireOpenQueue + def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial, + timeout): + """Waits for changes in a job. + + @type job_id: string + @param job_id: Job identifier + @type fields: list of strings + @param fields: Which fields to check for changes + @type prev_job_info: list or None + @param prev_job_info: Last job information returned + @type prev_log_serial: int + @param prev_log_serial: Last job message serial number + @type timeout: float + @param timeout: maximum time to wait + @rtype: tuple (job info, log entries) + @return: a tuple of the job information as required via + the fields parameter, and the log entries as a list + + if the job has not changed and the timeout has expired, + we instead return a special value, + L{constants.JOB_NOTCHANGED}, which should be interpreted + as such by the clients """ - assert isinstance(exclude, list) - for job in self._memcache.values(): - if job.id in exclude: - continue - if job.GetStatus() not in (constants.JOB_STATUS_QUEUED, - constants.JOB_STATUS_RUNNING): - logging.debug("Cleaning job %s from the cache", job.id) - try: - del self._memcache[job.id] - except KeyError: - pass + logging.debug("Waiting for changes in job %s", job_id) + end_time = time.time() + timeout + while True: + delta_time = end_time - time.time() + if delta_time < 0: + return constants.JOB_NOTCHANGED + + job = self._LoadJobUnlocked(job_id) + if not job: + logging.debug("Job %s not found", job_id) + break + + status = job.CalcStatus() + job_info = self._GetJobInfoUnlocked(job, fields) + log_entries = job.GetLogEntries(prev_log_serial) + + # Serializing and deserializing data can cause type changes (e.g. from + # tuple to list) or precision loss. We're doing it here so that we get + # the same modifications as the data received from the client. Without + # this, the comparison afterwards might fail without the data being + # significantly different. 
+ job_info = serializer.LoadJson(serializer.DumpJson(job_info)) + log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) + + if status not in (constants.JOB_STATUS_QUEUED, + constants.JOB_STATUS_RUNNING, + constants.JOB_STATUS_WAITLOCK): + # Don't even try to wait if the job is no longer running, there will be + # no changes. + break + + if (prev_job_info != job_info or + (log_entries and prev_log_serial != log_entries[0][0])): + break + + logging.debug("Waiting again") + + # Release the queue lock while waiting + job.change.wait(delta_time) + + logging.debug("Job %s changed", job_id) + + return (job_info, log_entries) @utils.LockedMethod - def UpdateJob(self, job): - return self._UpdateJobUnlocked(job) + @_RequireOpenQueue + def CancelJob(self, job_id): + """Cancels a job. + This will only succeed if the job has not started yet. + + @type job_id: string + @param job_id: job ID of job to be cancelled. + + """ + logging.info("Cancelling job %s", job_id) + + job = self._LoadJobUnlocked(job_id) + if not job: + logging.debug("Job %s not found", job_id) + return (False, "Job %s not found" % job_id) + + job_status = job.CalcStatus() + + if job_status not in (constants.JOB_STATUS_QUEUED, + constants.JOB_STATUS_WAITLOCK): + logging.debug("Job %s is no longer in the queue", job.id) + return (False, "Job %s is no longer in the queue" % job.id) + + if job_status == constants.JOB_STATUS_QUEUED: + self.CancelJobUnlocked(job) + return (True, "Job %s canceled" % job.id) + + elif job_status == constants.JOB_STATUS_WAITLOCK: + # The worker will notice the new status and cancel the job + try: + for op in job.ops: + op.status = constants.OP_STATUS_CANCELING + finally: + self.UpdateJobUnlocked(job) + return (True, "Job %s will be canceled" % job.id) + + @_RequireOpenQueue + def CancelJobUnlocked(self, job): + """Marks a job as canceled. + + """ + try: + for op in job.ops: + op.status = constants.OP_STATUS_CANCELED + op.result = "Job canceled by request" + finally: + self.UpdateJobUnlocked(job) + + @_RequireOpenQueue + def _ArchiveJobsUnlocked(self, jobs): + """Archives jobs. + + @type jobs: list of L{_QueuedJob} + @param jobs: Job objects + @rtype: int + @return: Number of archived jobs + + """ + archive_jobs = [] + rename_files = [] + for job in jobs: + if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED, + constants.JOB_STATUS_SUCCESS, + constants.JOB_STATUS_ERROR): + logging.debug("Job %s is not yet done", job.id) + continue + + archive_jobs.append(job) + + old = self._GetJobPath(job.id) + new = self._GetArchivedJobPath(job.id) + rename_files.append((old, new)) + + # TODO: What if 1..n files fail to rename? + self._RenameFilesUnlocked(rename_files) + + logging.debug("Successfully archived job(s) %s", + ", ".join(job.id for job in archive_jobs)) + + return len(archive_jobs) + + @utils.LockedMethod + @_RequireOpenQueue def ArchiveJob(self, job_id): - raise NotImplementedError() + """Archives a job. + This is just a wrapper over L{_ArchiveJobsUnlocked}. -class JobQueue: - """The job queue. + @type job_id: string + @param job_id: Job ID of job to be archived. 
+ @rtype: bool + @return: Whether job was archived - """ - def __init__(self, context): - self._lock = threading.Lock() - self._jobs = DiskJobStorage("") - self._wpool = _JobQueueWorkerPool(context) + """ + logging.info("Archiving job %s", job_id) - for job in self._jobs.GetJobs(None): - status = job.GetStatus() - if status in (constants.JOB_STATUS_QUEUED, ): - self._wpool.AddTask(job) + job = self._LoadJobUnlocked(job_id) + if not job: + logging.debug("Job %s not found", job_id) + return False - elif status in (constants.JOB_STATUS_RUNNING, ): - logging.warning("Unfinished job %s found: %s", job.id, job) - job.SetUnclean("Unclean master daemon shutdown") + return self._ArchiveJobsUnlocked([job]) == 1 @utils.LockedMethod - def SubmitJob(self, ops, nodes): - """Add a new job to the queue. + @_RequireOpenQueue + def AutoArchiveJobs(self, age, timeout): + """Archives all jobs based on age. - This enters the job into our job queue and also puts it on the new - queue, in order for it to be picked up by the queue processors. + The method will archive all jobs which are older than the age + parameter. For jobs that don't have an end timestamp, the start + timestamp will be considered. The special '-1' age will cause + archival of all jobs (that are not running or queued). - @type ops: list - @param ops: the sequence of opcodes that will become the new job - @type nodes: list - @param nodes: the list of nodes to which the queue should be - distributed + @type age: int + @param age: the minimum age in seconds """ - job = self._jobs.AddJob(ops, nodes) + logging.info("Archiving jobs with age more than %s seconds", age) + + now = time.time() + end_time = now + timeout + archived_count = 0 + last_touched = 0 + + all_job_ids = self._GetJobIDsUnlocked(archived=False) + pending = [] + for idx, job_id in enumerate(all_job_ids): + last_touched = idx + + # Not optimal because jobs could be pending + # TODO: Measure average duration for job archival and take number of + # pending jobs into account. + if time.time() > end_time: + break - # Add to worker pool - self._wpool.AddTask(job) + # Returns None if the job failed to load + job = self._LoadJobUnlocked(job_id) + if job: + if job.end_timestamp is None: + if job.start_timestamp is None: + job_age = job.received_timestamp + else: + job_age = job.start_timestamp + else: + job_age = job.end_timestamp - return job.id + if age == -1 or now - job_age[0] > age: + pending.append(job) - def ArchiveJob(self, job_id): - raise NotImplementedError() + # Archive 10 jobs at a time + if len(pending) >= 10: + archived_count += self._ArchiveJobsUnlocked(pending) + pending = [] - def CancelJob(self, job_id): - raise NotImplementedError() + if pending: + archived_count += self._ArchiveJobsUnlocked(pending) - def _GetJobInfo(self, job, fields): + return (archived_count, len(all_job_ids) - last_touched - 1) + + def _GetJobInfoUnlocked(self, job, fields): + """Returns information about a job. 
+ + @type job: L{_QueuedJob} + @param job: the job which we query + @type fields: list + @param fields: names of fields to return + @rtype: list + @return: list with one element for each field + @raise errors.OpExecError: when an invalid field + has been passed + + """ row = [] for fname in fields: if fname == "id": row.append(job.id) elif fname == "status": - row.append(job.GetStatus()) + row.append(job.CalcStatus()) elif fname == "ops": - row.append([op.GetInput().__getstate__() for op in job._ops]) + row.append([op.input.__getstate__() for op in job.ops]) elif fname == "opresult": - row.append([op.GetResult() for op in job._ops]) + row.append([op.result for op in job.ops]) elif fname == "opstatus": - row.append([op.GetStatus() for op in job._ops]) - elif fname == "ticker": - ji = job.GetRunOpIndex() - if ji < 0: - lmsg = None - else: - lmsg = job._ops[ji].RetrieveLog(-1) - # message might be empty here - if lmsg: - lmsg = lmsg[0] - else: - lmsg = None - row.append(lmsg) + row.append([op.status for op in job.ops]) + elif fname == "oplog": + row.append([op.log for op in job.ops]) + elif fname == "opstart": + row.append([op.start_timestamp for op in job.ops]) + elif fname == "opend": + row.append([op.end_timestamp for op in job.ops]) + elif fname == "received_ts": + row.append(job.received_timestamp) + elif fname == "start_ts": + row.append(job.start_timestamp) + elif fname == "end_ts": + row.append(job.end_timestamp) + elif fname == "summary": + row.append([op.input.Summary() for op in job.ops]) else: raise errors.OpExecError("Invalid job query field '%s'" % fname) return row + @utils.LockedMethod + @_RequireOpenQueue def QueryJobs(self, job_ids, fields): """Returns a list of jobs in queue. - Args: - - job_ids: Sequence of job identifiers or None for all - - fields: Names of fields to return + This is a wrapper of L{_GetJobsUnlocked}, which actually does the + processing for each job. + + @type job_ids: list + @param job_ids: sequence of job identifiers or None for all + @type fields: list + @param fields: names of fields to return + @rtype: list + @return: list one element per job, each element being list with + the requested fields """ - self._lock.acquire() - try: - jobs = [] + jobs = [] - for job in self._jobs.GetJobs(job_ids): - if job is None: - jobs.append(None) - else: - jobs.append(self._GetJobInfo(job, fields)) + for job in self._GetJobsUnlocked(job_ids): + if job is None: + jobs.append(None) + else: + jobs.append(self._GetJobInfoUnlocked(job, fields)) - return jobs - finally: - self._lock.release() + return jobs @utils.LockedMethod + @_RequireOpenQueue def Shutdown(self): """Stops the job queue. + This shutdowns all the worker threads an closes the queue. + """ self._wpool.TerminateWorkers() - self._jobs.Close() + + self._queue_lock.Close() + self._queue_lock = None
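
The patch above gives L{JobQueue} a small public surface: SubmitJob(), QueryJobs(), WaitForJobChanges(), CancelJob(), ArchiveJob(), AutoArchiveJobs() and Shutdown(). The sketch below shows one plausible way an in-process caller could submit a job and poll it to completion using only those methods. It is a sketch, not part of the patch: the `context` object (a GanetiContext, normally owned by the master daemon) and the non-empty list of opcodes.OpCode instances are assumptions supplied by the caller.

from ganeti import constants
from ganeti import jqueue


def SubmitAndWait(context, ops):
  """Submit `ops` as a single job and poll the queue until it finishes.

  Sketch only: `context` is assumed to be a GanetiContext and `ops` a
  non-empty list of opcodes.OpCode instances built by the caller.

  """
  queue = jqueue.JobQueue(context)
  try:
    job_id = queue.SubmitJob(ops)

    prev_info = None
    prev_log_serial = 0

    while True:
      # WaitForJobChanges returns constants.JOB_NOTCHANGED on timeout,
      # otherwise a (job_info, log_entries) tuple for the requested fields
      result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
                                       prev_log_serial, 10.0)
      if result == constants.JOB_NOTCHANGED:
        continue

      prev_info, log_entries = result

      # Log entries are (serial, timestamp, type, message) tuples; remember
      # the highest serial so already-seen messages are not returned again
      for entry in log_entries:
        prev_log_serial = max(prev_log_serial, entry[0])

      status = prev_info[0]
      if status in (constants.JOB_STATUS_SUCCESS,
                    constants.JOB_STATUS_ERROR,
                    constants.JOB_STATUS_CANCELED):
        return status
  finally:
    queue.Shutdown()

The polling loop mirrors the contract documented on WaitForJobChanges: the caller passes back the last job information and the highest log serial it has seen, and the queue only returns once something differs or the timeout expires.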