"""Module implementing the job queue handling."""
import errno
+import os
from ganeti import constants
from ganeti import errors
+from ganeti import runtime
from ganeti import utils
locking mode.
"""
- dirs = [(d, constants.JOB_QUEUE_DIRS_MODE) for d in constants.JOB_QUEUE_DIRS]
- utils.EnsureDirs(dirs)
+ getents = runtime.GetEnts()
# Lock queue
queue_lock = utils.FileLock.Open(constants.JOB_QUEUE_LOCK_FILE)
if version is None:
# Write new version file
utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
+ uid=getents.masterd_uid, gid=getents.masterd_gid,
data="%s\n" % constants.JOB_QUEUE_VERSION)
# Read again
if serial is None:
# Write new serial file
utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
+ uid=getents.masterd_uid, gid=getents.masterd_gid,
data="%s\n" % 0)
# Read again
raise
return queue_lock
+
+
+def CheckDrainFlag():
+ """Check if the queue is marked to be drained.
+
+ This currently uses the queue drain file, which makes it a per-node flag.
+ In the future this can be moved to the config file.
+
+ @rtype: boolean
+ @return: True if the job queue is marked drained
+
+ """
+ return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
+
+
+def SetDrainFlag(drain_flag):
+ """Sets the drain flag for the queue.
+
+ @type drain_flag: boolean
+ @param drain_flag: Whether to set or unset the drain flag
+ @attention: This function should only called the current holder of the queue
+ lock
+
+ """
+ getents = runtime.GetEnts()
+
+ if drain_flag:
+ utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="",
+ uid=getents.masterd_uid, gid=getents.masterd_gid)
+ else:
+ utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
+
+ assert (not drain_flag) ^ CheckDrainFlag()