"""Module implementing the job queue handling."""
import os
-import logging
import errno
-import re
from ganeti import constants
from ganeti import errors
return _ReadNumericFile(constants.JOB_QUEUE_VERSION_FILE)
-def InitAndVerifyQueue(exclusive):
+def InitAndVerifyQueue(must_lock):
"""Open and lock job queue.
If necessary, the queue is automatically initialized.
- @type exclusive: bool
- @param exclusive: Whether to lock the queue in exclusive mode. Shared
- mode otherwise.
+ @type must_lock: bool
+ @param must_lock: Whether an exclusive lock must be held.
@rtype: utils.FileLock
@return: Lock object for the queue. This can be used to change the
locking mode.
# Lock queue
queue_lock = utils.FileLock(constants.JOB_QUEUE_LOCK_FILE)
try:
- # Determine locking function and call it
- if exclusive:
- fn = queue_lock.Exclusive
+ # The queue needs to be locked in exclusive mode to write to the serial and
+ # version files.
+ if must_lock:
+ queue_lock.Exclusive(blocking=True)
+ holding_lock = True
else:
- fn = queue_lock.Shared
-
- fn(blocking=False)
-
- # Verify version
- version = ReadVersion()
- if version is None:
- # Write new version file
- utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
- data="%s\n" % constants.JOB_QUEUE_VERSION)
-
- # Read again
+ try:
+ queue_lock.Exclusive(blocking=False)
+ holding_lock = True
+ except errors.LockError:
+ # Failing to get the lock is fine: assume the process holding it has
+ # already verified the queue.
+ holding_lock = False
+
+ if holding_lock:
+ # Verify version
version = ReadVersion()
+ if version is None:
+ # Write new version file
+ utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
+ data="%s\n" % constants.JOB_QUEUE_VERSION)
- if version != constants.JOB_QUEUE_VERSION:
- raise errors.JobQueueError("Found job queue version %s, expected %s",
- version, constants.JOB_QUEUE_VERSION)
+ # Read again
+ version = ReadVersion()
- serial = ReadSerial()
- if serial is None:
- # Write new serial file
- utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
- data="%s\n" % 0)
+ if version != constants.JOB_QUEUE_VERSION:
+ raise errors.JobQueueError("Found job queue version %s, expected %s",
+ version, constants.JOB_QUEUE_VERSION)
- # Read again
serial = ReadSerial()
-
- if serial is None:
- # There must be a serious problem
- raise errors.JobQueueError("Can't read/parse the job queue serial file")
+ if serial is None:
+ # Write new serial file
+ utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
+ data="%s\n" % 0)
+
+ # Read again
+ serial = ReadSerial()
+
+ if serial is None:
+ # There must be a serious problem
+ raise errors.JobQueueError("Can't read/parse the job queue"
+ " serial file")
+
+ if not must_lock:
+ # There's no need for more error handling. Closing the lock
+ # file below in case of an error will unlock it anyway.
+ queue_lock.Unlock()
except:
queue_lock.Close()