#
#
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
"""Module implementing the job queue handling."""
-import os
-import logging
import errno
-import re
+import os
from ganeti import constants
from ganeti import errors
+from ganeti import runtime
from ganeti import utils
+from ganeti import pathutils
+
+
+# Number of consecutive job IDs grouped into one archive subdirectory;
+# GetArchiveDirectory divides a job ID by this value to name the directory.
+JOBS_PER_ARCHIVE_DIRECTORY = 10000
def _ReadNumericFile(file_name):
  """Reads a file whose whole content is a single integer.

  @type file_name: string
  @param file_name: Path of the file to read
  @rtype: int or None
  @return: The number stored in the file, or C{None} if the file does not
    exist
  @raise errors.JobQueueError: if the file content cannot be converted to
    an integer

  """
  try:
    contents = utils.ReadFile(file_name)
  except EnvironmentError as err:
    # A missing file is reported as None; any other I/O error propagates
    if err.errno == errno.ENOENT:
      return None
    raise

  try:
    number = int(contents)
  except (ValueError, TypeError) as err:
    # Couldn't convert to int
    raise errors.JobQueueError("Content of file '%s' is not numeric: %s" %
                               (file_name, err))

  return number
def ReadSerial():
  """Reads the job queue serial number from its file.

  The queue should be locked while this function is called.

  @rtype: int or None
  @return: the stored serial number, or C{None} if the serial file does
    not exist

  """
  serial_file = pathutils.JOB_QUEUE_SERIAL_FILE
  return _ReadNumericFile(serial_file)
def ReadVersion():
  """Reads the job queue format version from its file.

  The queue should be locked while this function is called.

  @rtype: int or None
  @return: the stored version number, or C{None} if the version file does
    not exist

  """
  version_file = pathutils.JOB_QUEUE_VERSION_FILE
  return _ReadNumericFile(version_file)
def InitAndVerifyQueue(must_lock):
  """Open and lock job queue.

  If necessary, the queue is automatically initialized: a missing version
  file or serial file is created (owned by the master daemon user and the
  daemons group) before being verified.

  @type must_lock: bool
  @param must_lock: Whether an exclusive lock must be held. If true, the
    exclusive lock is acquired blocking and kept. If false, the exclusive
    lock is only tried without blocking; when it is obtained, the queue is
    verified and the lock released again, otherwise verification is skipped
    on the assumption that the lock holder already did it.
  @rtype: utils.FileLock
  @return: Lock object for the queue. This can be used to change the
    locking mode.
  @raise errors.JobQueueError: if the on-disk queue version differs from
    C{constants.JOB_QUEUE_VERSION} or the serial file cannot be read/parsed

  """
  getents = runtime.GetEnts()

  # Lock queue
  queue_lock = utils.FileLock.Open(pathutils.JOB_QUEUE_LOCK_FILE)
  try:
    # The queue needs to be locked in exclusive mode to write to the serial and
    # version files.
    if must_lock:
      queue_lock.Exclusive(blocking=True)
      holding_lock = True
    else:
      try:
        queue_lock.Exclusive(blocking=False)
        holding_lock = True
      except errors.LockError:
        # Ignore errors and assume the process keeping the lock checked
        # everything.
        holding_lock = False

    if holding_lock:
      # Verify version
      version = ReadVersion()
      if version is None:
        # Write new version file
        utils.WriteFile(pathutils.JOB_QUEUE_VERSION_FILE,
                        uid=getents.masterd_uid, gid=getents.daemons_gid,
                        mode=constants.JOB_QUEUE_FILES_PERMS,
                        data="%s\n" % constants.JOB_QUEUE_VERSION)

        # Read again
        version = ReadVersion()

      if version != constants.JOB_QUEUE_VERSION:
        # Format the message here; passing the values as extra exception
        # arguments would leave the "%s" placeholders uninterpolated
        raise errors.JobQueueError("Found job queue version %s, expected %s" %
                                   (version, constants.JOB_QUEUE_VERSION))

      serial = ReadSerial()
      if serial is None:
        # Write new serial file
        utils.WriteFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                        uid=getents.masterd_uid, gid=getents.daemons_gid,
                        mode=constants.JOB_QUEUE_FILES_PERMS,
                        data="%s\n" % 0)

        # Read again
        serial = ReadSerial()

      if serial is None:
        # There must be a serious problem
        raise errors.JobQueueError("Can't read/parse the job queue"
                                   " serial file")

      if not must_lock:
        # There's no need for more error handling. Closing the lock
        # file below in case of an error will unlock it anyway.
        queue_lock.Unlock()
  except:
    # Deliberately broad: whatever failed, the lock file must not leak. The
    # original exception is re-raised unchanged.
    queue_lock.Close()
    raise

  return queue_lock
