Move parameter title definitions to constants
[ganeti-local] / lib / jstore.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling."""
23
24 import errno
25 import os
26
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import runtime
30 from ganeti import utils
31
32
33 JOBS_PER_ARCHIVE_DIRECTORY = 10000
34
35
36 def _ReadNumericFile(file_name):
37   """Reads a file containing a number.
38
39   @rtype: None or int
40   @return: None if file is not found, otherwise number
41
42   """
43   try:
44     return int(utils.ReadFile(file_name))
45   except EnvironmentError, err:
46     if err.errno in (errno.ENOENT, ):
47       return None
48     raise
49
50
51 def ReadSerial():
52   """Read the serial file.
53
54   The queue should be locked while this function is called.
55
56   """
57   return _ReadNumericFile(constants.JOB_QUEUE_SERIAL_FILE)
58
59
60 def ReadVersion():
61   """Read the queue version.
62
63   The queue should be locked while this function is called.
64
65   """
66   return _ReadNumericFile(constants.JOB_QUEUE_VERSION_FILE)
67
68
69 def InitAndVerifyQueue(must_lock):
70   """Open and lock job queue.
71
72   If necessary, the queue is automatically initialized.
73
74   @type must_lock: bool
75   @param must_lock: Whether an exclusive lock must be held.
76   @rtype: utils.FileLock
77   @return: Lock object for the queue. This can be used to change the
78            locking mode.
79
80   """
81   getents = runtime.GetEnts()
82
83   # Lock queue
84   queue_lock = utils.FileLock.Open(constants.JOB_QUEUE_LOCK_FILE)
85   try:
86     # The queue needs to be locked in exclusive mode to write to the serial and
87     # version files.
88     if must_lock:
89       queue_lock.Exclusive(blocking=True)
90       holding_lock = True
91     else:
92       try:
93         queue_lock.Exclusive(blocking=False)
94         holding_lock = True
95       except errors.LockError:
96         # Ignore errors and assume the process keeping the lock checked
97         # everything.
98         holding_lock = False
99
100     if holding_lock:
101       # Verify version
102       version = ReadVersion()
103       if version is None:
104         # Write new version file
105         utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
106                         uid=getents.masterd_uid, gid=getents.masterd_gid,
107                         data="%s\n" % constants.JOB_QUEUE_VERSION)
108
109         # Read again
110         version = ReadVersion()
111
112       if version != constants.JOB_QUEUE_VERSION:
113         raise errors.JobQueueError("Found job queue version %s, expected %s",
114                                    version, constants.JOB_QUEUE_VERSION)
115
116       serial = ReadSerial()
117       if serial is None:
118         # Write new serial file
119         utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
120                         uid=getents.masterd_uid, gid=getents.masterd_gid,
121                         data="%s\n" % 0)
122
123         # Read again
124         serial = ReadSerial()
125
126       if serial is None:
127         # There must be a serious problem
128         raise errors.JobQueueError("Can't read/parse the job queue"
129                                    " serial file")
130
131       if not must_lock:
132         # There's no need for more error handling. Closing the lock
133         # file below in case of an error will unlock it anyway.
134         queue_lock.Unlock()
135
136   except:
137     queue_lock.Close()
138     raise
139
140   return queue_lock
141
142
143 def CheckDrainFlag():
144   """Check if the queue is marked to be drained.
145
146   This currently uses the queue drain file, which makes it a per-node flag.
147   In the future this can be moved to the config file.
148
149   @rtype: boolean
150   @return: True if the job queue is marked drained
151
152   """
153   return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
154
155
156 def SetDrainFlag(drain_flag):
157   """Sets the drain flag for the queue.
158
159   @type drain_flag: boolean
160   @param drain_flag: Whether to set or unset the drain flag
161   @attention: This function should only called the current holder of the queue
162     lock
163
164   """
165   getents = runtime.GetEnts()
166
167   if drain_flag:
168     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="",
169                     uid=getents.masterd_uid, gid=getents.masterd_gid)
170   else:
171     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
172
173   assert (not drain_flag) ^ CheckDrainFlag()
174
175
176 def FormatJobID(job_id):
177   """Convert a job ID to int format.
178
179   Currently this just is a no-op that performs some checks, but if we
180   want to change the job id format this will abstract this change.
181
182   @type job_id: int or long
183   @param job_id: the numeric job id
184   @rtype: int
185   @return: the formatted job id
186
187   """
188   if not isinstance(job_id, (int, long)):
189     raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
190   if job_id < 0:
191     raise errors.ProgrammerError("Job ID %s is negative" % job_id)
192
193   return job_id
194
195
196 def GetArchiveDirectory(job_id):
197   """Returns the archive directory for a job.
198
199   @type job_id: str
200   @param job_id: Job identifier
201   @rtype: str
202   @return: Directory name
203
204   """
205   return str(ParseJobId(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
206
207
208 def ParseJobId(job_id):
209   """Parses a job ID and converts it to integer.
210
211   """
212   try:
213     return int(job_id)
214   except (ValueError, TypeError):
215     raise errors.ParameterError("Invalid job ID '%s'" % job_id)