"""
+# Allow wildcard import in pylint: disable=W0401
import os
-import sys
-import time
-import subprocess
import re
-import socket
-import tempfile
-import shutil
import errno
import pwd
+import time
import itertools
import select
-import fcntl
-import resource
import logging
import signal
-import OpenSSL
-import datetime
-import calendar
-
-from cStringIO import StringIO
from ganeti import errors
from ganeti import constants
from ganeti import compat
+from ganeti import pathutils
+
+from ganeti.utils.algo import *
+from ganeti.utils.filelock import *
+from ganeti.utils.hash import *
+from ganeti.utils.io import *
+from ganeti.utils.log import *
+from ganeti.utils.mlock import *
+from ganeti.utils.nodesetup import *
+from ganeti.utils.process import *
+from ganeti.utils.retry import *
+from ganeti.utils.text import *
+from ganeti.utils.wrapper import *
+from ganeti.utils.x509 import *
-from ganeti.utils.algo import * # pylint: disable-msg=W0401
-from ganeti.utils.retry import * # pylint: disable-msg=W0401
-from ganeti.utils.text import * # pylint: disable-msg=W0401
-from ganeti.utils.mlock import * # pylint: disable-msg=W0401
-from ganeti.utils.log import * # pylint: disable-msg=W0401
-from ganeti.utils.hash import * # pylint: disable-msg=W0401
-from ganeti.utils.wrapper import * # pylint: disable-msg=W0401
-from ganeti.utils.filelock import * # pylint: disable-msg=W0401
-from ganeti.utils.io import * # pylint: disable-msg=W0401
-
-
-#: when set to True, L{RunCmd} is disabled
-_no_fork = False
-
-_RANDOM_UUID_FILE = "/proc/sys/kernel/random/uuid"
-
-HEX_CHAR_RE = r"[a-zA-Z0-9]"
-VALID_X509_SIGNATURE_SALT = re.compile("^%s+$" % HEX_CHAR_RE, re.S)
-X509_SIGNATURE = re.compile(r"^%s:\s*(?P<salt>%s+)/(?P<sign>%s+)$" %
- (re.escape(constants.X509_CERT_SIGNATURE_HEADER),
- HEX_CHAR_RE, HEX_CHAR_RE),
- re.S | re.I)
_VALID_SERVICE_NAME_RE = re.compile("^[-_.a-zA-Z0-9]{1,128}$")
-UUID_RE = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
- '[a-f0-9]{4}-[a-f0-9]{12}$')
-
-# Certificate verification results
-(CERT_WARNING,
- CERT_ERROR) = range(1, 3)
-
-(_TIMEOUT_NONE,
- _TIMEOUT_TERM,
- _TIMEOUT_KILL) = range(3)
-
-#: Shell param checker regexp
-_SHELLPARAM_REGEX = re.compile(r"^[-a-zA-Z0-9._+/:%@]+$")
-
-#: ASN1 time regexp
-_ASN1_TIME_REGEX = re.compile(r"^(\d+)([-+]\d\d)(\d\d)$")
-
-
-def DisableFork():
- """Disables the use of fork(2).
-
- """
- global _no_fork # pylint: disable-msg=W0603
-
- _no_fork = True
-
-
-class RunResult(object):
- """Holds the result of running external programs.
-
- @type exit_code: int
- @ivar exit_code: the exit code of the program, or None (if the program
- didn't exit())
- @type signal: int or None
- @ivar signal: the signal that caused the program to finish, or None
- (if the program wasn't terminated by a signal)
- @type stdout: str
- @ivar stdout: the standard output of the program
- @type stderr: str
- @ivar stderr: the standard error of the program
- @type failed: boolean
- @ivar failed: True in case the program was
- terminated by a signal or exited with a non-zero exit code
- @ivar fail_reason: a string detailing the termination reason
-
- """
- __slots__ = ["exit_code", "signal", "stdout", "stderr",
- "failed", "fail_reason", "cmd"]
-
-
- def __init__(self, exit_code, signal_, stdout, stderr, cmd, timeout_action,
- timeout):
- self.cmd = cmd
- self.exit_code = exit_code
- self.signal = signal_
- self.stdout = stdout
- self.stderr = stderr
- self.failed = (signal_ is not None or exit_code != 0)
-
- fail_msgs = []
- if self.signal is not None:
- fail_msgs.append("terminated by signal %s" % self.signal)
- elif self.exit_code is not None:
- fail_msgs.append("exited with exit code %s" % self.exit_code)
- else:
- fail_msgs.append("unable to determine termination reason")
-
- if timeout_action == _TIMEOUT_TERM:
- fail_msgs.append("terminated after timeout of %.2f seconds" % timeout)
- elif timeout_action == _TIMEOUT_KILL:
- fail_msgs.append(("force termination after timeout of %.2f seconds"
- " and linger for another %.2f seconds") %
- (timeout, constants.CHILD_LINGER_TIMEOUT))
-
- if fail_msgs and self.failed:
- self.fail_reason = CommaJoin(fail_msgs)
-
- if self.failed:
- logging.debug("Command '%s' failed (%s); output: %s",
- self.cmd, self.fail_reason, self.output)
-
- def _GetOutput(self):
- """Returns the combined stdout and stderr for easier usage.
-
- """
- return self.stdout + self.stderr
-
- output = property(_GetOutput, None, None, "Return full output")
-
-
-def _BuildCmdEnvironment(env, reset):
- """Builds the environment for an external program.
-
- """
- if reset:
- cmd_env = {}
- else:
- cmd_env = os.environ.copy()
- cmd_env["LC_ALL"] = "C"
-
- if env is not None:
- cmd_env.update(env)
-
- return cmd_env
-
-
-def RunCmd(cmd, env=None, output=None, cwd="/", reset_env=False,
- interactive=False, timeout=None):
- """Execute a (shell) command.
-
- The command should not read from its standard input, as it will be
- closed.
-
- @type cmd: string or list
- @param cmd: Command to run
- @type env: dict
- @param env: Additional environment variables
- @type output: str
- @param output: if desired, the output of the command can be
- saved in a file instead of the RunResult instance; this
- parameter denotes the file name (if not None)
- @type cwd: string
- @param cwd: if specified, will be used as the working
- directory for the command; the default will be /
- @type reset_env: boolean
- @param reset_env: whether to reset or keep the default os environment
- @type interactive: boolean
- @param interactive: weather we pipe stdin, stdout and stderr
- (default behaviour) or run the command interactive
- @type timeout: int
- @param timeout: If not None, timeout in seconds until child process gets
- killed
- @rtype: L{RunResult}
- @return: RunResult instance
- @raise errors.ProgrammerError: if we call this when forks are disabled
-
- """
- if _no_fork:
- raise errors.ProgrammerError("utils.RunCmd() called with fork() disabled")
-
- if output and interactive:
- raise errors.ProgrammerError("Parameters 'output' and 'interactive' can"
- " not be provided at the same time")
-
- if isinstance(cmd, basestring):
- strcmd = cmd
- shell = True
- else:
- cmd = [str(val) for val in cmd]
- strcmd = ShellQuoteArgs(cmd)
- shell = False
-
- if output:
- logging.debug("RunCmd %s, output file '%s'", strcmd, output)
- else:
- logging.debug("RunCmd %s", strcmd)
-
- cmd_env = _BuildCmdEnvironment(env, reset_env)
-
- try:
- if output is None:
- out, err, status, timeout_action = _RunCmdPipe(cmd, cmd_env, shell, cwd,
- interactive, timeout)
- else:
- timeout_action = _TIMEOUT_NONE
- status = _RunCmdFile(cmd, cmd_env, shell, output, cwd)
- out = err = ""
- except OSError, err:
- if err.errno == errno.ENOENT:
- raise errors.OpExecError("Can't execute '%s': not found (%s)" %
- (strcmd, err))
- else:
- raise
-
- if status >= 0:
- exitcode = status
- signal_ = None
- else:
- exitcode = None
- signal_ = -status
-
- return RunResult(exitcode, signal_, out, err, strcmd, timeout_action, timeout)
-
-
-def SetupDaemonEnv(cwd="/", umask=077):
- """Setup a daemon's environment.
-
- This should be called between the first and second fork, due to
- setsid usage.
-
- @param cwd: the directory to which to chdir
- @param umask: the umask to setup
-
- """
- os.chdir(cwd)
- os.umask(umask)
- os.setsid()
-
-
-def SetupDaemonFDs(output_file, output_fd):
- """Setups up a daemon's file descriptors.
-
- @param output_file: if not None, the file to which to redirect
- stdout/stderr
- @param output_fd: if not None, the file descriptor for stdout/stderr
-
- """
- # check that at most one is defined
- assert [output_file, output_fd].count(None) >= 1
-
- # Open /dev/null (read-only, only for stdin)
- devnull_fd = os.open(os.devnull, os.O_RDONLY)
-
- if output_fd is not None:
- pass
- elif output_file is not None:
- # Open output file
- try:
- output_fd = os.open(output_file,
- os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0600)
- except EnvironmentError, err:
- raise Exception("Opening output file failed: %s" % err)
- else:
- output_fd = os.open(os.devnull, os.O_WRONLY)
-
- # Redirect standard I/O
- os.dup2(devnull_fd, 0)
- os.dup2(output_fd, 1)
- os.dup2(output_fd, 2)
-
-
-def StartDaemon(cmd, env=None, cwd="/", output=None, output_fd=None,
- pidfile=None):
- """Start a daemon process after forking twice.
-
- @type cmd: string or list
- @param cmd: Command to run
- @type env: dict
- @param env: Additional environment variables
- @type cwd: string
- @param cwd: Working directory for the program
- @type output: string
- @param output: Path to file in which to save the output
- @type output_fd: int
- @param output_fd: File descriptor for output
- @type pidfile: string
- @param pidfile: Process ID file
- @rtype: int
- @return: Daemon process ID
- @raise errors.ProgrammerError: if we call this when forks are disabled
-
- """
- if _no_fork:
- raise errors.ProgrammerError("utils.StartDaemon() called with fork()"
- " disabled")
-
- if output and not (bool(output) ^ (output_fd is not None)):
- raise errors.ProgrammerError("Only one of 'output' and 'output_fd' can be"
- " specified")
-
- if isinstance(cmd, basestring):
- cmd = ["/bin/sh", "-c", cmd]
-
- strcmd = ShellQuoteArgs(cmd)
-
- if output:
- logging.debug("StartDaemon %s, output file '%s'", strcmd, output)
- else:
- logging.debug("StartDaemon %s", strcmd)
-
- cmd_env = _BuildCmdEnvironment(env, False)
-
- # Create pipe for sending PID back
- (pidpipe_read, pidpipe_write) = os.pipe()
- try:
- try:
- # Create pipe for sending error messages
- (errpipe_read, errpipe_write) = os.pipe()
- try:
- try:
- # First fork
- pid = os.fork()
- if pid == 0:
- try:
- # Child process, won't return
- _StartDaemonChild(errpipe_read, errpipe_write,
- pidpipe_read, pidpipe_write,
- cmd, cmd_env, cwd,
- output, output_fd, pidfile)
- finally:
- # Well, maybe child process failed
- os._exit(1) # pylint: disable-msg=W0212
- finally:
- CloseFdNoError(errpipe_write)
-
- # Wait for daemon to be started (or an error message to
- # arrive) and read up to 100 KB as an error message
- errormsg = RetryOnSignal(os.read, errpipe_read, 100 * 1024)
- finally:
- CloseFdNoError(errpipe_read)
- finally:
- CloseFdNoError(pidpipe_write)
-
- # Read up to 128 bytes for PID
- pidtext = RetryOnSignal(os.read, pidpipe_read, 128)
- finally:
- CloseFdNoError(pidpipe_read)
-
- # Try to avoid zombies by waiting for child process
- try:
- os.waitpid(pid, 0)
- except OSError:
- pass
-
- if errormsg:
- raise errors.OpExecError("Error when starting daemon process: %r" %
- errormsg)
-
- try:
- return int(pidtext)
- except (ValueError, TypeError), err:
- raise errors.OpExecError("Error while trying to parse PID %r: %s" %
- (pidtext, err))
-
-
-def _StartDaemonChild(errpipe_read, errpipe_write,
- pidpipe_read, pidpipe_write,
- args, env, cwd,
- output, fd_output, pidfile):
- """Child process for starting daemon.
-
- """
- try:
- # Close parent's side
- CloseFdNoError(errpipe_read)
- CloseFdNoError(pidpipe_read)
-
- # First child process
- SetupDaemonEnv()
-
- # And fork for the second time
- pid = os.fork()
- if pid != 0:
- # Exit first child process
- os._exit(0) # pylint: disable-msg=W0212
-
- # Make sure pipe is closed on execv* (and thereby notifies
- # original process)
- SetCloseOnExecFlag(errpipe_write, True)
-
- # List of file descriptors to be left open
- noclose_fds = [errpipe_write]
-
- # Open PID file
- if pidfile:
- fd_pidfile = WritePidFile(pidfile)
-
- # Keeping the file open to hold the lock
- noclose_fds.append(fd_pidfile)
-
- SetCloseOnExecFlag(fd_pidfile, False)
- else:
- fd_pidfile = None
-
- SetupDaemonFDs(output, fd_output)
-
- # Send daemon PID to parent
- RetryOnSignal(os.write, pidpipe_write, str(os.getpid()))
-
- # Close all file descriptors except stdio and error message pipe
- CloseFDs(noclose_fds=noclose_fds)
-
- # Change working directory
- os.chdir(cwd)
-
- if env is None:
- os.execvp(args[0], args)
- else:
- os.execvpe(args[0], args, env)
- except: # pylint: disable-msg=W0702
- try:
- # Report errors to original process
- WriteErrorToFD(errpipe_write, str(sys.exc_info()[1]))
- except: # pylint: disable-msg=W0702
- # Ignore errors in error handling
- pass
-
- os._exit(1) # pylint: disable-msg=W0212
-
-
-def WriteErrorToFD(fd, err):
- """Possibly write an error message to a fd.
-
- @type fd: None or int (file descriptor)
- @param fd: if not None, the error will be written to this fd
- @param err: string, the error message
-
- """
- if fd is None:
- return
-
- if not err:
- err = "<unknown error>"
-
- RetryOnSignal(os.write, fd, err)
-
-
-def _CheckIfAlive(child):
- """Raises L{RetryAgain} if child is still alive.
-
- @raises RetryAgain: If child is still alive
-
- """
- if child.poll() is None:
- raise RetryAgain()
-
-
-def _WaitForProcess(child, timeout):
- """Waits for the child to terminate or until we reach timeout.
-
- """
- try:
- Retry(_CheckIfAlive, (1.0, 1.2, 5.0), max(0, timeout), args=[child])
- except RetryTimeout:
- pass
-
-
-def _RunCmdPipe(cmd, env, via_shell, cwd, interactive, timeout,
- _linger_timeout=constants.CHILD_LINGER_TIMEOUT):
- """Run a command and return its output.
-
- @type cmd: string or list
- @param cmd: Command to run
- @type env: dict
- @param env: The environment to use
- @type via_shell: bool
- @param via_shell: if we should run via the shell
- @type cwd: string
- @param cwd: the working directory for the program
- @type interactive: boolean
- @param interactive: Run command interactive (without piping)
- @type timeout: int
- @param timeout: Timeout after the programm gets terminated
- @rtype: tuple
- @return: (out, err, status)
-
- """
- poller = select.poll()
-
- stderr = subprocess.PIPE
- stdout = subprocess.PIPE
- stdin = subprocess.PIPE
-
- if interactive:
- stderr = stdout = stdin = None
-
- child = subprocess.Popen(cmd, shell=via_shell,
- stderr=stderr,
- stdout=stdout,
- stdin=stdin,
- close_fds=True, env=env,
- cwd=cwd)
-
- out = StringIO()
- err = StringIO()
-
- linger_timeout = None
-
- if timeout is None:
- poll_timeout = None
- else:
- poll_timeout = RunningTimeout(timeout, True).Remaining
-
- msg_timeout = ("Command %s (%d) run into execution timeout, terminating" %
- (cmd, child.pid))
- msg_linger = ("Command %s (%d) run into linger timeout, killing" %
- (cmd, child.pid))
-
- timeout_action = _TIMEOUT_NONE
-
- if not interactive:
- child.stdin.close()
- poller.register(child.stdout, select.POLLIN)
- poller.register(child.stderr, select.POLLIN)
- fdmap = {
- child.stdout.fileno(): (out, child.stdout),
- child.stderr.fileno(): (err, child.stderr),
- }
- for fd in fdmap:
- SetNonblockFlag(fd, True)
-
- while fdmap:
- if poll_timeout:
- pt = poll_timeout() * 1000
- if pt < 0:
- if linger_timeout is None:
- logging.warning(msg_timeout)
- if child.poll() is None:
- timeout_action = _TIMEOUT_TERM
- IgnoreProcessNotFound(os.kill, child.pid, signal.SIGTERM)
- linger_timeout = RunningTimeout(_linger_timeout, True).Remaining
- pt = linger_timeout() * 1000
- if pt < 0:
- break
- else:
- pt = None
-
- pollresult = RetryOnSignal(poller.poll, pt)
-
- for fd, event in pollresult:
- if event & select.POLLIN or event & select.POLLPRI:
- data = fdmap[fd][1].read()
- # no data from read signifies EOF (the same as POLLHUP)
- if not data:
- poller.unregister(fd)
- del fdmap[fd]
- continue
- fdmap[fd][0].write(data)
- if (event & select.POLLNVAL or event & select.POLLHUP or
- event & select.POLLERR):
- poller.unregister(fd)
- del fdmap[fd]
-
- if timeout is not None:
- assert callable(poll_timeout)
-
- # We have no I/O left but it might still run
- if child.poll() is None:
- _WaitForProcess(child, poll_timeout())
-
- # Terminate if still alive after timeout
- if child.poll() is None:
- if linger_timeout is None:
- logging.warning(msg_timeout)
- timeout_action = _TIMEOUT_TERM
- IgnoreProcessNotFound(os.kill, child.pid, signal.SIGTERM)
- lt = _linger_timeout
- else:
- lt = linger_timeout()
- _WaitForProcess(child, lt)
-
- # Okay, still alive after timeout and linger timeout? Kill it!
- if child.poll() is None:
- timeout_action = _TIMEOUT_KILL
- logging.warning(msg_linger)
- IgnoreProcessNotFound(os.kill, child.pid, signal.SIGKILL)
-
- out = out.getvalue()
- err = err.getvalue()
-
- status = child.wait()
- return out, err, status, timeout_action
-
-
-def _RunCmdFile(cmd, env, via_shell, output, cwd):
- """Run a command and save its output to a file.
-
- @type cmd: string or list
- @param cmd: Command to run
- @type env: dict
- @param env: The environment to use
- @type via_shell: bool
- @param via_shell: if we should run via the shell
- @type output: str
- @param output: the filename in which to save the output
- @type cwd: string
- @param cwd: the working directory for the program
- @rtype: int
- @return: the exit status
-
- """
- fh = open(output, "a")
- try:
- child = subprocess.Popen(cmd, shell=via_shell,
- stderr=subprocess.STDOUT,
- stdout=fh,
- stdin=subprocess.PIPE,
- close_fds=True, env=env,
- cwd=cwd)
-
- child.stdin.close()
- status = child.wait()
- finally:
- fh.close()
- return status
-
-
-def RunParts(dir_name, env=None, reset_env=False):
- """Run Scripts or programs in a directory
-
- @type dir_name: string
- @param dir_name: absolute path to a directory
- @type env: dict
- @param env: The environment to use
- @type reset_env: boolean
- @param reset_env: whether to reset or keep the default os environment
- @rtype: list of tuples
- @return: list of (name, (one of RUNDIR_STATUS), RunResult)
-
- """
- rr = []
-
- try:
- dir_contents = ListVisibleFiles(dir_name)
- except OSError, err:
- logging.warning("RunParts: skipping %s (cannot list: %s)", dir_name, err)
- return rr
-
- for relname in sorted(dir_contents):
- fname = PathJoin(dir_name, relname)
- if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
- constants.EXT_PLUGIN_MASK.match(relname) is not None):
- rr.append((relname, constants.RUNPARTS_SKIP, None))
- else:
- try:
- result = RunCmd([fname], env=env, reset_env=reset_env)
- except Exception, err: # pylint: disable-msg=W0703
- rr.append((relname, constants.RUNPARTS_ERR, str(err)))
- else:
- rr.append((relname, constants.RUNPARTS_RUN, result))
-
- return rr
-
-
-def ResetTempfileModule():
- """Resets the random name generator of the tempfile module.
-
- This function should be called after C{os.fork} in the child process to
- ensure it creates a newly seeded random generator. Otherwise it would
- generate the same random parts as the parent process. If several processes
- race for the creation of a temporary file, this could lead to one not getting
- a temporary name.
-
- """
- # pylint: disable-msg=W0212
- if hasattr(tempfile, "_once_lock") and hasattr(tempfile, "_name_sequence"):
- tempfile._once_lock.acquire()
- try:
- # Reset random name generator
- tempfile._name_sequence = None
- finally:
- tempfile._once_lock.release()
- else:
- logging.critical("The tempfile module misses at least one of the"
- " '_once_lock' and '_name_sequence' attributes")
+UUID_RE = re.compile("^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-"
+ "[a-f0-9]{4}-[a-f0-9]{12}$")
def ForceDictType(target, key_types, allowed_values=None):
for key in target:
if key not in key_types:
- msg = "Unknown key '%s'" % key
+ msg = "Unknown parameter '%s'" % key
raise errors.TypeEnforcementError(msg)
if target[key] in allowed_values:
pass
elif not isinstance(target[key], basestring):
if isinstance(target[key], bool) and not target[key]:
- target[key] = ''
+ target[key] = ""
else:
msg = "'%s' (value %s) is not a valid string" % (key, target[key])
raise errors.TypeEnforcementError(msg)
raise errors.TypeEnforcementError(msg)
-def _GetProcStatusPath(pid):
- """Returns the path for a PID's proc status file.
-
- @type pid: int
- @param pid: Process ID
- @rtype: string
-
- """
- return "/proc/%d/status" % pid
-
-
-def IsProcessAlive(pid):
- """Check if a given pid exists on the system.
-
- @note: zombie status is not handled, so zombie processes
- will be returned as alive
- @type pid: int
- @param pid: the process ID to check
- @rtype: boolean
- @return: True if the process exists
-
- """
- def _TryStat(name):
- try:
- os.stat(name)
- return True
- except EnvironmentError, err:
- if err.errno in (errno.ENOENT, errno.ENOTDIR):
- return False
- elif err.errno == errno.EINVAL:
- raise RetryAgain(err)
- raise
-
- assert isinstance(pid, int), "pid must be an integer"
- if pid <= 0:
- return False
-
- # /proc in a multiprocessor environment can have strange behaviors.
- # Retry the os.stat a few times until we get a good result.
- try:
- return Retry(_TryStat, (0.01, 1.5, 0.1), 0.5,
- args=[_GetProcStatusPath(pid)])
- except RetryTimeout, err:
- err.RaiseInner()
-
-
-def _ParseSigsetT(sigset):
- """Parse a rendered sigset_t value.
-
- This is the opposite of the Linux kernel's fs/proc/array.c:render_sigset_t
- function.
-
- @type sigset: string
- @param sigset: Rendered signal set from /proc/$pid/status
- @rtype: set
- @return: Set of all enabled signal numbers
-
- """
- result = set()
-
- signum = 0
- for ch in reversed(sigset):
- chv = int(ch, 16)
-
- # The following could be done in a loop, but it's easier to read and
- # understand in the unrolled form
- if chv & 1:
- result.add(signum + 1)
- if chv & 2:
- result.add(signum + 2)
- if chv & 4:
- result.add(signum + 3)
- if chv & 8:
- result.add(signum + 4)
-
- signum += 4
-
- return result
-
-
-def _GetProcStatusField(pstatus, field):
- """Retrieves a field from the contents of a proc status file.
-
- @type pstatus: string
- @param pstatus: Contents of /proc/$pid/status
- @type field: string
- @param field: Name of field whose value should be returned
- @rtype: string
-
- """
- for line in pstatus.splitlines():
- parts = line.split(":", 1)
-
- if len(parts) < 2 or parts[0] != field:
- continue
-
- return parts[1].strip()
-
- return None
-
-
-def IsProcessHandlingSignal(pid, signum, status_path=None):
- """Checks whether a process is handling a signal.
-
- @type pid: int
- @param pid: Process ID
- @type signum: int
- @param signum: Signal number
- @rtype: bool
-
- """
- if status_path is None:
- status_path = _GetProcStatusPath(pid)
-
- try:
- proc_status = ReadFile(status_path)
- except EnvironmentError, err:
- # In at least one case, reading /proc/$pid/status failed with ESRCH.
- if err.errno in (errno.ENOENT, errno.ENOTDIR, errno.EINVAL, errno.ESRCH):
- return False
- raise
-
- sigcgt = _GetProcStatusField(proc_status, "SigCgt")
- if sigcgt is None:
- raise RuntimeError("%s is missing 'SigCgt' field" % status_path)
-
- # Now check whether signal is handled
- return signum in _ParseSigsetT(sigcgt)
-
-
def ValidateServiceName(name):
"""Validate the given service name.
return name
+def _ComputeMissingKeys(key_path, options, defaults):
+ """Helper function to compute which keys are invalid.
+
+ @param key_path: The current key path (if any)
+ @param options: The user provided options
+ @param defaults: The default dictionary
+ @return: A list of invalid keys
+
+ """
+ defaults_keys = frozenset(defaults.keys())
+ invalid = []
+ for key, value in options.items():
+ if key_path:
+ new_path = "%s/%s" % (key_path, key)
+ else:
+ new_path = key
+
+ if key not in defaults_keys:
+ invalid.append(new_path)
+ elif isinstance(value, dict):
+ invalid.extend(_ComputeMissingKeys(new_path, value, defaults[key]))
+
+ return invalid
+
+
+def VerifyDictOptions(options, defaults):
+ """Verify a dict has only keys set which also are in the defaults dict.
+
+ @param options: The user provided options
+ @param defaults: The default dictionary
+ @raise errors.OpPrereqError: If one of the keys is not supported
+
+ """
+ invalid = _ComputeMissingKeys("", options, defaults)
+
+ if invalid:
+ raise errors.OpPrereqError("Provided option keys not supported: %s" %
+ CommaJoin(invalid), errors.ECODE_INVAL)
+
+
def ListVolumeGroups():
"""List volume groups and their size
return nv
-def IsValidShellParam(word):
- """Verifies is the given word is safe from the shell's p.o.v.
-
- This means that we can pass this to a command via the shell and be
- sure that it doesn't alter the command line and is passed as such to
- the actual command.
-
- Note that we are overly restrictive here, in order to be on the safe
- side.
-
- @type word: str
- @param word: the word to check
- @rtype: boolean
- @return: True if the word is 'safe'
-
- """
- return bool(_SHELLPARAM_REGEX.match(word))
-
-
-def BuildShellCmd(template, *args):
- """Build a safe shell command line from the given arguments.
-
- This function will check all arguments in the args list so that they
- are valid shell parameters (i.e. they don't contain shell
- metacharacters). If everything is ok, it will return the result of
- template % args.
-
- @type template: str
- @param template: the string holding the template for the
- string formatting
- @rtype: str
- @return: the expanded command line
-
- """
- for word in args:
- if not IsValidShellParam(word):
- raise errors.ProgrammerError("Shell argument '%s' contains"
- " invalid characters" % word)
- return template % args
-
-
def ParseCpuMask(cpu_mask):
"""Parse a CPU mask definition and return the list of CPU IDs.
return cpu_list
-def SetEtcHostsEntry(file_name, ip, hostname, aliases):
- """Sets the name of an IP address and hostname in /etc/hosts.
-
- @type file_name: str
- @param file_name: path to the file to modify (usually C{/etc/hosts})
- @type ip: str
- @param ip: the IP address
- @type hostname: str
- @param hostname: the hostname to be added
- @type aliases: list
- @param aliases: the list of aliases to add for the hostname
-
- """
- # Ensure aliases are unique
- aliases = UniqueSequence([hostname] + aliases)[1:]
-
- def _WriteEtcHosts(fd):
- # Duplicating file descriptor because os.fdopen's result will automatically
- # close the descriptor, but we would still like to have its functionality.
- out = os.fdopen(os.dup(fd), "w")
- try:
- for line in ReadFile(file_name).splitlines(True):
- fields = line.split()
- if fields and not fields[0].startswith("#") and ip == fields[0]:
- continue
- out.write(line)
-
- out.write("%s\t%s" % (ip, hostname))
- if aliases:
- out.write(" %s" % " ".join(aliases))
- out.write("\n")
- out.flush()
- finally:
- out.close()
+def ParseMultiCpuMask(cpu_mask):
+ """Parse a multiple CPU mask definition and return the list of CPU IDs.
- WriteFile(file_name, fn=_WriteEtcHosts, mode=0644)
+ CPU mask format: colon-separated list of comma-separated list of CPU IDs
+ or dash-separated ID ranges, with optional "all" as CPU value
+ Example: "0-2,5:all:1,5,6:2" -> [ [ 0,1,2,5 ], [ -1 ], [ 1, 5, 6 ], [ 2 ] ]
-
-def AddHostToEtcHosts(hostname, ip):
- """Wrapper around SetEtcHostsEntry.
-
- @type hostname: str
- @param hostname: a hostname that will be resolved and added to
- L{constants.ETC_HOSTS}
- @type ip: str
- @param ip: The ip address of the host
-
- """
- SetEtcHostsEntry(constants.ETC_HOSTS, ip, hostname, [hostname.split(".")[0]])
-
-
-def RemoveEtcHostsEntry(file_name, hostname):
- """Removes a hostname from /etc/hosts.
-
- IP addresses without names are removed from the file.
-
- @type file_name: str
- @param file_name: path to the file to modify (usually C{/etc/hosts})
- @type hostname: str
- @param hostname: the hostname to be removed
+ @type cpu_mask: str
+ @param cpu_mask: multiple CPU mask definition
+ @rtype: list of lists of int
+ @return: list of lists of CPU IDs
"""
- def _WriteEtcHosts(fd):
- # Duplicating file descriptor because os.fdopen's result will automatically
- # close the descriptor, but we would still like to have its functionality.
- out = os.fdopen(os.dup(fd), "w")
- try:
- for line in ReadFile(file_name).splitlines(True):
- fields = line.split()
- if len(fields) > 1 and not fields[0].startswith("#"):
- names = fields[1:]
- if hostname in names:
- while hostname in names:
- names.remove(hostname)
- if names:
- out.write("%s %s\n" % (fields[0], " ".join(names)))
- continue
-
- out.write(line)
-
- out.flush()
- finally:
- out.close()
-
- WriteFile(file_name, fn=_WriteEtcHosts, mode=0644)
-
-
-def RemoveHostFromEtcHosts(hostname):
- """Wrapper around RemoveEtcHostsEntry.
-
- @type hostname: str
- @param hostname: hostname that will be resolved and its
- full and shot name will be removed from
- L{constants.ETC_HOSTS}
+ if not cpu_mask:
+ return []
+ cpu_list = []
+ for range_def in cpu_mask.split(constants.CPU_PINNING_SEP):
+ if range_def == constants.CPU_PINNING_ALL:
+ cpu_list.append([constants.CPU_PINNING_ALL_VAL, ])
+ else:
+ # Uniquify and sort the list before adding
+ cpu_list.append(sorted(set(ParseCpuMask(range_def))))
- """
- RemoveEtcHostsEntry(constants.ETC_HOSTS, hostname)
- RemoveEtcHostsEntry(constants.ETC_HOSTS, hostname.split(".")[0])
+ return cpu_list
def GetHomeDir(user, default=None):
The user can be passed either as a string (denoting the name) or as
an integer (denoting the user id). If the user is not found, the
- 'default' argument is returned, which defaults to None.
+ C{default} argument is returned, which defaults to C{None}.
"""
try:
return result.pw_dir
-def NewUUID():
- """Returns a random UUID.
-
- @note: This is a Linux-specific method as it uses the /proc
- filesystem.
- @rtype: str
-
- """
- return ReadFile(_RANDOM_UUID_FILE, size=128).rstrip("\n")
-
-
def FirstFree(seq, base=0):
"""Returns the first non-existing integer from seq.
return result
-def CloseFDs(noclose_fds=None):
- """Close file descriptors.
-
- This closes all file descriptors above 2 (i.e. except
- stdin/out/err).
-
- @type noclose_fds: list or None
- @param noclose_fds: if given, it denotes a list of file descriptor
- that should not be closed
-
- """
- # Default maximum for the number of available file descriptors.
- if 'SC_OPEN_MAX' in os.sysconf_names:
- try:
- MAXFD = os.sysconf('SC_OPEN_MAX')
- if MAXFD < 0:
- MAXFD = 1024
- except OSError:
- MAXFD = 1024
- else:
- MAXFD = 1024
- maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
- if (maxfd == resource.RLIM_INFINITY):
- maxfd = MAXFD
-
- # Iterate through and close all file descriptors (except the standard ones)
- for fd in range(3, maxfd):
- if noclose_fds and fd in noclose_fds:
- continue
- CloseFdNoError(fd)
-
-
-def Daemonize(logfile):
- """Daemonize the current process.
-
- This detaches the current process from the controlling terminal and
- runs it in the background as a daemon.
-
- @type logfile: str
- @param logfile: the logfile to which we should redirect stdout/stderr
- @rtype: int
- @return: the value zero
-
- """
- # pylint: disable-msg=W0212
- # yes, we really want os._exit
-
- # TODO: do another attempt to merge Daemonize and StartDaemon, or at
- # least abstract the pipe functionality between them
-
- # Create pipe for sending error messages
- (rpipe, wpipe) = os.pipe()
-
- # this might fail
- pid = os.fork()
- if (pid == 0): # The first child.
- SetupDaemonEnv()
-
- # this might fail
- pid = os.fork() # Fork a second child.
- if (pid == 0): # The second child.
- CloseFdNoError(rpipe)
- else:
- # exit() or _exit()? See below.
- os._exit(0) # Exit parent (the first child) of the second child.
- else:
- CloseFdNoError(wpipe)
- # Wait for daemon to be started (or an error message to
- # arrive) and read up to 100 KB as an error message
- errormsg = RetryOnSignal(os.read, rpipe, 100 * 1024)
- if errormsg:
- sys.stderr.write("Error when starting daemon process: %r\n" % errormsg)
- rcode = 1
- else:
- rcode = 0
- os._exit(rcode) # Exit parent of the first child.
-
- SetupDaemonFDs(logfile, None)
- return wpipe
-
-
def EnsureDaemon(name):
"""Check for and start daemon if not alive.
"""
- result = RunCmd([constants.DAEMON_UTIL, "check-and-start", name])
+ result = RunCmd([pathutils.DAEMON_UTIL, "check-and-start", name])
if result.failed:
logging.error("Can't start daemon '%s', failure %s, output: %s",
name, result.fail_reason, result.output)
"""Stop daemon
"""
- result = RunCmd([constants.DAEMON_UTIL, "stop", name])
+ result = RunCmd([pathutils.DAEMON_UTIL, "stop", name])
if result.failed:
logging.error("Can't stop daemon '%s', failure %s, output: %s",
name, result.fail_reason, result.output)
return True
-def KillProcess(pid, signal_=signal.SIGTERM, timeout=30,
- waitpid=False):
- """Kill a process given by its pid.
-
- @type pid: int
- @param pid: The PID to terminate.
- @type signal_: int
- @param signal_: The signal to send, by default SIGTERM
- @type timeout: int
- @param timeout: The timeout after which, if the process is still alive,
- a SIGKILL will be sent. If not positive, no such checking
- will be done
- @type waitpid: boolean
- @param waitpid: If true, we should waitpid on this process after
- sending signals, since it's our own child and otherwise it
- would remain as zombie
-
- """
- def _helper(pid, signal_, wait):
- """Simple helper to encapsulate the kill/waitpid sequence"""
- if IgnoreProcessNotFound(os.kill, pid, signal_) and wait:
- try:
- os.waitpid(pid, os.WNOHANG)
- except OSError:
- pass
-
- if pid <= 0:
- # kill with pid=0 == suicide
- raise errors.ProgrammerError("Invalid pid given '%s'" % pid)
-
- if not IsProcessAlive(pid):
- return
-
- _helper(pid, signal_, waitpid)
-
- if timeout <= 0:
- return
-
- def _CheckProcess():
- if not IsProcessAlive(pid):
- return
-
- try:
- (result_pid, _) = os.waitpid(pid, os.WNOHANG)
- except OSError:
- raise RetryAgain()
-
- if result_pid > 0:
- return
-
- raise RetryAgain()
-
- try:
- # Wait up to $timeout seconds
- Retry(_CheckProcess, (0.01, 1.5, 0.1), timeout)
- except RetryTimeout:
- pass
-
- if IsProcessAlive(pid):
- # Kill process if it's still alive
- _helper(pid, signal.SIGKILL, waitpid)
-
-
def CheckVolumeGroupSize(vglist, vgname, minsize):
"""Checks if the volume group list is valid.
return float(seconds) + (float(microseconds) * 0.000001)
-def _ParseAsn1Generalizedtime(value):
- """Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL.
-
- @type value: string
- @param value: ASN1 GENERALIZEDTIME timestamp
- @return: Seconds since the Epoch (1970-01-01 00:00:00 UTC)
-
- """
- m = _ASN1_TIME_REGEX.match(value)
- if m:
- # We have an offset
- asn1time = m.group(1)
- hours = int(m.group(2))
- minutes = int(m.group(3))
- utcoffset = (60 * hours) + minutes
- else:
- if not value.endswith("Z"):
- raise ValueError("Missing timezone")
- asn1time = value[:-1]
- utcoffset = 0
-
- parsed = time.strptime(asn1time, "%Y%m%d%H%M%S")
-
- tt = datetime.datetime(*(parsed[:7])) - datetime.timedelta(minutes=utcoffset)
-
- return calendar.timegm(tt.utctimetuple())
-
-
-def GetX509CertValidity(cert):
- """Returns the validity period of the certificate.
-
- @type cert: OpenSSL.crypto.X509
- @param cert: X509 certificate object
-
- """
- # The get_notBefore and get_notAfter functions are only supported in
- # pyOpenSSL 0.7 and above.
- try:
- get_notbefore_fn = cert.get_notBefore
- except AttributeError:
- not_before = None
- else:
- not_before_asn1 = get_notbefore_fn()
-
- if not_before_asn1 is None:
- not_before = None
- else:
- not_before = _ParseAsn1Generalizedtime(not_before_asn1)
-
- try:
- get_notafter_fn = cert.get_notAfter
- except AttributeError:
- not_after = None
- else:
- not_after_asn1 = get_notafter_fn()
-
- if not_after_asn1 is None:
- not_after = None
- else:
- not_after = _ParseAsn1Generalizedtime(not_after_asn1)
-
- return (not_before, not_after)
-
-
-def _VerifyCertificateInner(expired, not_before, not_after, now,
- warn_days, error_days):
- """Verifies certificate validity.
-
- @type expired: bool
- @param expired: Whether pyOpenSSL considers the certificate as expired
- @type not_before: number or None
- @param not_before: Unix timestamp before which certificate is not valid
- @type not_after: number or None
- @param not_after: Unix timestamp after which certificate is invalid
- @type now: number
- @param now: Current time as Unix timestamp
- @type warn_days: number or None
- @param warn_days: How many days before expiration a warning should be reported
- @type error_days: number or None
- @param error_days: How many days before expiration an error should be reported
-
- """
- if expired:
- msg = "Certificate is expired"
-
- if not_before is not None and not_after is not None:
- msg += (" (valid from %s to %s)" %
- (FormatTime(not_before), FormatTime(not_after)))
- elif not_before is not None:
- msg += " (valid from %s)" % FormatTime(not_before)
- elif not_after is not None:
- msg += " (valid until %s)" % FormatTime(not_after)
-
- return (CERT_ERROR, msg)
-
- elif not_before is not None and not_before > now:
- return (CERT_WARNING,
- "Certificate not yet valid (valid from %s)" %
- FormatTime(not_before))
-
- elif not_after is not None:
- remaining_days = int((not_after - now) / (24 * 3600))
-
- msg = "Certificate expires in about %d days" % remaining_days
-
- if error_days is not None and remaining_days <= error_days:
- return (CERT_ERROR, msg)
-
- if warn_days is not None and remaining_days <= warn_days:
- return (CERT_WARNING, msg)
-
- return (None, None)
-
-
-def VerifyX509Certificate(cert, warn_days, error_days):
- """Verifies a certificate for LUVerifyCluster.
-
- @type cert: OpenSSL.crypto.X509
- @param cert: X509 certificate object
- @type warn_days: number or None
- @param warn_days: How many days before expiration a warning should be reported
- @type error_days: number or None
- @param error_days: How many days before expiration an error should be reported
-
- """
- # Depending on the pyOpenSSL version, this can just return (None, None)
- (not_before, not_after) = GetX509CertValidity(cert)
-
- return _VerifyCertificateInner(cert.has_expired(), not_before, not_after,
- time.time(), warn_days, error_days)
-
-
-def SignX509Certificate(cert, key, salt):
- """Sign a X509 certificate.
-
- An RFC822-like signature header is added in front of the certificate.
-
- @type cert: OpenSSL.crypto.X509
- @param cert: X509 certificate object
- @type key: string
- @param key: Key for HMAC
- @type salt: string
- @param salt: Salt for HMAC
- @rtype: string
- @return: Serialized and signed certificate in PEM format
-
- """
- if not VALID_X509_SIGNATURE_SALT.match(salt):
- raise errors.GenericError("Invalid salt: %r" % salt)
-
- # Dumping as PEM here ensures the certificate is in a sane format
- cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
-
- return ("%s: %s/%s\n\n%s" %
- (constants.X509_CERT_SIGNATURE_HEADER, salt,
- Sha1Hmac(key, cert_pem, salt=salt),
- cert_pem))
-
-
-def _ExtractX509CertificateSignature(cert_pem):
- """Helper function to extract signature from X509 certificate.
-
- """
- # Extract signature from original PEM data
- for line in cert_pem.splitlines():
- if line.startswith("---"):
- break
-
- m = X509_SIGNATURE.match(line.strip())
- if m:
- return (m.group("salt"), m.group("sign"))
-
- raise errors.GenericError("X509 certificate signature is missing")
-
-
-def LoadSignedX509Certificate(cert_pem, key):
- """Verifies a signed X509 certificate.
-
- @type cert_pem: string
- @param cert_pem: Certificate in PEM format and with signature header
- @type key: string
- @param key: Key for HMAC
- @rtype: tuple; (OpenSSL.crypto.X509, string)
- @return: X509 certificate object and salt
-
- """
- (salt, signature) = _ExtractX509CertificateSignature(cert_pem)
-
- # Load certificate
- cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_pem)
-
- # Dump again to ensure it's in a sane format
- sane_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
-
- if not VerifySha1Hmac(key, sane_pem, signature, salt=salt):
- raise errors.GenericError("X509 certificate signature is invalid")
-
- return (cert, salt)
-
-
def FindMatch(data, name):
"""Tries to find an item in a dictionary matching a name.
return data
-def RunInSeparateProcess(fn, *args):
- """Runs a function in a separate process.
-
- Note: Only boolean return values are supported.
-
- @type fn: callable
- @param fn: Function to be called
- @rtype: bool
- @return: Function's result
-
- """
- pid = os.fork()
- if pid == 0:
- # Child process
- try:
- # In case the function uses temporary files
- ResetTempfileModule()
-
- # Call function
- result = int(bool(fn(*args)))
- assert result in (0, 1)
- except: # pylint: disable-msg=W0702
- logging.exception("Error while calling function in separate process")
- # 0 and 1 are reserved for the return value
- result = 33
-
- os._exit(result) # pylint: disable-msg=W0212
-
- # Parent process
-
- # Avoid zombies and check exit code
- (_, status) = os.waitpid(pid, 0)
-
- if os.WIFSIGNALED(status):
- exitcode = None
- signum = os.WTERMSIG(status)
- else:
- exitcode = os.WEXITSTATUS(status)
- signum = None
-
- if not (exitcode in (0, 1) and signum is None):
- raise errors.GenericError("Child program failed (code=%s, signal=%s)" %
- (exitcode, signum))
-
- return bool(exitcode)
-
-
-def GenerateSelfSignedX509Cert(common_name, validity):
- """Generates a self-signed X509 certificate.
-
- @type common_name: string
- @param common_name: commonName value
- @type validity: int
- @param validity: Validity for certificate in seconds
-
- """
- # Create private and public key
- key = OpenSSL.crypto.PKey()
- key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)
-
- # Create self-signed certificate
- cert = OpenSSL.crypto.X509()
- if common_name:
- cert.get_subject().CN = common_name
- cert.set_serial_number(1)
- cert.gmtime_adj_notBefore(0)
- cert.gmtime_adj_notAfter(validity)
- cert.set_issuer(cert.get_subject())
- cert.set_pubkey(key)
- cert.sign(key, constants.X509_CERT_SIGN_DIGEST)
-
- key_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
- cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
-
- return (key_pem, cert_pem)
-
-
-def GenerateSelfSignedSslCert(filename, common_name=constants.X509_CERT_CN,
- validity=constants.X509_CERT_DEFAULT_VALIDITY):
- """Legacy function to generate self-signed X509 certificate.
-
- @type filename: str
- @param filename: path to write certificate to
- @type common_name: string
- @param common_name: commonName value
- @type validity: int
- @param validity: validity of certificate in number of days
-
- """
- # TODO: Investigate using the cluster name instead of X505_CERT_CN for
- # common_name, as cluster-renames are very seldom, and it'd be nice if RAPI
- # and node daemon certificates have the proper Subject/Issuer.
- (key_pem, cert_pem) = GenerateSelfSignedX509Cert(common_name,
- validity * 24 * 60 * 60)
-
- WriteFile(filename, mode=0400, data=key_pem + cert_pem)
-
-
def SignalHandled(signums):
"""Signal Handled decoration.
"""
def wrap(fn):
def sig_function(*args, **kwargs):
- assert 'signal_handlers' not in kwargs or \
- kwargs['signal_handlers'] is None or \
- isinstance(kwargs['signal_handlers'], dict), \
+ assert "signal_handlers" not in kwargs or \
+ kwargs["signal_handlers"] is None or \
+ isinstance(kwargs["signal_handlers"], dict), \
"Wrong signal_handlers parameter in original function call"
- if 'signal_handlers' in kwargs and kwargs['signal_handlers'] is not None:
- signal_handlers = kwargs['signal_handlers']
+ if "signal_handlers" in kwargs and kwargs["signal_handlers"] is not None:
+ signal_handlers = kwargs["signal_handlers"]
else:
signal_handlers = {}
- kwargs['signal_handlers'] = signal_handlers
+ kwargs["signal_handlers"] = signal_handlers
sighandler = SignalHandler(signums)
try:
for sig in signums:
return wrap
+def TimeoutExpired(epoch, timeout, _time_fn=time.time):
+  """Checks whether a timeout has expired.
+
+  @type epoch: number
+  @param epoch: Unix timestamp marking the start of the timeout period
+  @type timeout: number
+  @param timeout: Timeout duration in seconds
+  @param _time_fn: Time function for unittests
+  @rtype: bool
+  @return: Whether more than C{timeout} seconds have passed since C{epoch}
+
+  """
+  return _time_fn() > (epoch + timeout)
+
+
class SignalWakeupFd(object):
try:
# This is only supported in Python 2.5 and above (some distributions
_set_wakeup_fd_fn = signal.set_wakeup_fd
except AttributeError:
# Not supported
- def _SetWakeupFd(self, _): # pylint: disable-msg=R0201
+ def _SetWakeupFd(self, _): # pylint: disable=R0201
return -1
else:
def _SetWakeupFd(self, fd):
"""
return [val for val in items if not self.Matches(val)]
-
-
-class RunningTimeout(object):
- """Class to calculate remaining timeout when doing several operations.
-
- """
- __slots__ = [
- "_allow_negative",
- "_start_time",
- "_time_fn",
- "_timeout",
- ]
-
- def __init__(self, timeout, allow_negative, _time_fn=time.time):
- """Initializes this class.
-
- @type timeout: float
- @param timeout: Timeout duration
- @type allow_negative: bool
- @param allow_negative: Whether to return values below zero
- @param _time_fn: Time function for unittests
-
- """
- object.__init__(self)
-
- if timeout is not None and timeout < 0.0:
- raise ValueError("Timeout must not be negative")
-
- self._timeout = timeout
- self._allow_negative = allow_negative
- self._time_fn = _time_fn
-
- self._start_time = None
-
- def Remaining(self):
- """Returns the remaining timeout.
-
- """
- if self._timeout is None:
- return None
-
- # Get start time on first calculation
- if self._start_time is None:
- self._start_time = self._time_fn()
-
- # Calculate remaining time
- remaining_timeout = self._start_time + self._timeout - self._time_fn()
-
- if not self._allow_negative:
- # Ensure timeout is always >= 0
- return max(0.0, remaining_timeout)
-
- return remaining_timeout