X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/6358dbc27205e03e749eb66c87e5c92c4b3beb11..47387ccb5b333102e8a36f52a20870d2dca0b71a:/lib/jstore.py diff --git a/lib/jstore.py b/lib/jstore.py index f61a79c..324f91e 100644 --- a/lib/jstore.py +++ b/lib/jstore.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007 Google Inc. +# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -22,10 +22,16 @@ """Module implementing the job queue handling.""" import errno +import os from ganeti import constants from ganeti import errors +from ganeti import runtime from ganeti import utils +from ganeti import pathutils + + +JOBS_PER_ARCHIVE_DIRECTORY = 10000 def _ReadNumericFile(file_name): @@ -36,12 +42,19 @@ def _ReadNumericFile(file_name): """ try: - return int(utils.ReadFile(file_name)) + contents = utils.ReadFile(file_name) except EnvironmentError, err: if err.errno in (errno.ENOENT, ): return None raise + try: + return int(contents) + except (ValueError, TypeError), err: + # Couldn't convert to int + raise errors.JobQueueError("Content of file '%s' is not numeric: %s" % + (file_name, err)) + def ReadSerial(): """Read the serial file. @@ -49,7 +62,7 @@ def ReadSerial(): The queue should be locked while this function is called. """ - return _ReadNumericFile(constants.JOB_QUEUE_SERIAL_FILE) + return _ReadNumericFile(pathutils.JOB_QUEUE_SERIAL_FILE) def ReadVersion(): @@ -58,7 +71,7 @@ def ReadVersion(): The queue should be locked while this function is called. """ - return _ReadNumericFile(constants.JOB_QUEUE_VERSION_FILE) + return _ReadNumericFile(pathutils.JOB_QUEUE_VERSION_FILE) def InitAndVerifyQueue(must_lock): @@ -73,11 +86,10 @@ def InitAndVerifyQueue(must_lock): locking mode. """ - dirs = [(d, constants.JOB_QUEUE_DIRS_MODE) for d in constants.JOB_QUEUE_DIRS] - utils.EnsureDirs(dirs) + getents = runtime.GetEnts() # Lock queue - queue_lock = utils.FileLock.Open(constants.JOB_QUEUE_LOCK_FILE) + queue_lock = utils.FileLock.Open(pathutils.JOB_QUEUE_LOCK_FILE) try: # The queue needs to be locked in exclusive mode to write to the serial and # version files. @@ -98,7 +110,9 @@ def InitAndVerifyQueue(must_lock): version = ReadVersion() if version is None: # Write new version file - utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE, + utils.WriteFile(pathutils.JOB_QUEUE_VERSION_FILE, + uid=getents.masterd_uid, gid=getents.daemons_gid, + mode=constants.JOB_QUEUE_FILES_PERMS, data="%s\n" % constants.JOB_QUEUE_VERSION) # Read again @@ -111,7 +125,9 @@ def InitAndVerifyQueue(must_lock): serial = ReadSerial() if serial is None: # Write new serial file - utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE, + utils.WriteFile(pathutils.JOB_QUEUE_SERIAL_FILE, + uid=getents.masterd_uid, gid=getents.daemons_gid, + mode=constants.JOB_QUEUE_FILES_PERMS, data="%s\n" % 0) # Read again @@ -132,3 +148,79 @@ def InitAndVerifyQueue(must_lock): raise return queue_lock + + +def CheckDrainFlag(): + """Check if the queue is marked to be drained. + + This currently uses the queue drain file, which makes it a per-node flag. + In the future this can be moved to the config file. + + @rtype: boolean + @return: True if the job queue is marked drained + + """ + return os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE) + + +def SetDrainFlag(drain_flag): + """Sets the drain flag for the queue. + + @type drain_flag: boolean + @param drain_flag: Whether to set or unset the drain flag + @attention: This function should only called the current holder of the queue + lock + + """ + getents = runtime.GetEnts() + + if drain_flag: + utils.WriteFile(pathutils.JOB_QUEUE_DRAIN_FILE, data="", + uid=getents.masterd_uid, gid=getents.daemons_gid, + mode=constants.JOB_QUEUE_FILES_PERMS) + else: + utils.RemoveFile(pathutils.JOB_QUEUE_DRAIN_FILE) + + assert (not drain_flag) ^ CheckDrainFlag() + + +def FormatJobID(job_id): + """Convert a job ID to int format. + + Currently this just is a no-op that performs some checks, but if we + want to change the job id format this will abstract this change. + + @type job_id: int or long + @param job_id: the numeric job id + @rtype: int + @return: the formatted job id + + """ + if not isinstance(job_id, (int, long)): + raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id) + if job_id < 0: + raise errors.ProgrammerError("Job ID %s is negative" % job_id) + + return job_id + + +def GetArchiveDirectory(job_id): + """Returns the archive directory for a job. + + @type job_id: str + @param job_id: Job identifier + @rtype: str + @return: Directory name + + """ + return str(ParseJobId(job_id) / JOBS_PER_ARCHIVE_DIRECTORY) + + +def ParseJobId(job_id): + """Parses a job ID and converts it to integer. + + """ + try: + return int(job_id) + except (ValueError, TypeError): + raise errors.ParameterError("Invalid job ID '%s'" % job_id)