Revision 04ab05ce
b/lib/jqueue.py | ||
---|---|---|
35 | 35 |
from ganeti import errors |
36 | 36 |
from ganeti import mcpu |
37 | 37 |
from ganeti import utils |
38 |
from ganeti import jstore |
|
38 | 39 |
from ganeti import rpc |
39 | 40 |
|
40 | 41 |
|
... | ... | |
267 | 268 |
|
268 | 269 |
""" |
269 | 270 |
def wrapper(self, *args, **kwargs): |
270 |
assert self.lock_fd, "Queue should be open"
|
|
271 |
assert self._queue_lock is not None, "Queue should be open"
|
|
271 | 272 |
return fn(self, *args, **kwargs) |
272 | 273 |
return wrapper |
273 | 274 |
|
... | ... | |
281 | 282 |
self.acquire = self._lock.acquire |
282 | 283 |
self.release = self._lock.release |
283 | 284 |
|
284 |
# Make sure our directories exists |
|
285 |
for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR): |
|
286 |
try: |
|
287 |
os.mkdir(path, 0700) |
|
288 |
except OSError, err: |
|
289 |
if err.errno not in (errno.EEXIST, ): |
|
290 |
raise |
|
291 |
|
|
292 |
# Get queue lock |
|
293 |
self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w") |
|
294 |
try: |
|
295 |
utils.LockFile(self.lock_fd) |
|
296 |
except: |
|
297 |
self.lock_fd.close() |
|
298 |
raise |
|
299 |
|
|
300 |
# Read version |
|
301 |
try: |
|
302 |
version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r") |
|
303 |
except IOError, err: |
|
304 |
if err.errno not in (errno.ENOENT, ): |
|
305 |
raise |
|
306 |
|
|
307 |
# Setup a new queue |
|
308 |
self._InitQueueUnlocked() |
|
309 |
|
|
310 |
# Try to open again |
|
311 |
version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r") |
|
285 |
# Initialize |
|
286 |
self._queue_lock = jstore.InitAndVerifyQueue(exclusive=True) |
|
312 | 287 |
|
313 |
try: |
|
314 |
# Try to read version |
|
315 |
version = int(version_fd.read(128)) |
|
316 |
|
|
317 |
# Verify version |
|
318 |
if version != constants.JOB_QUEUE_VERSION: |
|
319 |
raise errors.JobQueueError("Found version %s, expected %s", |
|
320 |
version, constants.JOB_QUEUE_VERSION) |
|
321 |
finally: |
|
322 |
version_fd.close() |
|
323 |
|
|
324 |
self._last_serial = self._ReadSerial() |
|
325 |
if self._last_serial is None: |
|
326 |
raise errors.ConfigurationError("Can't read/parse the job queue serial" |
|
327 |
" file") |
|
288 |
# Read serial file |
|
289 |
self._last_serial = jstore.ReadSerial() |
|
290 |
assert self._last_serial is not None, ("Serial file was modified between" |
|
291 |
" check in jstore and here") |
|
328 | 292 |
|
329 | 293 |
# Setup worker pool |
330 | 294 |
self._wpool = _JobQueueWorkerPool(self) |
... | ... | |
350 | 314 |
finally: |
351 | 315 |
self.release() |
352 | 316 |
|
353 |
@staticmethod |
|
354 |
def _ReadSerial(): |
|
355 |
"""Try to read the job serial file. |
|
356 |
|
|
357 |
@rtype: None or int |
|
358 |
@return: If the serial can be read, then it is returned. Otherwise None |
|
359 |
is returned. |
|
360 |
|
|
361 |
""" |
|
362 |
try: |
|
363 |
serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r") |
|
364 |
try: |
|
365 |
# Read last serial |
|
366 |
serial = int(serial_fd.read(1024).strip()) |
|
367 |
finally: |
|
368 |
serial_fd.close() |
|
369 |
except (ValueError, EnvironmentError): |
|
370 |
serial = None |
|
371 |
|
|
372 |
return serial |
|
373 |
|
|
374 |
def _InitQueueUnlocked(self): |
|
375 |
utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE, |
|
376 |
data="%s\n" % constants.JOB_QUEUE_VERSION) |
|
377 |
if self._ReadSerial() is None: |
|
378 |
utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE, |
|
379 |
data="%s\n" % 0) |
|
380 |
|
|
381 | 317 |
def _FormatJobID(self, job_id): |
382 | 318 |
if not isinstance(job_id, (int, long)): |
383 | 319 |
raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id) |
... | ... | |
657 | 593 |
""" |
658 | 594 |
self._wpool.TerminateWorkers() |
659 | 595 |
|
660 |
self.lock_fd.close() |
|
661 |
self.lock_fd = None |
|
596 |
self._queue_lock.Close() |
|
597 |
self._queue_lock = None |
Also available in: Unified diff