4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the job queue handling."""
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import runtime
30 from ganeti import utils
def _ReadNumericFile(file_name):
  """Reads a file containing a number.

  The content is read through L{utils.ReadFile} and converted with C{int}.

  @param file_name: Path of the file to read
  @rtype: None or int
  @return: None if file is not found, otherwise number

  """
  try:
    return int(utils.ReadFile(file_name))
  except EnvironmentError as err:
    # A missing file is an expected condition and is reported as None; any
    # other environment error is passed on to the caller unchanged.
    if err.errno == errno.ENOENT:
      return None
    raise
def ReadSerial():
  """Read the serial file.

  The queue should be locked while this function is called.

  @return: Whatever L{_ReadNumericFile} yields for the serial file, i.e.
    None if the file is not found, otherwise the number it contains

  """
  serial_path = constants.JOB_QUEUE_SERIAL_FILE
  return _ReadNumericFile(serial_path)
def ReadVersion():
  """Read the queue version.

  The queue should be locked while this function is called.

  @return: Whatever L{_ReadNumericFile} yields for the version file, i.e.
    None if the file is not found, otherwise the number it contains

  """
  version_path = constants.JOB_QUEUE_VERSION_FILE
  return _ReadNumericFile(version_path)
def InitAndVerifyQueue(must_lock):
  """Open and lock job queue.

  If necessary, the queue is automatically initialized.

  @type must_lock: bool
  @param must_lock: Whether an exclusive lock must be held.
  @rtype: utils.FileLock
  @return: Lock object for the queue. This can be used to change the
    locking mode.
  @raise errors.JobQueueError: If the version found differs from
    L{constants.JOB_QUEUE_VERSION} or the serial file cannot be read

  """
  # NOTE(review): the reviewed listing omitted several lines of this function
  # (branch headers, the initial serial value, the tail of the serial error
  # message and the lock release/close calls). They are restored here from
  # context -- confirm against the repository copy.
  getents = runtime.GetEnts()

  # Lock queue
  queue_lock = utils.FileLock.Open(constants.JOB_QUEUE_LOCK_FILE)
  try:
    # The queue needs to be locked in exclusive mode to write to the serial and
    # version files.
    if must_lock:
      queue_lock.Exclusive(blocking=True)
      holding_lock = True
    else:
      try:
        queue_lock.Exclusive(blocking=False)
        holding_lock = True
      except errors.LockError:
        # Ignore errors and assume the process keeping the lock checked
        # everything.
        holding_lock = False

    if holding_lock:
      # Verify version
      version = ReadVersion()
      if version is None:
        # Write new version file
        utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
                        uid=getents.masterd_uid, gid=getents.masterd_gid,
                        data="%s\n" % constants.JOB_QUEUE_VERSION)

        # Read again
        version = ReadVersion()

      if version != constants.JOB_QUEUE_VERSION:
        # The values must be interpolated into the message; passing them as
        # separate exception arguments would leave the "%s" placeholders
        # unformatted in the error text.
        raise errors.JobQueueError("Found job queue version %s, expected %s" %
                                   (version, constants.JOB_QUEUE_VERSION))

      serial = ReadSerial()
      if serial is None:
        # Write new serial file
        utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                        uid=getents.masterd_uid, gid=getents.masterd_gid,
                        data="%s\n" % 0)

        # Read again
        serial = ReadSerial()

      if serial is None:
        # There must be a serious problem
        raise errors.JobQueueError("Can't read/parse the job queue"
                                   " serial file")

      if not must_lock:
        # There's no need for more error handling. Closing the lock
        # file below in case of an error will unlock it anyway.
        queue_lock.Unlock()

  except:
    # Deliberately catches everything: the lock file must not stay open when
    # initialization fails, and the original exception is re-raised untouched.
    queue_lock.Close()
    raise

  return queue_lock
def CheckDrainFlag():
  """Tell whether the job queue is marked to be drained.

  The flag is represented by the queue drain file, which makes it a per-node
  flag. In the future this can be moved to the config file.

  @rtype: boolean
  @return: True if the job queue is marked drained

  """
  drain_file = constants.JOB_QUEUE_DRAIN_FILE
  return os.path.exists(drain_file)
def SetDrainFlag(drain_flag):
  """Set or clear the drain flag of the queue.

  @type drain_flag: boolean
  @param drain_flag: Whether to set or unset the drain flag
  @attention: This function should only be called by the current holder of
    the queue lock

  """
  getents = runtime.GetEnts()
  drain_file = constants.JOB_QUEUE_DRAIN_FILE

  if not drain_flag:
    utils.RemoveFile(drain_file)
  else:
    # An empty file is sufficient; only its existence is checked
    utils.WriteFile(drain_file, data="",
                    uid=getents.masterd_uid, gid=getents.masterd_gid)

  # Sanity check: the drain file must exist exactly when the flag was requested
  assert bool(drain_flag) == CheckDrainFlag()