X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/7c4d6c7b8488c263e768c26be110ee45071e6c96..c4929a8bcca4a43dc6434394a91a8ea67d854844:/lib/jstore.py diff --git a/lib/jstore.py b/lib/jstore.py index 5c59968..88b15cc 100644 --- a/lib/jstore.py +++ b/lib/jstore.py @@ -21,11 +21,12 @@ """Module implementing the job queue handling.""" -import os import errno +import os from ganeti import constants from ganeti import errors +from ganeti import runtime from ganeti import utils @@ -37,17 +38,12 @@ def _ReadNumericFile(file_name): """ try: - fd = open(file_name, "r") - except IOError, err: + return int(utils.ReadFile(file_name)) + except EnvironmentError, err: if err.errno in (errno.ENOENT, ): return None raise - try: - return int(fd.read(128)) - finally: - fd.close() - def ReadSerial(): """Read the serial file. @@ -79,16 +75,10 @@ def InitAndVerifyQueue(must_lock): locking mode. """ - # Make sure our directories exists - for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR): - try: - os.mkdir(path, 0700) - except OSError, err: - if err.errno not in (errno.EEXIST, ): - raise + getents = runtime.GetEnts() # Lock queue - queue_lock = utils.FileLock(constants.JOB_QUEUE_LOCK_FILE) + queue_lock = utils.FileLock.Open(constants.JOB_QUEUE_LOCK_FILE) try: # The queue needs to be locked in exclusive mode to write to the serial and # version files. 
@@ -110,6 +100,7 @@ def InitAndVerifyQueue(must_lock): if version is None: # Write new version file utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE, + uid=getents.masterd_uid, gid=getents.masterd_gid, data="%s\n" % constants.JOB_QUEUE_VERSION) # Read again @@ -123,6 +114,7 @@ def InitAndVerifyQueue(must_lock): if serial is None: # Write new serial file utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE, + uid=getents.masterd_uid, gid=getents.masterd_gid, data="%s\n" % 0) # Read again @@ -130,11 +122,12 @@ def InitAndVerifyQueue(must_lock): if serial is None: # There must be a serious problem - raise errors.JobQueueError("Can't read/parse the job queue serial file") + raise errors.JobQueueError("Can't read/parse the job queue" + " serial file") if not must_lock: - # There's no need for more error handling. Closing the lock file below in - # case of an error will unlock it anyway. + # There's no need for more error handling. Closing the lock + # file below in case of an error will unlock it anyway. queue_lock.Unlock() except: @@ -142,3 +135,36 @@ def InitAndVerifyQueue(must_lock): raise return queue_lock + + +def CheckDrainFlag(): + """Check if the queue is marked to be drained. + + This currently uses the queue drain file, which makes it a per-node flag. + In the future this can be moved to the config file. + + @rtype: boolean + @return: True if the job queue is marked drained + + """ + return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE) + + +def SetDrainFlag(drain_flag): + """Sets the drain flag for the queue. + + @type drain_flag: boolean + @param drain_flag: Whether to set or unset the drain flag + @attention: This function should only be called by the current holder of the queue + lock + + """ + getents = runtime.GetEnts() + + if drain_flag: + utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", + uid=getents.masterd_uid, gid=getents.masterd_gid) + else: + utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE) + + assert (not drain_flag) ^ CheckDrainFlag()