X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/8b537bb0fd52a71c6f260f1f8ca0549423aa3d82..9f2265bce8fe16a79ad78098a7681d69f6730277:/lib/jstore.py diff --git a/lib/jstore.py b/lib/jstore.py index 5065e86..320a034 100644 --- a/lib/jstore.py +++ b/lib/jstore.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007 Google Inc. +# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -21,14 +21,17 @@ """Module implementing the job queue handling.""" -import os -import logging import errno -import re +import os from ganeti import constants from ganeti import errors +from ganeti import runtime from ganeti import utils +from ganeti import pathutils + + +JOBS_PER_ARCHIVE_DIRECTORY = 10000 def _ReadNumericFile(file_name): @@ -39,17 +42,12 @@ def _ReadNumericFile(file_name): """ try: - fd = open(file_name, "r") - except IOError, err: + return int(utils.ReadFile(file_name)) + except EnvironmentError, err: if err.errno in (errno.ENOENT, ): return None raise - try: - return int(fd.read(128)) - finally: - fd.close() - def ReadSerial(): """Read the serial file. @@ -57,7 +55,7 @@ def ReadSerial(): The queue should be locked while this function is called. """ - return _ReadNumericFile(constants.JOB_QUEUE_SERIAL_FILE) + return _ReadNumericFile(pathutils.JOB_QUEUE_SERIAL_FILE) def ReadVersion(): @@ -66,70 +64,153 @@ def ReadVersion(): The queue should be locked while this function is called. """ - return _ReadNumericFile(constants.JOB_QUEUE_VERSION_FILE) + return _ReadNumericFile(pathutils.JOB_QUEUE_VERSION_FILE) -def InitAndVerifyQueue(exclusive): +def InitAndVerifyQueue(must_lock): """Open and lock job queue. If necessary, the queue is automatically initialized. - @type exclusive: bool - @param exclusive: Whether to lock the queue in exclusive mode. Shared - mode otherwise. + @type must_lock: bool + @param must_lock: Whether an exclusive lock must be held. @rtype: utils.FileLock @return: Lock object for the queue. This can be used to change the locking mode. """ - # Make sure our directories exists - for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR): - try: - os.mkdir(path, 0700) - except OSError, err: - if err.errno not in (errno.EEXIST, ): - raise + getents = runtime.GetEnts() # Lock queue - queue_lock = utils.FileLock(constants.JOB_QUEUE_LOCK_FILE) + queue_lock = utils.FileLock.Open(pathutils.JOB_QUEUE_LOCK_FILE) try: - # Determine locking function and call it - if exclusive: - fn = queue_lock.Exclusive + # The queue needs to be locked in exclusive mode to write to the serial and + # version files. + if must_lock: + queue_lock.Exclusive(blocking=True) + holding_lock = True else: - fn = queue_lock.Shared - - fn(blocking=False) - - # Verify version - version = ReadVersion() - if version is None: - # Write new version file - utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE, - data="%s\n" % constants.JOB_QUEUE_VERSION) - - # Read again + try: + queue_lock.Exclusive(blocking=False) + holding_lock = True + except errors.LockError: + # Ignore errors and assume the process keeping the lock checked + # everything. + holding_lock = False + + if holding_lock: + # Verify version version = ReadVersion() + if version is None: + # Write new version file + utils.WriteFile(pathutils.JOB_QUEUE_VERSION_FILE, + uid=getents.masterd_uid, gid=getents.masterd_gid, + data="%s\n" % constants.JOB_QUEUE_VERSION) - if version != constants.JOB_QUEUE_VERSION: - raise errors.JobQueueError("Found job queue version %s, expected %s", - version, constants.JOB_QUEUE_VERSION) + # Read again + version = ReadVersion() - serial = ReadSerial() - if serial is None: - # Write new serial file - utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE, - data="%s\n" % 0) + if version != constants.JOB_QUEUE_VERSION: + raise errors.JobQueueError("Found job queue version %s, expected %s", + version, constants.JOB_QUEUE_VERSION) - # Read again serial = ReadSerial() + if serial is None: + # Write new serial file + utils.WriteFile(pathutils.JOB_QUEUE_SERIAL_FILE, + uid=getents.masterd_uid, gid=getents.masterd_gid, + data="%s\n" % 0) + + # Read again + serial = ReadSerial() + + if serial is None: + # There must be a serious problem + raise errors.JobQueueError("Can't read/parse the job queue" + " serial file") - if serial is None: - # There must be a serious problem - raise errors.JobQueueError("Can't read/parse the job queue serial file") + if not must_lock: + # There's no need for more error handling. Closing the lock + # file below in case of an error will unlock it anyway. + queue_lock.Unlock() except: queue_lock.Close() raise return queue_lock + + +def CheckDrainFlag(): + """Check if the queue is marked to be drained. + + This currently uses the queue drain file, which makes it a per-node flag. + In the future this can be moved to the config file. + + @rtype: boolean + @return: True if the job queue is marked drained + + """ + return os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE) + + +def SetDrainFlag(drain_flag): + """Sets the drain flag for the queue. + + @type drain_flag: boolean + @param drain_flag: Whether to set or unset the drain flag + @attention: This function should only called the current holder of the queue + lock + + """ + getents = runtime.GetEnts() + + if drain_flag: + utils.WriteFile(pathutils.JOB_QUEUE_DRAIN_FILE, data="", + uid=getents.masterd_uid, gid=getents.masterd_gid) + else: + utils.RemoveFile(pathutils.JOB_QUEUE_DRAIN_FILE) + + assert (not drain_flag) ^ CheckDrainFlag() + + +def FormatJobID(job_id): + """Convert a job ID to int format. + + Currently this just is a no-op that performs some checks, but if we + want to change the job id format this will abstract this change. + + @type job_id: int or long + @param job_id: the numeric job id + @rtype: int + @return: the formatted job id + + """ + if not isinstance(job_id, (int, long)): + raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id) + if job_id < 0: + raise errors.ProgrammerError("Job ID %s is negative" % job_id) + + return job_id + + +def GetArchiveDirectory(job_id): + """Returns the archive directory for a job. + + @type job_id: str + @param job_id: Job identifier + @rtype: str + @return: Directory name + + """ + return str(ParseJobId(job_id) / JOBS_PER_ARCHIVE_DIRECTORY) + + +def ParseJobId(job_id): + """Parses a job ID and converts it to integer. + + """ + try: + return int(job_id) + except (ValueError, TypeError): + raise errors.ParameterError("Invalid job ID '%s'" % job_id)