4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the job queue handling."""
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import runtime
30 from ganeti import utils
31 from ganeti import pathutils
34 JOBS_PER_ARCHIVE_DIRECTORY = constants.JSTORE_JOBS_PER_ARCHIVE_DIRECTORY
37 def _ReadNumericFile(file_name):
38 """Reads a file containing a number.
41 @return: None if file is not found, otherwise number
45 contents = utils.ReadFile(file_name)
46 except EnvironmentError, err:
47 if err.errno in (errno.ENOENT, ):
53 except (ValueError, TypeError), err:
54 # Couldn't convert to int
55 raise errors.JobQueueError("Content of file '%s' is not numeric: %s" %
60 """Read the serial file.
62 The queue should be locked while this function is called.
65 return _ReadNumericFile(pathutils.JOB_QUEUE_SERIAL_FILE)
69 """Read the queue version.
71 The queue should be locked while this function is called.
74 return _ReadNumericFile(pathutils.JOB_QUEUE_VERSION_FILE)
77 def InitAndVerifyQueue(must_lock):
78 """Open and lock job queue.
80 If necessary, the queue is automatically initialized.
83 @param must_lock: Whether an exclusive lock must be held.
84 @rtype: utils.FileLock
85 @return: Lock object for the queue. This can be used to change the
89 getents = runtime.GetEnts()
92 queue_lock = utils.FileLock.Open(pathutils.JOB_QUEUE_LOCK_FILE)
94 # The queue needs to be locked in exclusive mode to write to the serial and
97 queue_lock.Exclusive(blocking=True)
101 queue_lock.Exclusive(blocking=False)
103 except errors.LockError:
104 # Ignore errors and assume the process keeping the lock checked
110 version = ReadVersion()
112 # Write new version file
113 utils.WriteFile(pathutils.JOB_QUEUE_VERSION_FILE,
114 uid=getents.masterd_uid, gid=getents.daemons_gid,
115 mode=constants.JOB_QUEUE_FILES_PERMS,
116 data="%s\n" % constants.JOB_QUEUE_VERSION)
119 version = ReadVersion()
121 if version != constants.JOB_QUEUE_VERSION:
122 raise errors.JobQueueError("Found job queue version %s, expected %s",
123 version, constants.JOB_QUEUE_VERSION)
125 serial = ReadSerial()
127 # Write new serial file
128 utils.WriteFile(pathutils.JOB_QUEUE_SERIAL_FILE,
129 uid=getents.masterd_uid, gid=getents.daemons_gid,
130 mode=constants.JOB_QUEUE_FILES_PERMS,
134 serial = ReadSerial()
137 # There must be a serious problem
138 raise errors.JobQueueError("Can't read/parse the job queue"
142 # There's no need for more error handling. Closing the lock
143 # file below in case of an error will unlock it anyway.
153 def CheckDrainFlag():
154 """Check if the queue is marked to be drained.
156 This currently uses the queue drain file, which makes it a per-node flag.
157 In the future this can be moved to the config file.
160 @return: True if the job queue is marked drained
163 return os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
166 def SetDrainFlag(drain_flag):
167 """Sets the drain flag for the queue.
169 @type drain_flag: boolean
170 @param drain_flag: Whether to set or unset the drain flag
171 @attention: This function should only called the current holder of the queue
175 getents = runtime.GetEnts()
178 utils.WriteFile(pathutils.JOB_QUEUE_DRAIN_FILE, data="",
179 uid=getents.masterd_uid, gid=getents.daemons_gid,
180 mode=constants.JOB_QUEUE_FILES_PERMS)
182 utils.RemoveFile(pathutils.JOB_QUEUE_DRAIN_FILE)
184 assert (not drain_flag) ^ CheckDrainFlag()
187 def FormatJobID(job_id):
188 """Convert a job ID to int format.
190 Currently this just is a no-op that performs some checks, but if we
191 want to change the job id format this will abstract this change.
193 @type job_id: int or long
194 @param job_id: the numeric job id
196 @return: the formatted job id
199 if not isinstance(job_id, (int, long)):
200 raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
202 raise errors.ProgrammerError("Job ID %s is negative" % job_id)
207 def GetArchiveDirectory(job_id):
208 """Returns the archive directory for a job.
211 @param job_id: Job identifier
213 @return: Directory name
216 return str(ParseJobId(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
219 def ParseJobId(job_id):
220 """Parses a job ID and converts it to integer.
225 except (ValueError, TypeError):
226 raise errors.ParameterError("Invalid job ID '%s'" % job_id)