import os
+import sys
import time
import subprocess
import re
import logging
import logging.handlers
import signal
+import OpenSSL
+import datetime
+import calendar
+import hmac
from cStringIO import StringIO
try:
from hashlib import sha1
except ImportError:
- import sha
- sha1 = sha.new
+ import sha as sha1
from ganeti import errors
from ganeti import constants
_RANDOM_UUID_FILE = "/proc/sys/kernel/random/uuid"
+# NOTE: despite its name, this character class accepts every ASCII letter and
+# digit, not only hexadecimal digits
+HEX_CHAR_RE = r"[a-zA-Z0-9]"
+# Salts accepted when signing certificates: one or more of the characters above
+VALID_X509_SIGNATURE_SALT = re.compile("^%s+$" % HEX_CHAR_RE, re.S)
+# Signature header line of the form "<header>: <salt>/<signature>"; matched
+# case-insensitively
+X509_SIGNATURE = re.compile(r"^%s:\s*(?P<salt>%s+)/(?P<sign>%s+)$" %
+                            (re.escape(constants.X509_CERT_SIGNATURE_HEADER),
+                             HEX_CHAR_RE, HEX_CHAR_RE),
+                            re.S | re.I)
+
class RunResult(object):
"""Holds the result of running external programs.
output = property(_GetOutput, None, None, "Return full output")
-def RunCmd(cmd, env=None, output=None, cwd='/', reset_env=False):
+def _BuildCmdEnvironment(env, reset):
+  """Builds the environment for an external program.
+
+  @type env: dict or None
+  @param env: Additional environment variables; applied last, so they
+    override inherited values
+  @type reset: bool
+  @param reset: If true, start from an empty environment instead of a copy of
+    the current one with C{LC_ALL} set to "C"
+  @rtype: dict
+  @return: Environment for the external program
+
+  """
+  result = {}
+
+  if not reset:
+    # Inherit the current environment, but force the C locale
+    result.update(os.environ)
+    result["LC_ALL"] = "C"
+
+  if env is not None:
+    result.update(env)
+
+  return result
+
+
+def RunCmd(cmd, env=None, output=None, cwd="/", reset_env=False):
"""Execute a (shell) command.
The command should not read from its standard input, as it will be
@type cmd: string or list
@param cmd: Command to run
@type env: dict
- @param env: Additional environment
+ @param env: Additional environment variables
@type output: str
@param output: if desired, the output of the command can be
saved in a file instead of the RunResult instance; this
if no_fork:
raise errors.ProgrammerError("utils.RunCmd() called with fork() disabled")
- if isinstance(cmd, list):
- cmd = [str(val) for val in cmd]
- strcmd = " ".join(cmd)
- shell = False
- else:
+ if isinstance(cmd, basestring):
strcmd = cmd
shell = True
- logging.debug("RunCmd '%s'", strcmd)
+ else:
+ cmd = [str(val) for val in cmd]
+ strcmd = ShellQuoteArgs(cmd)
+ shell = False
- if not reset_env:
- cmd_env = os.environ.copy()
- cmd_env["LC_ALL"] = "C"
+ if output:
+ logging.debug("RunCmd %s, output file '%s'", strcmd, output)
else:
- cmd_env = {}
+ logging.debug("RunCmd %s", strcmd)
- if env is not None:
- cmd_env.update(env)
+ cmd_env = _BuildCmdEnvironment(env, reset_env)
try:
if output is None:
return RunResult(exitcode, signal_, out, err, strcmd)
+def StartDaemon(cmd, env=None, cwd="/", output=None, output_fd=None,
+                pidfile=None):
+  """Start a daemon process after forking twice.
+
+  The daemon's PID and any error message produced while starting it are
+  passed back to the calling process through two pipes.
+
+  @type cmd: string or list
+  @param cmd: Command to run; a string is run through C{/bin/sh -c}
+  @type env: dict
+  @param env: Additional environment variables
+  @type cwd: string
+  @param cwd: Working directory for the program
+  @type output: string
+  @param output: Path to file in which to save the output
+  @type output_fd: int
+  @param output_fd: File descriptor for output
+  @type pidfile: string
+  @param pidfile: Process ID file
+  @rtype: int
+  @return: Daemon process ID
+  @raise errors.ProgrammerError: if we call this when forks are disabled, or
+    if both C{output} and C{output_fd} are given
+  @raise errors.OpExecError: if an error message was received from the child
+    or the PID received from it can't be parsed
+
+  """
+  if no_fork:
+    raise errors.ProgrammerError("utils.StartDaemon() called with fork()"
+                                 " disabled")
+
+  # Only fails if an output file was given together with a file descriptor
+  if output and not (bool(output) ^ (output_fd is not None)):
+    raise errors.ProgrammerError("Only one of 'output' and 'output_fd' can be"
+                                 " specified")
+
+  if isinstance(cmd, basestring):
+    cmd = ["/bin/sh", "-c", cmd]
+
+  strcmd = ShellQuoteArgs(cmd)
+
+  if output:
+    logging.debug("StartDaemon %s, output file '%s'", strcmd, output)
+  else:
+    logging.debug("StartDaemon %s", strcmd)
+
+  # The environment is never reset here; "env" is added on top of the
+  # inherited one
+  cmd_env = _BuildCmdEnvironment(env, False)
+
+  # Create pipe for sending PID back
+  (pidpipe_read, pidpipe_write) = os.pipe()
+  try:
+    try:
+      # Create pipe for sending error messages
+      (errpipe_read, errpipe_write) = os.pipe()
+      try:
+        try:
+          # First fork
+          pid = os.fork()
+          if pid == 0:
+            try:
+              # Child process, won't return
+              _StartDaemonChild(errpipe_read, errpipe_write,
+                                pidpipe_read, pidpipe_write,
+                                cmd, cmd_env, cwd,
+                                output, output_fd, pidfile)
+            finally:
+              # Well, maybe child process failed
+              os._exit(1) # pylint: disable-msg=W0212
+        finally:
+          # Only reached in the calling process, as the child always exits
+          # above
+          _CloseFDNoErr(errpipe_write)
+
+        # Wait for daemon to be started (or an error message to arrive) and read
+        # up to 100 KB as an error message
+        errormsg = RetryOnSignal(os.read, errpipe_read, 100 * 1024)
+      finally:
+        _CloseFDNoErr(errpipe_read)
+    finally:
+      _CloseFDNoErr(pidpipe_write)
+
+    # Read up to 128 bytes for PID
+    pidtext = RetryOnSignal(os.read, pidpipe_read, 128)
+  finally:
+    _CloseFDNoErr(pidpipe_read)
+
+  # Try to avoid zombies by waiting for child process
+  try:
+    os.waitpid(pid, 0)
+  except OSError:
+    pass
+
+  # Anything read from the error pipe means starting the daemon failed
+  if errormsg:
+    raise errors.OpExecError("Error when starting daemon process: %r" %
+                             errormsg)
+
+  try:
+    return int(pidtext)
+  except (ValueError, TypeError), err:
+    raise errors.OpExecError("Error while trying to parse PID %r: %s" %
+                             (pidtext, err))
+
+
+def _StartDaemonChild(errpipe_read, errpipe_write,
+                      pidpipe_read, pidpipe_write,
+                      args, env, cwd,
+                      output, fd_output, pidfile):
+  """Child process for starting daemon.
+
+  Never returns: the process is either replaced by the program to be run or
+  exits. Errors are written to C{errpipe_write} before exiting.
+
+  @type errpipe_read: int
+  @param errpipe_read: Read end of the error pipe (closed here)
+  @type errpipe_write: int
+  @param errpipe_write: Write end of the error pipe, used to report errors
+  @type pidpipe_read: int
+  @param pidpipe_read: Read end of the PID pipe (closed here)
+  @type pidpipe_write: int
+  @param pidpipe_write: Write end of the PID pipe, used to send the daemon's
+    PID
+  @type args: list
+  @param args: Program and arguments to execute
+  @type env: dict or None
+  @param env: Environment for the program; if C{None} the current environment
+    is kept
+  @type cwd: string
+  @param cwd: Working directory for the program
+  @type output: string
+  @param output: Path to file used for standard output and error
+  @type fd_output: int
+  @param fd_output: File descriptor used for standard output and error
+  @type pidfile: string
+  @param pidfile: Process ID file to create and lock
+
+  """
+  try:
+    # Close parent's side
+    _CloseFDNoErr(errpipe_read)
+    _CloseFDNoErr(pidpipe_read)
+
+    # First child process
+    os.chdir("/")
+    os.umask(077)
+    os.setsid()
+
+    # And fork for the second time
+    pid = os.fork()
+    if pid != 0:
+      # Exit first child process
+      os._exit(0) # pylint: disable-msg=W0212
+
+    # From here on this is the second child, i.e. the future daemon
+
+    # Make sure pipe is closed on execv* (and thereby notifies original process)
+    SetCloseOnExecFlag(errpipe_write, True)
+
+    # List of file descriptors to be left open
+    noclose_fds = [errpipe_write]
+
+    # Open PID file
+    if pidfile:
+      try:
+        # TODO: Atomic replace with another locked file instead of writing into
+        # it after creating
+        fd_pidfile = os.open(pidfile, os.O_WRONLY | os.O_CREAT, 0600)
+
+        # Lock the PID file (and fail if not possible to do so). Any code
+        # wanting to send a signal to the daemon should try to lock the PID
+        # file before reading it. If acquiring the lock succeeds, the daemon is
+        # no longer running and the signal should not be sent.
+        LockFile(fd_pidfile)
+
+        os.write(fd_pidfile, "%d\n" % os.getpid())
+      except Exception, err:
+        raise Exception("Creating and locking PID file failed: %s" % err)
+
+      # Keeping the file open to hold the lock
+      noclose_fds.append(fd_pidfile)
+
+      SetCloseOnExecFlag(fd_pidfile, False)
+    else:
+      fd_pidfile = None
+
+    # Open /dev/null
+    fd_devnull = os.open(os.devnull, os.O_RDWR)
+
+    assert not output or (bool(output) ^ (fd_output is not None))
+
+    # Pick the descriptor for stdout/stderr: the one passed in, the output
+    # file or /dev/null
+    if fd_output is not None:
+      pass
+    elif output:
+      # Open output file
+      try:
+        # TODO: Implement flag to set append=yes/no
+        fd_output = os.open(output, os.O_WRONLY | os.O_CREAT, 0600)
+      except EnvironmentError, err:
+        raise Exception("Opening output file failed: %s" % err)
+    else:
+      fd_output = fd_devnull
+
+    # Redirect standard I/O
+    os.dup2(fd_devnull, 0)
+    os.dup2(fd_output, 1)
+    os.dup2(fd_output, 2)
+
+    # Send daemon PID to parent
+    RetryOnSignal(os.write, pidpipe_write, str(os.getpid()))
+
+    # Close all file descriptors except stdio and error message pipe
+    CloseFDs(noclose_fds=noclose_fds)
+
+    # Change working directory
+    os.chdir(cwd)
+
+    if env is None:
+      os.execvp(args[0], args)
+    else:
+      os.execvpe(args[0], args, env)
+  except: # pylint: disable-msg=W0702
+    try:
+      # Report errors to original process
+      buf = str(sys.exc_info()[1])
+
+      RetryOnSignal(os.write, errpipe_write, buf)
+    except: # pylint: disable-msg=W0702
+      # Ignore errors in error handling
+      pass
+
+  # Only reached if executing the program failed
+  os._exit(1) # pylint: disable-msg=W0212
+
+
def _RunCmdPipe(cmd, env, via_shell, cwd):
"""Run a command and return its output.
child.stderr.fileno(): (err, child.stderr),
}
for fd in fdmap:
- status = fcntl.fcntl(fd, fcntl.F_GETFL)
- fcntl.fcntl(fd, fcntl.F_SETFL, status | os.O_NONBLOCK)
+ SetNonblockFlag(fd, True)
while fdmap:
- try:
- pollresult = poller.poll()
- except EnvironmentError, eerr:
- if eerr.errno == errno.EINTR:
- continue
- raise
- except select.error, serr:
- if serr[0] == errno.EINTR:
- continue
- raise
+ pollresult = RetryOnSignal(poller.poll)
for fd, event in pollresult:
if event & select.POLLIN or event & select.POLLPRI:
return status
+def SetCloseOnExecFlag(fd, enable):
+  """Turns the close-on-exec flag of a file descriptor on or off.
+
+  @type fd: int
+  @param fd: File descriptor
+  @type enable: bool
+  @param enable: Whether to set or unset it.
+
+  """
+  current = fcntl.fcntl(fd, fcntl.F_GETFD)
+
+  if enable:
+    updated = current | fcntl.FD_CLOEXEC
+  else:
+    updated = current & ~fcntl.FD_CLOEXEC
+
+  fcntl.fcntl(fd, fcntl.F_SETFD, updated)
+
+
+def SetNonblockFlag(fd, enable):
+  """Turns the O_NONBLOCK flag of a file descriptor on or off.
+
+  @type fd: int
+  @param fd: File descriptor
+  @type enable: bool
+  @param enable: Whether to set or unset it
+
+  """
+  current = fcntl.fcntl(fd, fcntl.F_GETFL)
+
+  if enable:
+    updated = current | os.O_NONBLOCK
+  else:
+    updated = current & ~os.O_NONBLOCK
+
+  fcntl.fcntl(fd, fcntl.F_SETFL, updated)
+
+
+def RetryOnSignal(fn, *args, **kwargs):
+  """Calls a function again if it failed due to EINTR.
+
+  @type fn: callable
+  @param fn: Function to call; called with the remaining positional and
+    keyword arguments
+  @return: Whatever C{fn} returns
+
+  """
+  while True:
+    try:
+      result = fn(*args, **kwargs)
+    except EnvironmentError, env_err:
+      if env_err.errno == errno.EINTR:
+        # Interrupted, try again
+        continue
+      raise
+    except select.error, sel_err:
+      if sel_err.args and sel_err.args[0] == errno.EINTR:
+        # Interrupted, try again
+        continue
+      raise
+    else:
+      return result
+
+
def RunParts(dir_name, env=None, reset_env=False):
"""Run Scripts or programs in a directory
f = open(filename)
- fp = sha1()
+ if callable(sha1):
+ fp = sha1()
+ else:
+ fp = sha1.new()
while True:
data = f.read(4096)
if not data:
"""Class implementing resolver and hostname functionality
"""
+ _VALID_NAME_RE = re.compile("^[a-z0-9._-]{1,255}$")
+
def __init__(self, name=None):
"""Initialize the host name object.
return result
+  @classmethod
+  def NormalizeName(cls, hostname):
+    """Validate and normalize the given hostname.
+
+    The name is converted to lower case and trailing dots are removed.
+
+    @attention: the validation is a bit more relaxed than the standards
+        require; most importantly, we allow underscores in names
+    @type hostname: string
+    @param hostname: Name to be checked
+    @rtype: string
+    @return: Normalized name
+    @raise errors.OpPrereqError: when the name is not valid
+
+    """
+    hostname = hostname.lower()
+
+    is_invalid = (not cls._VALID_NAME_RE.match(hostname) or
+                  # double-dots, meaning empty label
+                  ".." in hostname or
+                  # empty initial label
+                  hostname.startswith("."))
+    if is_invalid:
+      raise errors.OpPrereqError("Invalid hostname '%s'" % hostname,
+                                 errors.ECODE_INVAL)
+
+    # No-op for names without a trailing dot
+    return hostname.rstrip(".")
+
def GetHostInfo(name=None):
"""Lookup host name and raise an OpPrereqError for failures"""
RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
+def TimestampForFilename():
+  """Returns the current time formatted for filenames.
+
+  The format doesn't contain colons, as some shells and applications use them
+  as separators.
+
+  @rtype: string
+  @return: Current time as formatted by C{time.strftime}
+
+  """
+  filename_format = "%Y-%m-%d_%H_%M_%S"
+
+  return time.strftime(filename_format)
+
+
def CreateBackup(file_name):
"""Creates a backup of a file.
raise errors.ProgrammerError("Can't make a backup of a non-file '%s'" %
file_name)
- prefix = '%s.backup-%d.' % (os.path.basename(file_name), int(time.time()))
+ prefix = ("%s.backup-%s." %
+ (os.path.basename(file_name), TimestampForFilename()))
dir_name = os.path.dirname(file_name)
fsrc = open(file_name, 'rb')
(fd, backup_name) = tempfile.mkstemp(prefix=prefix, dir=dir_name)
fdst = os.fdopen(fd, 'wb')
try:
+ logging.debug("Backing up %s at %s", file_name, backup_name)
shutil.copyfileobj(fsrc, fdst)
finally:
fdst.close()
return None
-def all(seq, pred=bool): # pylint: disable-msg=W0622
- "Returns True if pred(x) is True for every element in the iterable"
- for _ in itertools.ifilterfalse(pred, seq):
+try:
+ all = all # pylint: disable-msg=W0622
+except NameError:
+ def all(seq, pred=bool): # pylint: disable-msg=W0622
+ "Returns True if pred(x) is True for every element in the iterable"
+ for _ in itertools.ifilterfalse(pred, seq):
+ return False
+ return True
+
+
+try:
+ any = any # pylint: disable-msg=W0622
+except NameError:
+ def any(seq, pred=bool): # pylint: disable-msg=W0622
+ "Returns True if pred(x) is True for at least one element in the iterable"
+ for _ in itertools.ifilter(pred, seq):
+ return True
return False
- return True
-def any(seq, pred=bool): # pylint: disable-msg=W0622
- "Returns True if pred(x) is True for at least one element in the iterable"
- for _ in itertools.ifilter(pred, seq):
- return True
- return False
+def SingleWaitForFdCondition(fdobj, event, timeout):
+  """Waits for a condition to occur on the socket.
+
+  Immediately returns at the first interruption.
+
+  @type fdobj: integer or object supporting a fileno() method
+  @param fdobj: entity to wait for events on
+  @type event: integer
+  @param event: ORed condition (see select module)
+  @type timeout: float or None
+  @param timeout: Timeout in seconds
+  @rtype: int or None
+  @return: None for timeout, otherwise occurred conditions
+
+  """
+  # Conditions accepted as a result in addition to the requested ones
+  accepted = (event | select.POLLPRI |
+              select.POLLNVAL | select.POLLHUP | select.POLLERR)
+
+  if timeout is None:
+    poll_timeout = None
+  else:
+    # Poller object expects milliseconds
+    poll_timeout = timeout * 1000
+
+  poller = select.poll()
+  poller.register(fdobj, event)
+  try:
+    # TODO: If the main thread receives a signal and we have no timeout, we
+    # could wait forever. This should check a global "quit" flag or something
+    # every so often.
+    io_events = poller.poll(poll_timeout)
+  except select.error, err:
+    if err[0] != errno.EINTR:
+      raise
+    # Interrupted; reported like a timeout
+    return None
+
+  if io_events:
+    occurred = io_events[0][1]
+    if occurred & accepted:
+      return occurred
+
+  return None
+
+
+class FdConditionWaiterHelper(object):
+  """Retry helper for WaitForFdCondition.
+
+  Provides the function to be retried and the wait function, so that
+  WaitForFdCondition can keep waiting until the timeout has actually expired.
+
+  @ivar timeout: Timeout in seconds used for the next call to L{Poll}
+
+  """
+
+  def __init__(self, timeout):
+    self.timeout = timeout
+
+  def Poll(self, fdobj, event):
+    """Waits once and requests a retry if no condition occurred.
+
+    """
+    conditions = SingleWaitForFdCondition(fdobj, event, self.timeout)
+    if conditions is not None:
+      return conditions
+
+    raise RetryAgain()
+
+  def UpdateTimeout(self, timeout):
+    """Stores the timeout to be used by the next call to L{Poll}.
+
+    """
+    self.timeout = timeout
+
+
+def WaitForFdCondition(fdobj, event, timeout):
+  """Waits for a condition to occur on the socket.
+
+  Retries until the timeout is expired, even if interrupted.
+
+  @type fdobj: integer or object supporting a fileno() method
+  @param fdobj: entity to wait for events on
+  @type event: integer
+  @param event: ORed condition (see select module)
+  @type timeout: float or None
+  @param timeout: Timeout in seconds
+  @rtype: int or None
+  @return: None for timeout, otherwise occurred conditions
+
+  """
+  if timeout is None:
+    # No timeout, hence keep waiting until a condition is reported
+    result = None
+    while result is None:
+      result = SingleWaitForFdCondition(fdobj, event, timeout)
+    return result
+
+  waiter = FdConditionWaiterHelper(timeout)
+
+  return Retry(waiter.Poll, RETRY_REMAINING_TIME, timeout,
+               args=(fdobj, event), wait_fn=waiter.UpdateTimeout)
+
+
+def partition(seq, pred=bool): # pylint: disable-msg=W0622
+  """Partition a sequence in two, based on the given predicate.
+
+  The input is iterated only once and the predicate is called once per
+  element, hence one-shot iterators are supported too.
+
+  @type seq: iterable
+  @param seq: Elements to partition
+  @type pred: callable
+  @param pred: Predicate called with each element
+  @rtype: tuple; (list, list)
+  @return: Elements for which the predicate is true and elements for which
+    it is false, both in their original order
+
+  """
+  matching = []
+  remaining = []
+
+  for item in seq:
+    if pred(item):
+      matching.append(item)
+    else:
+      remaining.append(item)
+
+  return (matching, remaining)
def UniqueSequence(seq):
return rows[-lines:]
+def _ParseAsn1Generalizedtime(value):
+  """Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL.
+
+  @type value: string
+  @param value: ASN1 GENERALIZEDTIME timestamp, ending either in "Z" or in an
+    offset of the form "+HHMM" or "-HHMM"
+  @rtype: int
+  @return: Timestamp converted to UTC, as returned by C{calendar.timegm}
+  @raise ValueError: if the timezone is missing or the timestamp can't be
+    parsed
+
+  """
+  m = re.match(r"^(\d+)([-+]\d\d)(\d\d)$", value)
+  if m:
+    # We have an offset
+    asn1time = m.group(1)
+    signed_hours = m.group(2)
+
+    # The sign applies to hours and minutes alike and must be handled
+    # separately from the digits: int("-00") loses it and adding positive
+    # minutes to negative hours gives a wrong offset (e.g. for "-0530")
+    utcoffset = (60 * int(signed_hours[1:])) + int(m.group(3))
+    if signed_hours[0] == "-":
+      utcoffset = -utcoffset
+  else:
+    if not value.endswith("Z"):
+      raise ValueError("Missing timezone")
+    asn1time = value[:-1]
+    utcoffset = 0
+
+  parsed = time.strptime(asn1time, "%Y%m%d%H%M%S")
+
+  # Only year to second are wanted; the seventh field is the weekday
+  tt = datetime.datetime(*(parsed[:6])) - datetime.timedelta(minutes=utcoffset)
+
+  return calendar.timegm(tt.utctimetuple())
+
+
+def GetX509CertValidity(cert):
+  """Returns the validity period of the certificate.
+
+  @type cert: OpenSSL.crypto.X509
+  @param cert: X509 certificate object
+  @rtype: tuple; (int or None, int or None)
+  @return: Start and end of the validity period as returned by
+    L{_ParseAsn1Generalizedtime}; C{None} for values which are not available
+
+  """
+  def _ReadTimestamp(getter_name):
+    # The get_notBefore and get_notAfter functions are only supported in
+    # pyOpenSSL 0.7 and above.
+    try:
+      getter = getattr(cert, getter_name)
+    except AttributeError:
+      return None
+
+    asn1_value = getter()
+    if asn1_value is None:
+      return None
+
+    return _ParseAsn1Generalizedtime(asn1_value)
+
+  not_before = _ReadTimestamp("get_notBefore")
+  not_after = _ReadTimestamp("get_notAfter")
+
+  return (not_before, not_after)
+
+
+def SignX509Certificate(cert, key, salt):
+  """Sign a X509 certificate.
+
+  An RFC822-like signature header is added in front of the certificate.
+
+  @type cert: OpenSSL.crypto.X509
+  @param cert: X509 certificate object
+  @type key: string
+  @param key: Key for HMAC
+  @type salt: string
+  @param salt: Salt for HMAC
+  @rtype: string
+  @return: Serialized and signed certificate in PEM format
+  @raise errors.GenericError: if the salt contains invalid characters
+
+  """
+  if not VALID_X509_SIGNATURE_SALT.match(salt):
+    raise errors.GenericError("Invalid salt: %r" % salt)
+
+  # Dumping as PEM here ensures the certificate is in a sane format
+  cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
+
+  # The HMAC covers the salt followed by the serialized certificate
+  signature = hmac.new(key, salt + cert_pem, sha1).hexdigest()
+
+  header = "%s: %s/%s" % (constants.X509_CERT_SIGNATURE_HEADER, salt, signature)
+
+  return "%s\n\n%s" % (header, cert_pem)
+
+
+def _ExtractX509CertificateSignature(cert_pem):
+  """Helper function to extract signature from X509 certificate.
+
+  @type cert_pem: string
+  @param cert_pem: Certificate in PEM format and with signature header
+  @rtype: tuple; (string, string)
+  @return: Salt and signature
+  @raise errors.GenericError: if no signature header is found before the
+    first line starting with "---"
+
+  """
+  # Extract signature from original PEM data
+  for raw_line in cert_pem.splitlines():
+    if raw_line.startswith("---"):
+      # Only lines in front of this one are searched
+      break
+
+    match = X509_SIGNATURE.match(raw_line.strip())
+    if not match:
+      continue
+
+    return match.group("salt", "sign")
+
+  raise errors.GenericError("X509 certificate signature is missing")
+
+
+def LoadSignedX509Certificate(cert_pem, key):
+  """Verifies a signed X509 certificate.
+
+  @type cert_pem: string
+  @param cert_pem: Certificate in PEM format and with signature header
+  @type key: string
+  @param key: Key for HMAC
+  @rtype: tuple; (OpenSSL.crypto.X509, string)
+  @return: X509 certificate object and salt
+  @raise errors.GenericError: if the signature doesn't match the certificate
+
+  """
+  (salt, signature) = _ExtractX509CertificateSignature(cert_pem)
+
+  # Load certificate
+  cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_pem)
+
+  # Dump again to ensure it's in a sane format
+  sane_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
+
+  expected = hmac.new(key, salt + sane_pem, sha1).hexdigest()
+
+  # NOTE(review): plain string comparison, i.e. not constant-time; confirm
+  # whether this matters for the way the HMAC key is used
+  if signature == expected:
+    return (cert, salt)
+
+  raise errors.GenericError("X509 certificate signature is invalid")
+
+
def SafeEncode(text):
"""Return a 'safe' version of a source string.
return (tsize, fsize)
-def RunInSeparateProcess(fn):
+def RunInSeparateProcess(fn, *args):
"""Runs a function in a separate process.
Note: Only boolean return values are supported.
@type fn: callable
@param fn: Function to be called
- @rtype: tuple of (int/None, int/None)
- @return: Exit code and signal number
+ @rtype: bool
+ @return: Function's result
"""
pid = os.fork()
ResetTempfileModule()
# Call function
- result = int(bool(fn()))
+ result = int(bool(fn(*args)))
assert result in (0, 1)
except: # pylint: disable-msg=W0702
logging.exception("Error while calling function in separate process")
wait_fn(current_delay)
+def GetClosedTempfile(*args, **kwargs):
+  """Creates a temporary file and returns its path.
+
+  All arguments are passed on to C{tempfile.mkstemp}. The file descriptor
+  returned by it is closed before returning.
+
+  @rtype: string
+  @return: Path of the temporary file
+
+  """
+  created = tempfile.mkstemp(*args, **kwargs)
+
+  # First element is the file descriptor, second one the path
+  _CloseFDNoErr(created[0])
+
+  return created[1]
+
+
+def GenerateSelfSignedX509Cert(common_name, validity):
+  """Generates a self-signed X509 certificate.
+
+  @type common_name: string
+  @param common_name: commonName value; not set on the certificate if empty
+  @type validity: int
+  @param validity: Validity for certificate in seconds
+  @rtype: tuple; (string, string)
+  @return: Private key and certificate, both in PEM format
+
+  """
+  # Create private and public key
+  key = OpenSSL.crypto.PKey()
+  key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)
+
+  # Create self-signed certificate
+  cert = OpenSSL.crypto.X509()
+  if common_name:
+    cert.get_subject().CN = common_name
+  cert.set_serial_number(1)
+  # Valid from now on for "validity" seconds
+  cert.gmtime_adj_notBefore(0)
+  cert.gmtime_adj_notAfter(validity)
+  # Issuer is the certificate's own subject and it's signed with its own key
+  cert.set_issuer(cert.get_subject())
+  cert.set_pubkey(key)
+  cert.sign(key, constants.X509_CERT_SIGN_DIGEST)
+
+  key_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
+  cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
+
+  return (key_pem, cert_pem)
+
+
+def GenerateSelfSignedSslCert(filename, validity=(5 * 365)):
+  """Legacy function to generate self-signed X509 certificate.
+
+  Private key and certificate are written to the same file.
+
+  @type filename: string
+  @param filename: Path of the file to write
+  @type validity: int
+  @param validity: Validity for certificate in days
+
+  """
+  validity_seconds = validity * 24 * 60 * 60
+
+  (key_pem, cert_pem) = GenerateSelfSignedX509Cert(None, validity_seconds)
+
+  WriteFile(filename, mode=0400, data=key_pem + cert_pem)
+
+
class FileLock(object):
"""Utility class for file locks.
"""
- def __init__(self, filename):
+ def __init__(self, fd, filename):
"""Constructor for FileLock.
- This will open the file denoted by the I{filename} argument.
-
+ @type fd: file
+ @param fd: File object
@type filename: str
- @param filename: path to the file to be locked
+ @param filename: Path of the file opened at I{fd}
"""
+ self.fd = fd
self.filename = filename
- self.fd = open(self.filename, "w")
+
+ @classmethod
+ def Open(cls, filename):
+ """Creates and opens a file to be used as a file-based lock.
+
+ @type filename: string
+ @param filename: path to the file to be locked
+
+ """
+ # Using "os.open" is necessary to allow both opening existing file
+ # read/write and creating if not existing. Vanilla "open" will truncate an
+ # existing file -or- allow creating if not existing.
+ return cls(os.fdopen(os.open(filename, os.O_RDWR | os.O_CREAT), "w+"),
+ filename)
def __del__(self):
self.Close()
assert self.fd, "Lock was closed"
assert timeout is None or timeout >= 0, \
"If specified, timeout must be positive"
+ assert not (flag & fcntl.LOCK_NB), "LOCK_NB must not be set"
- if timeout is not None:
+ # When a timeout is used, LOCK_NB must always be set
+ if not (timeout is None and blocking):
flag |= fcntl.LOCK_NB
- timeout_end = time.time() + timeout
- # Blocking doesn't have effect with timeout
- elif not blocking:
- flag |= fcntl.LOCK_NB
- timeout_end = None
+ if timeout is None:
+ self._Lock(self.fd, flag, timeout)
+ else:
+ try:
+ Retry(self._Lock, (0.1, 1.2, 1.0), timeout,
+ args=(self.fd, flag, timeout))
+ except RetryTimeout:
+ raise errors.LockError(errmsg)
- # TODO: Convert to utils.Retry
+ @staticmethod
+ def _Lock(fd, flag, timeout):
+ try:
+ fcntl.flock(fd, flag)
+ except IOError, err:
+ if timeout is not None and err.errno == errno.EAGAIN:
+ raise RetryAgain()
- retry = True
- while retry:
- try:
- fcntl.flock(self.fd, flag)
- retry = False
- except IOError, err:
- if err.errno in (errno.EAGAIN, ):
- if timeout_end is not None and time.time() < timeout_end:
- # Wait before trying again
- time.sleep(max(0.1, min(1.0, timeout)))
- else:
- raise errors.LockError(errmsg)
- else:
- logging.exception("fcntl.flock failed")
- raise
+ logging.exception("fcntl.flock failed")
+ raise
def Exclusive(self, blocking=False, timeout=None):
"""Locks the file in exclusive mode.
@ivar called: tracks whether any of the signals have been raised
"""
- def __init__(self, signum):
+ def __init__(self, signum, handler_fn=None):
"""Constructs a new SignalHandler instance.
@type signum: int or list of ints
@param signum: Single signal number or set of signal numbers
+ @type handler_fn: callable
+ @param handler_fn: Signal handling function
"""
+ assert handler_fn is None or callable(handler_fn)
+
self.signum = set(signum)
self.called = False
+ self._handler_fn = handler_fn
+
self._previous = {}
try:
for signum in self.signum:
"""
self.called = False
- # we don't care about arguments, but we leave them named for the future
- def _HandleSignal(self, signum, frame): # pylint: disable-msg=W0613
+ def _HandleSignal(self, signum, frame):
"""Actual signal handling function.
"""
# solution in Python -- there are no atomic types.
self.called = True
+ if self._handler_fn:
+ self._handler_fn(signum, frame)
+
class FieldSet(object):
"""A simple field set.