4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the job queue handling."""
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import runtime
30 from ganeti import utils
31 from ganeti import pathutils
34 JOBS_PER_ARCHIVE_DIRECTORY = 10000
37 def _ReadNumericFile(file_name):
38 """Reads a file containing a number.
41 @return: None if file is not found, otherwise number
45 contents = utils.ReadFile(file_name)
46 except EnvironmentError, err:
47 if err.errno in (errno.ENOENT, ):
53 except (ValueError, TypeError), err:
54 # Couldn't convert to int
55 raise errors.JobQueueError("Content of file '%s' is not numeric: %s" %
60 """Read the serial file.
62 The queue should be locked while this function is called.
65 return _ReadNumericFile(pathutils.JOB_QUEUE_SERIAL_FILE)
69 """Read the queue version.
71 The queue should be locked while this function is called.
74 return _ReadNumericFile(pathutils.JOB_QUEUE_VERSION_FILE)
77 def InitAndVerifyQueue(must_lock):
78 """Open and lock job queue.
80 If necessary, the queue is automatically initialized.
83 @param must_lock: Whether an exclusive lock must be held.
84 @rtype: utils.FileLock
85 @return: Lock object for the queue. This can be used to change the
89 getents = runtime.GetEnts()
92 queue_lock = utils.FileLock.Open(pathutils.JOB_QUEUE_LOCK_FILE)
94 # The queue needs to be locked in exclusive mode to write to the serial and
97 queue_lock.Exclusive(blocking=True)
101 queue_lock.Exclusive(blocking=False)
103 except errors.LockError:
104 # Ignore errors and assume the process keeping the lock checked
110 version = ReadVersion()
112 # Write new version file
113 utils.WriteFile(pathutils.JOB_QUEUE_VERSION_FILE,
114 uid=getents.masterd_uid, gid=getents.masterd_gid,
115 data="%s\n" % constants.JOB_QUEUE_VERSION)
118 version = ReadVersion()
120 if version != constants.JOB_QUEUE_VERSION:
121 raise errors.JobQueueError("Found job queue version %s, expected %s",
122 version, constants.JOB_QUEUE_VERSION)
124 serial = ReadSerial()
126 # Write new serial file
127 utils.WriteFile(pathutils.JOB_QUEUE_SERIAL_FILE,
128 uid=getents.masterd_uid, gid=getents.masterd_gid,
132 serial = ReadSerial()
135 # There must be a serious problem
136 raise errors.JobQueueError("Can't read/parse the job queue"
140 # There's no need for more error handling. Closing the lock
141 # file below in case of an error will unlock it anyway.
151 def CheckDrainFlag():
152 """Check if the queue is marked to be drained.
154 This currently uses the queue drain file, which makes it a per-node flag.
155 In the future this can be moved to the config file.
158 @return: True if the job queue is marked drained
161 return os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
164 def SetDrainFlag(drain_flag):
165 """Sets the drain flag for the queue.
167 @type drain_flag: boolean
168 @param drain_flag: Whether to set or unset the drain flag
169 @attention: This function should only called the current holder of the queue
173 getents = runtime.GetEnts()
176 utils.WriteFile(pathutils.JOB_QUEUE_DRAIN_FILE, data="",
177 uid=getents.masterd_uid, gid=getents.masterd_gid)
179 utils.RemoveFile(pathutils.JOB_QUEUE_DRAIN_FILE)
181 assert (not drain_flag) ^ CheckDrainFlag()
184 def FormatJobID(job_id):
185 """Convert a job ID to int format.
187 Currently this just is a no-op that performs some checks, but if we
188 want to change the job id format this will abstract this change.
190 @type job_id: int or long
191 @param job_id: the numeric job id
193 @return: the formatted job id
196 if not isinstance(job_id, (int, long)):
197 raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
199 raise errors.ProgrammerError("Job ID %s is negative" % job_id)
204 def GetArchiveDirectory(job_id):
205 """Returns the archive directory for a job.
208 @param job_id: Job identifier
210 @return: Directory name
213 return str(ParseJobId(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
216 def ParseJobId(job_id):
217 """Parses a job ID and converts it to integer.
222 except (ValueError, TypeError):
223 raise errors.ParameterError("Invalid job ID '%s'" % job_id)