f20da060408ae17ef77c568048105d71bf842d5f
[ganeti-local] / lib / jstore.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling."""
23
24 import errno
25 import os
26
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import runtime
30 from ganeti import utils
31 from ganeti import pathutils
32
33
34 JOBS_PER_ARCHIVE_DIRECTORY = 10000
35
36
37 def _ReadNumericFile(file_name):
38   """Reads a file containing a number.
39
40   @rtype: None or int
41   @return: None if file is not found, otherwise number
42
43   """
44   try:
45     contents = utils.ReadFile(file_name)
46   except EnvironmentError, err:
47     if err.errno in (errno.ENOENT, ):
48       return None
49     raise
50
51   try:
52     return int(contents)
53   except (ValueError, TypeError), err:
54     # Couldn't convert to int
55     raise errors.JobQueueError("Content of file '%s' is not numeric: %s" %
56                                (file_name, err))
57
58
59 def ReadSerial():
60   """Read the serial file.
61
62   The queue should be locked while this function is called.
63
64   """
65   return _ReadNumericFile(pathutils.JOB_QUEUE_SERIAL_FILE)
66
67
68 def ReadVersion():
69   """Read the queue version.
70
71   The queue should be locked while this function is called.
72
73   """
74   return _ReadNumericFile(pathutils.JOB_QUEUE_VERSION_FILE)
75
76
77 def InitAndVerifyQueue(must_lock):
78   """Open and lock job queue.
79
80   If necessary, the queue is automatically initialized.
81
82   @type must_lock: bool
83   @param must_lock: Whether an exclusive lock must be held.
84   @rtype: utils.FileLock
85   @return: Lock object for the queue. This can be used to change the
86            locking mode.
87
88   """
89   getents = runtime.GetEnts()
90
91   # Lock queue
92   queue_lock = utils.FileLock.Open(pathutils.JOB_QUEUE_LOCK_FILE)
93   try:
94     # The queue needs to be locked in exclusive mode to write to the serial and
95     # version files.
96     if must_lock:
97       queue_lock.Exclusive(blocking=True)
98       holding_lock = True
99     else:
100       try:
101         queue_lock.Exclusive(blocking=False)
102         holding_lock = True
103       except errors.LockError:
104         # Ignore errors and assume the process keeping the lock checked
105         # everything.
106         holding_lock = False
107
108     if holding_lock:
109       # Verify version
110       version = ReadVersion()
111       if version is None:
112         # Write new version file
113         utils.WriteFile(pathutils.JOB_QUEUE_VERSION_FILE,
114                         uid=getents.masterd_uid, gid=getents.masterd_gid,
115                         data="%s\n" % constants.JOB_QUEUE_VERSION)
116
117         # Read again
118         version = ReadVersion()
119
120       if version != constants.JOB_QUEUE_VERSION:
121         raise errors.JobQueueError("Found job queue version %s, expected %s",
122                                    version, constants.JOB_QUEUE_VERSION)
123
124       serial = ReadSerial()
125       if serial is None:
126         # Write new serial file
127         utils.WriteFile(pathutils.JOB_QUEUE_SERIAL_FILE,
128                         uid=getents.masterd_uid, gid=getents.masterd_gid,
129                         data="%s\n" % 0)
130
131         # Read again
132         serial = ReadSerial()
133
134       if serial is None:
135         # There must be a serious problem
136         raise errors.JobQueueError("Can't read/parse the job queue"
137                                    " serial file")
138
139       if not must_lock:
140         # There's no need for more error handling. Closing the lock
141         # file below in case of an error will unlock it anyway.
142         queue_lock.Unlock()
143
144   except:
145     queue_lock.Close()
146     raise
147
148   return queue_lock
149
150
151 def CheckDrainFlag():
152   """Check if the queue is marked to be drained.
153
154   This currently uses the queue drain file, which makes it a per-node flag.
155   In the future this can be moved to the config file.
156
157   @rtype: boolean
158   @return: True if the job queue is marked drained
159
160   """
161   return os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
162
163
164 def SetDrainFlag(drain_flag):
165   """Sets the drain flag for the queue.
166
167   @type drain_flag: boolean
168   @param drain_flag: Whether to set or unset the drain flag
169   @attention: This function should only called the current holder of the queue
170     lock
171
172   """
173   getents = runtime.GetEnts()
174
175   if drain_flag:
176     utils.WriteFile(pathutils.JOB_QUEUE_DRAIN_FILE, data="",
177                     uid=getents.masterd_uid, gid=getents.masterd_gid)
178   else:
179     utils.RemoveFile(pathutils.JOB_QUEUE_DRAIN_FILE)
180
181   assert (not drain_flag) ^ CheckDrainFlag()
182
183
184 def FormatJobID(job_id):
185   """Convert a job ID to int format.
186
187   Currently this just is a no-op that performs some checks, but if we
188   want to change the job id format this will abstract this change.
189
190   @type job_id: int or long
191   @param job_id: the numeric job id
192   @rtype: int
193   @return: the formatted job id
194
195   """
196   if not isinstance(job_id, (int, long)):
197     raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
198   if job_id < 0:
199     raise errors.ProgrammerError("Job ID %s is negative" % job_id)
200
201   return job_id
202
203
204 def GetArchiveDirectory(job_id):
205   """Returns the archive directory for a job.
206
207   @type job_id: str
208   @param job_id: Job identifier
209   @rtype: str
210   @return: Directory name
211
212   """
213   return str(ParseJobId(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
214
215
216 def ParseJobId(job_id):
217   """Parses a job ID and converts it to integer.
218
219   """
220   try:
221     return int(job_id)
222   except (ValueError, TypeError):
223     raise errors.ParameterError("Invalid job ID '%s'" % job_id)