(2.11) Make disk.name and disk.uuid available in bdev
[ganeti-local] / lib / jstore.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling."""
23
24 import errno
25 import os
26
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import runtime
30 from ganeti import utils
31 from ganeti import pathutils
32
33
34 JOBS_PER_ARCHIVE_DIRECTORY = constants.JSTORE_JOBS_PER_ARCHIVE_DIRECTORY
35
36
37 def _ReadNumericFile(file_name):
38   """Reads a file containing a number.
39
40   @rtype: None or int
41   @return: None if file is not found, otherwise number
42
43   """
44   try:
45     contents = utils.ReadFile(file_name)
46   except EnvironmentError, err:
47     if err.errno in (errno.ENOENT, ):
48       return None
49     raise
50
51   try:
52     return int(contents)
53   except (ValueError, TypeError), err:
54     # Couldn't convert to int
55     raise errors.JobQueueError("Content of file '%s' is not numeric: %s" %
56                                (file_name, err))
57
58
59 def ReadSerial():
60   """Read the serial file.
61
62   The queue should be locked while this function is called.
63
64   """
65   return _ReadNumericFile(pathutils.JOB_QUEUE_SERIAL_FILE)
66
67
68 def ReadVersion():
69   """Read the queue version.
70
71   The queue should be locked while this function is called.
72
73   """
74   return _ReadNumericFile(pathutils.JOB_QUEUE_VERSION_FILE)
75
76
77 def InitAndVerifyQueue(must_lock):
78   """Open and lock job queue.
79
80   If necessary, the queue is automatically initialized.
81
82   @type must_lock: bool
83   @param must_lock: Whether an exclusive lock must be held.
84   @rtype: utils.FileLock
85   @return: Lock object for the queue. This can be used to change the
86            locking mode.
87
88   """
89   getents = runtime.GetEnts()
90
91   # Lock queue
92   queue_lock = utils.FileLock.Open(pathutils.JOB_QUEUE_LOCK_FILE)
93   try:
94     # The queue needs to be locked in exclusive mode to write to the serial and
95     # version files.
96     if must_lock:
97       queue_lock.Exclusive(blocking=True)
98       holding_lock = True
99     else:
100       try:
101         queue_lock.Exclusive(blocking=False)
102         holding_lock = True
103       except errors.LockError:
104         # Ignore errors and assume the process keeping the lock checked
105         # everything.
106         holding_lock = False
107
108     if holding_lock:
109       # Verify version
110       version = ReadVersion()
111       if version is None:
112         # Write new version file
113         utils.WriteFile(pathutils.JOB_QUEUE_VERSION_FILE,
114                         uid=getents.masterd_uid, gid=getents.daemons_gid,
115                         mode=constants.JOB_QUEUE_FILES_PERMS,
116                         data="%s\n" % constants.JOB_QUEUE_VERSION)
117
118         # Read again
119         version = ReadVersion()
120
121       if version != constants.JOB_QUEUE_VERSION:
122         raise errors.JobQueueError("Found job queue version %s, expected %s",
123                                    version, constants.JOB_QUEUE_VERSION)
124
125       serial = ReadSerial()
126       if serial is None:
127         # Write new serial file
128         utils.WriteFile(pathutils.JOB_QUEUE_SERIAL_FILE,
129                         uid=getents.masterd_uid, gid=getents.daemons_gid,
130                         mode=constants.JOB_QUEUE_FILES_PERMS,
131                         data="%s\n" % 0)
132
133         # Read again
134         serial = ReadSerial()
135
136       if serial is None:
137         # There must be a serious problem
138         raise errors.JobQueueError("Can't read/parse the job queue"
139                                    " serial file")
140
141       if not must_lock:
142         # There's no need for more error handling. Closing the lock
143         # file below in case of an error will unlock it anyway.
144         queue_lock.Unlock()
145
146   except:
147     queue_lock.Close()
148     raise
149
150   return queue_lock
151
152
153 def CheckDrainFlag():
154   """Check if the queue is marked to be drained.
155
156   This currently uses the queue drain file, which makes it a per-node flag.
157   In the future this can be moved to the config file.
158
159   @rtype: boolean
160   @return: True if the job queue is marked drained
161
162   """
163   return os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
164
165
166 def SetDrainFlag(drain_flag):
167   """Sets the drain flag for the queue.
168
169   @type drain_flag: boolean
170   @param drain_flag: Whether to set or unset the drain flag
171   @attention: This function should only called the current holder of the queue
172     lock
173
174   """
175   getents = runtime.GetEnts()
176
177   if drain_flag:
178     utils.WriteFile(pathutils.JOB_QUEUE_DRAIN_FILE, data="",
179                     uid=getents.masterd_uid, gid=getents.daemons_gid,
180                     mode=constants.JOB_QUEUE_FILES_PERMS)
181   else:
182     utils.RemoveFile(pathutils.JOB_QUEUE_DRAIN_FILE)
183
184   assert (not drain_flag) ^ CheckDrainFlag()
185
186
187 def FormatJobID(job_id):
188   """Convert a job ID to int format.
189
190   Currently this just is a no-op that performs some checks, but if we
191   want to change the job id format this will abstract this change.
192
193   @type job_id: int or long
194   @param job_id: the numeric job id
195   @rtype: int
196   @return: the formatted job id
197
198   """
199   if not isinstance(job_id, (int, long)):
200     raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
201   if job_id < 0:
202     raise errors.ProgrammerError("Job ID %s is negative" % job_id)
203
204   return job_id
205
206
207 def GetArchiveDirectory(job_id):
208   """Returns the archive directory for a job.
209
210   @type job_id: str
211   @param job_id: Job identifier
212   @rtype: str
213   @return: Directory name
214
215   """
216   return str(ParseJobId(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
217
218
219 def ParseJobId(job_id):
220   """Parses a job ID and converts it to integer.
221
222   """
223   try:
224     return int(job_id)
225   except (ValueError, TypeError):
226     raise errors.ParameterError("Invalid job ID '%s'" % job_id)