4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the job queue handling."""
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import runtime
30 from ganeti import utils
# Divisor applied to a parsed job ID by GetArchiveDirectory() to derive the
# name of the archive directory for that job.
JOBS_PER_ARCHIVE_DIRECTORY = 10000
def _ReadNumericFile(file_name):
  """Reads a file containing a number.

  The file content is read through C{utils.ReadFile} and converted with
  C{int()}.

  @param file_name: Name of the file to read
  @rtype: None or int
  @return: None if file is not found, otherwise number
  @raise EnvironmentError: If reading fails for any reason other than the
    file not existing

  """
  try:
    return int(utils.ReadFile(file_name))
  except EnvironmentError as err:
    # A missing file is the only expected failure; every other error is
    # passed on to the caller unchanged.
    if err.errno == errno.ENOENT:
      return None
    raise
def ReadSerial():
  """Read the serial file.

  The queue should be locked while this function is called.

  @return: Result of L{_ReadNumericFile} for the job queue serial file

  """
  serial_file = constants.JOB_QUEUE_SERIAL_FILE
  return _ReadNumericFile(serial_file)
def ReadVersion():
  """Read the queue version.

  The queue should be locked while this function is called.

  @return: Result of L{_ReadNumericFile} for the job queue version file

  """
  version_file = constants.JOB_QUEUE_VERSION_FILE
  return _ReadNumericFile(version_file)
def InitAndVerifyQueue(must_lock):
  """Open and lock job queue.

  If necessary, the queue is automatically initialized: a missing version
  or serial file is written while the exclusive lock is held.

  @type must_lock: bool
  @param must_lock: Whether an exclusive lock must be held.
  @rtype: utils.FileLock
  @return: Lock object for the queue. It is still locked exclusively if
    C{must_lock} was set and has been unlocked again otherwise.
  @raise errors.JobQueueError: If the version on disk differs from
    C{constants.JOB_QUEUE_VERSION} or the serial file can't be read

  """
  getents = runtime.GetEnts()

  # Lock queue
  queue_lock = utils.FileLock.Open(constants.JOB_QUEUE_LOCK_FILE)
  try:
    # The queue needs to be locked in exclusive mode to write to the serial and
    # version files.
    if must_lock:
      queue_lock.Exclusive(blocking=True)
      holding_lock = True
    else:
      try:
        queue_lock.Exclusive(blocking=False)
        holding_lock = True
      except errors.LockError:
        # Ignore errors and assume the process keeping the lock checked
        # everything.
        holding_lock = False

    if holding_lock:
      # Verify version
      version = ReadVersion()
      if version is None:
        # Write new version file
        utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
                        uid=getents.masterd_uid, gid=getents.masterd_gid,
                        data="%s\n" % constants.JOB_QUEUE_VERSION)

        # Read again
        version = ReadVersion()

      if version != constants.JOB_QUEUE_VERSION:
        # The values must be interpolated into the message; passing them as
        # extra exception arguments leaves the "%s" placeholders unformatted.
        raise errors.JobQueueError("Found job queue version %s, expected %s" %
                                   (version, constants.JOB_QUEUE_VERSION))

      serial = ReadSerial()
      if serial is None:
        # Write new serial file
        # NOTE(review): an initial serial of 0 is assumed here -- confirm
        utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                        uid=getents.masterd_uid, gid=getents.masterd_gid,
                        data="%s\n" % 0)

        # Read again
        serial = ReadSerial()

      if serial is None:
        # There must be a serious problem
        raise errors.JobQueueError("Can't read/parse the job queue"
                                   " serial file")

      if not must_lock:
        # There's no need for more error handling. Closing the lock
        # file below in case of an error will unlock it anyway.
        queue_lock.Unlock()

  except:
    # Deliberately catches everything: the lock file must not stay open on
    # any failure, and the exception is re-raised untouched.
    queue_lock.Close()
    raise

  return queue_lock
def CheckDrainFlag():
  """Check if the queue is marked to be drained.

  The check is based on the existence of the queue drain file, which makes
  this a per-node flag. In the future this can be moved to the config file.

  @rtype: boolean
  @return: True if the job queue is marked drained

  """
  drain_file = constants.JOB_QUEUE_DRAIN_FILE
  return os.path.exists(drain_file)
def SetDrainFlag(drain_flag):
  """Sets the drain flag for the queue.

  Setting the flag writes an empty drain file owned by the masterd
  user/group; unsetting it removes that file.

  @type drain_flag: boolean
  @param drain_flag: Whether to set or unset the drain flag
  @attention: This function should only be called by the current holder of
    the queue lock

  """
  ents = runtime.GetEnts()

  if not drain_flag:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
  else:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="",
                    uid=ents.masterd_uid, gid=ents.masterd_gid)

  # Sanity check: the file's presence must now mirror the requested state
  assert bool(drain_flag) == CheckDrainFlag()
def FormatJobID(job_id):
  """Convert a job ID to int format.

  Currently this just is a no-op that performs some checks, but if we
  want to change the job id format this will abstract this change.

  @type job_id: int or long
  @param job_id: the numeric job id
  @rtype: int or long
  @return: the formatted job id (currently C{job_id} itself)
  @raise errors.ProgrammerError: If the job ID is not an integer or is
    negative

  """
  try:
    integer_types = (int, long)
  except NameError:
    # Interpreters without a separate "long" type (Python 3) only have the
    # unbounded "int".
    integer_types = (int, )

  if not isinstance(job_id, integer_types):
    raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
  if job_id < 0:
    raise errors.ProgrammerError("Job ID %s is negative" % job_id)

  return job_id
def GetArchiveDirectory(job_id):
  """Returns the archive directory for a job.

  The directory name is the job ID, as parsed by L{ParseJobId}, divided by
  L{JOBS_PER_ARCHIVE_DIRECTORY} and rounded down.

  @param job_id: Job identifier
  @rtype: str
  @return: Directory name

  """
  # Explicit floor division: "/" would yield a float (and hence a name such
  # as "1.2345") wherever true division is in effect.
  return str(ParseJobId(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)
def ParseJobId(job_id):
  """Parses a job ID and converts it to integer.

  @param job_id: Value to convert with C{int()}
  @return: The job ID as an integer
  @raise errors.ParameterError: If C{int()} rejects the value with a
    C{ValueError} or C{TypeError}

  """
  try:
    parsed = int(job_id)
  except (ValueError, TypeError):
    raise errors.ParameterError("Invalid job ID '%s'" % job_id)
  else:
    return parsed