from ganeti import ht
from ganeti import query
from ganeti import qlang
+from ganeti import pathutils
JOBQUEUE_THREADS = 25
files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
# Upload current serial file
- files.append(constants.JOB_QUEUE_SERIAL_FILE)
+ files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
# Static address list
addrs = [node.primary_ip]
serial = self._last_serial + count
# Write to file
- self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
+ self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
"%s\n" % serial, True)
result = [jstore.FormatJobID(v)
@return: the path to the job file
"""
- return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
+ return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
@staticmethod
def _GetArchivedJobPath(job_id):
@return: the path to the archived job file
"""
- return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
+ return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
jstore.GetArchiveDirectory(job_id),
"job-%s" % job_id)
"""
jlist = []
- for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
+ for filename in utils.ListVisibleFiles(pathutils.QUEUE_DIR):
m = constants.JOB_FILE_RE.match(filename)
if m:
jlist.append(int(m.group(1)))
from ganeti import errors
from ganeti import runtime
from ganeti import utils
+from ganeti import pathutils
JOBS_PER_ARCHIVE_DIRECTORY = 10000
The queue should be locked while this function is called.
"""
- return _ReadNumericFile(constants.JOB_QUEUE_SERIAL_FILE)
+ return _ReadNumericFile(pathutils.JOB_QUEUE_SERIAL_FILE)
def ReadVersion():
The queue should be locked while this function is called.
"""
- return _ReadNumericFile(constants.JOB_QUEUE_VERSION_FILE)
+ return _ReadNumericFile(pathutils.JOB_QUEUE_VERSION_FILE)
def InitAndVerifyQueue(must_lock):
getents = runtime.GetEnts()
# Lock queue
- queue_lock = utils.FileLock.Open(constants.JOB_QUEUE_LOCK_FILE)
+ queue_lock = utils.FileLock.Open(pathutils.JOB_QUEUE_LOCK_FILE)
try:
# The queue needs to be locked in exclusive mode to write to the serial and
# version files.
version = ReadVersion()
if version is None:
# Write new version file
- utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
+ utils.WriteFile(pathutils.JOB_QUEUE_VERSION_FILE,
uid=getents.masterd_uid, gid=getents.masterd_gid,
data="%s\n" % constants.JOB_QUEUE_VERSION)
serial = ReadSerial()
if serial is None:
# Write new serial file
- utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
+ utils.WriteFile(pathutils.JOB_QUEUE_SERIAL_FILE,
uid=getents.masterd_uid, gid=getents.masterd_gid,
data="%s\n" % 0)
@return: True if the job queue is marked drained
"""
- return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
+ return os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
def SetDrainFlag(drain_flag):
getents = runtime.GetEnts()
if drain_flag:
- utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="",
+ utils.WriteFile(pathutils.JOB_QUEUE_DRAIN_FILE, data="",
uid=getents.masterd_uid, gid=getents.masterd_gid)
else:
- utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
+ utils.RemoveFile(pathutils.JOB_QUEUE_DRAIN_FILE)
assert (not drain_flag) ^ CheckDrainFlag()