+
+
def CheckDrainFlag():
  """Tells whether the job queue is marked as drained.

  The flag is simply the existence of the queue drain file, which makes it
  a per-node setting. In the future this can be moved to the config file.

  @rtype: boolean
  @return: C{True} if the drain file exists, C{False} otherwise

  """
  drain_file = pathutils.JOB_QUEUE_DRAIN_FILE
  return os.path.exists(drain_file)
+
+
def SetDrainFlag(drain_flag):
  """Marks or unmarks the job queue as drained.

  Marking creates an empty drain file owned by the master daemon user and
  the daemons group with the job queue file permissions; unmarking removes
  that file.

  @type drain_flag: boolean
  @param drain_flag: C{True} to set the drain flag, C{False} to clear it
  @attention: This function should only be called by the current holder of
    the queue lock

  """
  ents = runtime.GetEnts()
  drain_file = pathutils.JOB_QUEUE_DRAIN_FILE

  if not drain_flag:
    utils.RemoveFile(drain_file)
  else:
    utils.WriteFile(drain_file, data="",
                    uid=ents.masterd_uid, gid=ents.daemons_gid,
                    mode=constants.JOB_QUEUE_FILES_PERMS)

  # Sanity check: the on-disk state must now match the requested flag
  assert bool(drain_flag) == CheckDrainFlag()
+
+
def FormatJobID(job_id):
  """Validates a numeric job ID and returns it in canonical form.

  At the moment the canonical form is the integer itself, so this only
  checks the value; routing all formatting through here keeps a future
  change of the job ID format in one place.

  @type job_id: int or long
  @param job_id: the numeric job id
  @rtype: int
  @return: the formatted job id
  @raise errors.ProgrammerError: if C{job_id} is not an integer or is
    negative

  """
  if isinstance(job_id, (int, long)):
    if job_id >= 0:
      return job_id
    raise errors.ProgrammerError("Job ID %s is negative" % job_id)

  raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
+
+
def GetArchiveDirectory(job_id):
  """Returns the archive directory for a job.

  Jobs are grouped into subdirectories of C{JOBS_PER_ARCHIVE_DIRECTORY}
  consecutive IDs each. With 10000 jobs per directory, job 12345 is
  archived under directory "1".

  @type job_id: str or int
  @param job_id: Job identifier, in any form accepted by L{ParseJobId}
  @rtype: str
  @return: Directory name
  @raise errors.ParameterError: if C{job_id} is not a valid job ID

  """
  # Explicit floor division: plain "/" only truncates under Python 2 integer
  # semantics and would produce names like "1.2345" under true division
  return str(ParseJobId(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)
+
+
def ParseJobId(job_id):
  """Converts a job ID, given e.g. as a string, to an integer.

  @type job_id: str or int
  @param job_id: Job identifier
  @rtype: int
  @return: The job ID as an integer
  @raise errors.ParameterError: if C{job_id} cannot be converted with
    C{int()}

  """
  try:
    parsed_id = int(job_id)
  except (TypeError, ValueError):
    raise errors.ParameterError("Invalid job ID '%s'" % job_id)

  return parsed_id