Migrate lib/{jqueue,jstore}.py from constants to pathutils
[ganeti-local] / lib / jstore.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling."""
23
24 import errno
25 import os
26
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import runtime
30 from ganeti import utils
31 from ganeti import pathutils
32
33
34 JOBS_PER_ARCHIVE_DIRECTORY = 10000
35
36
37 def _ReadNumericFile(file_name):
38   """Reads a file containing a number.
39
40   @rtype: None or int
41   @return: None if file is not found, otherwise number
42
43   """
44   try:
45     return int(utils.ReadFile(file_name))
46   except EnvironmentError, err:
47     if err.errno in (errno.ENOENT, ):
48       return None
49     raise
50
51
52 def ReadSerial():
53   """Read the serial file.
54
55   The queue should be locked while this function is called.
56
57   """
58   return _ReadNumericFile(pathutils.JOB_QUEUE_SERIAL_FILE)
59
60
61 def ReadVersion():
62   """Read the queue version.
63
64   The queue should be locked while this function is called.
65
66   """
67   return _ReadNumericFile(pathutils.JOB_QUEUE_VERSION_FILE)
68
69
70 def InitAndVerifyQueue(must_lock):
71   """Open and lock job queue.
72
73   If necessary, the queue is automatically initialized.
74
75   @type must_lock: bool
76   @param must_lock: Whether an exclusive lock must be held.
77   @rtype: utils.FileLock
78   @return: Lock object for the queue. This can be used to change the
79            locking mode.
80
81   """
82   getents = runtime.GetEnts()
83
84   # Lock queue
85   queue_lock = utils.FileLock.Open(pathutils.JOB_QUEUE_LOCK_FILE)
86   try:
87     # The queue needs to be locked in exclusive mode to write to the serial and
88     # version files.
89     if must_lock:
90       queue_lock.Exclusive(blocking=True)
91       holding_lock = True
92     else:
93       try:
94         queue_lock.Exclusive(blocking=False)
95         holding_lock = True
96       except errors.LockError:
97         # Ignore errors and assume the process keeping the lock checked
98         # everything.
99         holding_lock = False
100
101     if holding_lock:
102       # Verify version
103       version = ReadVersion()
104       if version is None:
105         # Write new version file
106         utils.WriteFile(pathutils.JOB_QUEUE_VERSION_FILE,
107                         uid=getents.masterd_uid, gid=getents.masterd_gid,
108                         data="%s\n" % constants.JOB_QUEUE_VERSION)
109
110         # Read again
111         version = ReadVersion()
112
113       if version != constants.JOB_QUEUE_VERSION:
114         raise errors.JobQueueError("Found job queue version %s, expected %s",
115                                    version, constants.JOB_QUEUE_VERSION)
116
117       serial = ReadSerial()
118       if serial is None:
119         # Write new serial file
120         utils.WriteFile(pathutils.JOB_QUEUE_SERIAL_FILE,
121                         uid=getents.masterd_uid, gid=getents.masterd_gid,
122                         data="%s\n" % 0)
123
124         # Read again
125         serial = ReadSerial()
126
127       if serial is None:
128         # There must be a serious problem
129         raise errors.JobQueueError("Can't read/parse the job queue"
130                                    " serial file")
131
132       if not must_lock:
133         # There's no need for more error handling. Closing the lock
134         # file below in case of an error will unlock it anyway.
135         queue_lock.Unlock()
136
137   except:
138     queue_lock.Close()
139     raise
140
141   return queue_lock
142
143
144 def CheckDrainFlag():
145   """Check if the queue is marked to be drained.
146
147   This currently uses the queue drain file, which makes it a per-node flag.
148   In the future this can be moved to the config file.
149
150   @rtype: boolean
151   @return: True if the job queue is marked drained
152
153   """
154   return os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
155
156
157 def SetDrainFlag(drain_flag):
158   """Sets the drain flag for the queue.
159
160   @type drain_flag: boolean
161   @param drain_flag: Whether to set or unset the drain flag
162   @attention: This function should only called the current holder of the queue
163     lock
164
165   """
166   getents = runtime.GetEnts()
167
168   if drain_flag:
169     utils.WriteFile(pathutils.JOB_QUEUE_DRAIN_FILE, data="",
170                     uid=getents.masterd_uid, gid=getents.masterd_gid)
171   else:
172     utils.RemoveFile(pathutils.JOB_QUEUE_DRAIN_FILE)
173
174   assert (not drain_flag) ^ CheckDrainFlag()
175
176
177 def FormatJobID(job_id):
178   """Convert a job ID to int format.
179
180   Currently this just is a no-op that performs some checks, but if we
181   want to change the job id format this will abstract this change.
182
183   @type job_id: int or long
184   @param job_id: the numeric job id
185   @rtype: int
186   @return: the formatted job id
187
188   """
189   if not isinstance(job_id, (int, long)):
190     raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
191   if job_id < 0:
192     raise errors.ProgrammerError("Job ID %s is negative" % job_id)
193
194   return job_id
195
196
197 def GetArchiveDirectory(job_id):
198   """Returns the archive directory for a job.
199
200   @type job_id: str
201   @param job_id: Job identifier
202   @rtype: str
203   @return: Directory name
204
205   """
206   return str(ParseJobId(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
207
208
209 def ParseJobId(job_id):
210   """Parses a job ID and converts it to integer.
211
212   """
213   try:
214     return int(job_id)
215   except (ValueError, TypeError):
216     raise errors.ParameterError("Invalid job ID '%s'" % job_id)