X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/44bf25fffa15970b7ab8677f718026e603ed2543..bf4a90afe25a7357b9e544bf8d3ad0c22dfcf673:/lib/utils.py diff --git a/lib/utils.py b/lib/utils.py index 4c55dc9..8bd1000 100644 --- a/lib/utils.py +++ b/lib/utils.py @@ -166,6 +166,7 @@ def RunCmd(cmd, env=None, output=None, cwd='/'): return RunResult(exitcode, signal_, out, err, strcmd) + def _RunCmdPipe(cmd, env, via_shell, cwd): """Run a command and return its output. @@ -203,7 +204,18 @@ def _RunCmdPipe(cmd, env, via_shell, cwd): fcntl.fcntl(fd, fcntl.F_SETFL, status | os.O_NONBLOCK) while fdmap: - for fd, event in poller.poll(): + try: + pollresult = poller.poll() + except EnvironmentError, eerr: + if eerr.errno == errno.EINTR: + continue + raise + except select.error, serr: + if serr[0] == errno.EINTR: + continue + raise + + for fd, event in pollresult: if event & select.POLLIN or event & select.POLLPRI: data = fdmap[fd][1].read() # no data from read signifies EOF (the same as POLLHUP) @@ -274,6 +286,32 @@ def RemoveFile(filename): raise +def RenameFile(old, new, mkdir=False, mkdir_mode=0750): + """Renames a file. + + @type old: string + @param old: Original path + @type new: string + @param new: New path + @type mkdir: bool + @param mkdir: Whether to create target directory if it doesn't exist + @type mkdir_mode: int + @param mkdir_mode: Mode for newly created directories + + """ + try: + return os.rename(old, new) + except OSError, err: + # In at least one use case of this function, the job queue, directory + # creation is very rare. Checking for the directory before renaming is not + # as efficient. + if mkdir and err.errno == errno.ENOENT: + # Create directory and try again + os.makedirs(os.path.dirname(new), mkdir_mode) + return os.rename(old, new) + raise + + def _FingerprintFile(filename): """Compute the fingerprint of a file. @@ -349,6 +387,69 @@ def CheckDict(target, template, logname=None): logging.warning('%s missing keys %s', logname, ', '.join(missing)) +def ForceDictType(target, key_types, allowed_values=None): + """Force the values of a dict to have certain types. + + @type target: dict + @param target: the dict to update + @type key_types: dict + @param key_types: dict mapping target dict keys to types + in constants.ENFORCEABLE_TYPES + @type allowed_values: list + @keyword allowed_values: list of specially allowed values + + """ + if allowed_values is None: + allowed_values = [] + + for key in target: + if key not in key_types: + msg = "Unknown key '%s'" % key + raise errors.TypeEnforcementError(msg) + + if target[key] in allowed_values: + continue + + type = key_types[key] + if type not in constants.ENFORCEABLE_TYPES: + msg = "'%s' has non-enforceable type %s" % (key, type) + raise errors.ProgrammerError(msg) + + if type == constants.VTYPE_STRING: + if not isinstance(target[key], basestring): + if isinstance(target[key], bool) and not target[key]: + target[key] = '' + else: + msg = "'%s' (value %s) is not a valid string" % (key, target[key]) + raise errors.TypeEnforcementError(msg) + elif type == constants.VTYPE_BOOL: + if isinstance(target[key], basestring) and target[key]: + if target[key].lower() == constants.VALUE_FALSE: + target[key] = False + elif target[key].lower() == constants.VALUE_TRUE: + target[key] = True + else: + msg = "'%s' (value %s) is not a valid boolean" % (key, target[key]) + raise errors.TypeEnforcementError(msg) + elif target[key]: + target[key] = True + else: + target[key] = False + elif type == constants.VTYPE_SIZE: + try: + target[key] = ParseUnit(target[key]) + except errors.UnitParseError, err: + msg = "'%s' (value %s) is not a valid size. error: %s" % \ + (key, target[key], err) + raise errors.TypeEnforcementError(msg) + elif type == constants.VTYPE_INT: + try: + target[key] = int(target[key]) + except (ValueError, TypeError): + msg = "'%s' (value %s) is not a valid integer" % (key, target[key]) + raise errors.TypeEnforcementError(msg) + + def IsProcessAlive(pid): """Check if a given pid exists on the system. @@ -378,7 +479,7 @@ def ReadPidFile(pidfile): @type pidfile: string @param pidfile: path to the file containing the pid @rtype: int - @return: The process id, if the file exista and contains a valid PID, + @return: The process id, if the file exists and contains a valid PID, otherwise 0 """ @@ -638,23 +739,40 @@ def BuildShellCmd(template, *args): return template % args -def FormatUnit(value): +def FormatUnit(value, units): """Formats an incoming number of MiB with the appropriate unit. @type value: int @param value: integer representing the value in MiB (1048576) + @type units: char + @param units: the type of formatting we should do: + - 'h' for automatic scaling + - 'm' for MiBs + - 'g' for GiBs + - 't' for TiBs @rtype: str @return: the formatted value (with suffix) """ - if value < 1024: - return "%dM" % round(value, 0) + if units not in ('m', 'g', 't', 'h'): + raise errors.ProgrammerError("Invalid unit specified '%s'" % str(units)) - elif value < (1024 * 1024): - return "%0.1fG" % round(float(value) / 1024, 1) + suffix = '' + + if units == 'm' or (units == 'h' and value < 1024): + if units == 'h': + suffix = 'M' + return "%d%s" % (round(value, 0), suffix) + + elif units == 'g' or (units == 'h' and value < (1024 * 1024)): + if units == 'h': + suffix = 'G' + return "%0.1f%s" % (round(float(value) / 1024, 1), suffix) else: - return "%0.1fT" % round(float(value) / 1024 / 1024, 1) + if units == 'h': + suffix = 'T' + return "%0.1f%s" % (round(float(value) / 1024 / 1024, 1), suffix) def ParseUnit(input_string): @@ -665,7 +783,7 @@ def ParseUnit(input_string): is always an int in MiB. """ - m = re.match('^([.\d]+)\s*([a-zA-Z]+)?$', input_string) + m = re.match('^([.\d]+)\s*([a-zA-Z]+)?$', str(input_string)) if not m: raise errors.UnitParseError("Invalid format") @@ -777,6 +895,7 @@ def SetEtcHostsEntry(file_name, ip, hostname, aliases): @param aliases: the list of aliases to add for the hostname """ + # FIXME: use WriteFile + fn rather than duplicating its efforts # Ensure aliases are unique aliases = UniqueSequence([hostname] + aliases)[1:] @@ -786,7 +905,6 @@ def SetEtcHostsEntry(file_name, ip, hostname, aliases): try: f = open(file_name, 'r') try: - written = False for line in f: fields = line.split() if fields and not fields[0].startswith('#') and ip == fields[0]: @@ -800,6 +918,7 @@ def SetEtcHostsEntry(file_name, ip, hostname, aliases): out.flush() os.fsync(out) + os.chmod(tmpname, 0644) os.rename(tmpname, file_name) finally: f.close() @@ -833,6 +952,7 @@ def RemoveEtcHostsEntry(file_name, hostname): @param hostname: the hostname to be removed """ + # FIXME: use WriteFile + fn rather than duplicating its efforts fd, tmpname = tempfile.mkstemp(dir=os.path.dirname(file_name)) try: out = os.fdopen(fd, 'w') @@ -854,6 +974,7 @@ def RemoveEtcHostsEntry(file_name, hostname): out.flush() os.fsync(out) + os.chmod(tmpname, 0644) os.rename(tmpname, file_name) finally: f.close() @@ -959,7 +1080,7 @@ def TcpPing(target, port, timeout=10, live_port_needed=False, source=None): """ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sucess = False + success = False if source is not None: try: @@ -1264,23 +1385,37 @@ def TestDelay(duration): return True -def Daemonize(logfile, noclose_fds=None): - """Daemonize the current process. +def _CloseFDNoErr(fd, retries=5): + """Close a file descriptor ignoring errors. - This detaches the current process from the controlling terminal and - runs it in the background as a daemon. + @type fd: int + @param fd: the file descriptor + @type retries: int + @param retries: how many retries to make, in case we get any + other error than EBADF + + """ + try: + os.close(fd) + except OSError, err: + if err.errno != errno.EBADF: + if retries > 0: + _CloseFDNoErr(fd, retries - 1) + # else either it's closed already or we're out of retries, so we + # ignore this and go on + + +def CloseFDs(noclose_fds=None): + """Close file descriptors. + + This closes all file descriptors above 2 (i.e. except + stdin/out/err). - @type logfile: str - @param logfile: the logfile to which we should redirect stdout/stderr @type noclose_fds: list or None @param noclose_fds: if given, it denotes a list of file descriptor that should not be closed - @rtype: int - @returns: the value zero """ - UMASK = 077 - WORKDIR = "/" # Default maximum for the number of available file descriptors. if 'SC_OPEN_MAX' in os.sysconf_names: try: @@ -1291,6 +1426,31 @@ def Daemonize(logfile, noclose_fds=None): MAXFD = 1024 else: MAXFD = 1024 + maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1] + if (maxfd == resource.RLIM_INFINITY): + maxfd = MAXFD + + # Iterate through and close all file descriptors (except the standard ones) + for fd in range(3, maxfd): + if noclose_fds and fd in noclose_fds: + continue + _CloseFDNoErr(fd) + + +def Daemonize(logfile): + """Daemonize the current process. + + This detaches the current process from the controlling terminal and + runs it in the background as a daemon. + + @type logfile: str + @param logfile: the logfile to which we should redirect stdout/stderr + @rtype: int + @returns: the value zero + + """ + UMASK = 077 + WORKDIR = "/" # this might fail pid = os.fork() @@ -1306,22 +1466,15 @@ def Daemonize(logfile, noclose_fds=None): os._exit(0) # Exit parent (the first child) of the second child. else: os._exit(0) # Exit parent of the first child. - maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1] - if (maxfd == resource.RLIM_INFINITY): - maxfd = MAXFD - # Iterate through and close all file descriptors. - for fd in range(0, maxfd): - if noclose_fds and fd in noclose_fds: - continue - try: - os.close(fd) - except OSError: # ERROR, fd wasn't open to begin with (ignored) - pass - os.open(logfile, os.O_RDWR|os.O_CREAT|os.O_APPEND, 0600) - # Duplicate standard input to standard output and standard error. - os.dup2(0, 1) # standard output (1) - os.dup2(0, 2) # standard error (2) + for fd in range(3): + _CloseFDNoErr(fd) + i = os.open("/dev/null", os.O_RDONLY) # stdin + assert i == 0, "Can't close/reopen stdin" + i = os.open(logfile, os.O_WRONLY|os.O_CREAT|os.O_APPEND, 0600) # stdout + assert i == 1, "Can't close/reopen stdout" + # Duplicate standard output to standard error. + os.dup2(1, 2) return 0 @@ -1411,11 +1564,25 @@ def KillProcess(pid, signal_=signal.SIGTERM, timeout=30, _helper(pid, signal_, waitpid) if timeout <= 0: return + + # Wait up to $timeout seconds end = time.time() + timeout + wait = 0.01 while time.time() < end and IsProcessAlive(pid): - time.sleep(0.1) + try: + (result_pid, _) = os.waitpid(pid, os.WNOHANG) + if result_pid > 0: + break + except OSError: + pass + time.sleep(wait) + # Make wait time longer for next try + if wait < 0.1: + wait *= 1.5 + if IsProcessAlive(pid): - _helper(pid, signal.SIGKILL, wait) + # Kill process if it's still alive + _helper(pid, signal.SIGKILL, waitpid) def FindFile(name, search_path, test=os.path.exists): @@ -1522,16 +1689,8 @@ def GetNodeDaemonPort(): return port -def GetNodeDaemonPassword(): - """Get the node password for the cluster. - - @rtype: str - - """ - return ReadFile(constants.CLUSTER_PASSWORD_FILE) - - -def SetupLogging(logfile, debug=False, stderr_logging=False, program=""): +def SetupLogging(logfile, debug=False, stderr_logging=False, program="", + multithreaded=False): """Configures the logging module. @type logfile: str @@ -1543,21 +1702,28 @@ def SetupLogging(logfile, debug=False, stderr_logging=False, program=""): @param stderr_logging: whether we should also log to the standard error @type program: str @param program: the name under which we should log messages + @type multithreaded: boolean + @param multithreaded: if True, will add the thread name to the log file @raise EnvironmentError: if we can't open the log file and stderr logging is disabled """ - fmt = "%(asctime)s: " + program + " " + fmt = "%(asctime)s: " + program + " pid=%(process)d" + if multithreaded: + fmt += "/%(threadName)s" if debug: - fmt += ("pid=%(process)d/%(threadName)s %(levelname)s" - " %(module)s:%(lineno)s %(message)s") - else: - fmt += "pid=%(process)d %(levelname)s %(message)s" + fmt += " %(module)s:%(lineno)s" + fmt += " %(levelname)s %(message)s" formatter = logging.Formatter(fmt) root_logger = logging.getLogger("") root_logger.setLevel(logging.NOTSET) + # Remove all previously setup handlers + for handler in root_logger.handlers: + handler.close() + root_logger.removeHandler(handler) + if stderr_logging: stderr_handler = logging.StreamHandler() stderr_handler.setFormatter(formatter) @@ -1579,7 +1745,7 @@ def SetupLogging(logfile, debug=False, stderr_logging=False, program=""): else: logfile_handler.setLevel(logging.INFO) root_logger.addHandler(logfile_handler) - except EnvironmentError, err: + except EnvironmentError: if stderr_logging: logging.exception("Failed to enable logging to file '%s'", logfile) else: @@ -1587,6 +1753,53 @@ def SetupLogging(logfile, debug=False, stderr_logging=False, program=""): raise +def TailFile(fname, lines=20): + """Return the last lines from a file. + + @note: this function will only read and parse the last 4KB of + the file; if the lines are very long, it could be that less + than the requested number of lines are returned + + @param fname: the file name + @type lines: int + @param lines: the (maximum) number of lines to return + + """ + fd = open(fname, "r") + try: + fd.seek(0, 2) + pos = fd.tell() + pos = max(0, pos-4096) + fd.seek(pos, 0) + raw_data = fd.read() + finally: + fd.close() + + rows = raw_data.splitlines() + return rows[-lines:] + + +def SafeEncode(text): + """Return a 'safe' version of a source string. + + This function mangles the input string and returns a version that + should be safe to disply/encode as ASCII. To this end, we first + convert it to ASCII using the 'backslashreplace' encoding which + should get rid of any non-ASCII chars, and then we again encode it + via 'string_escape' which converts '\n' into '\\n' so that log + messages remain one-line. + + @type text: str or unicode + @param text: input data + @rtype: str + @return: a safe version of text + + """ + text = text.encode('ascii', 'backslashreplace') + text = text.encode('string_escape') + return text + + def LockedMethod(fn): """Synchronized object access decorator. @@ -1823,3 +2036,45 @@ class SignalHandler(object): # This is not nice and not absolutely atomic, but it appears to be the only # solution in Python -- there are no atomic types. self.called = True + + +class FieldSet(object): + """A simple field set. + + Among the features are: + - checking if a string is among a list of static string or regex objects + - checking if a whole list of string matches + - returning the matching groups from a regex match + + Internally, all fields are held as regular expression objects. + + """ + def __init__(self, *items): + self.items = [re.compile("^%s$" % value) for value in items] + + def Extend(self, other_set): + """Extend the field set with the items from another one""" + self.items.extend(other_set.items) + + def Matches(self, field): + """Checks if a field matches the current set + + @type field: str + @param field: the string to match + @return: either False or a regular expression match object + + """ + for m in itertools.ifilter(None, (val.match(field) for val in self.items)): + return m + return False + + def NonMatching(self, items): + """Returns the list of fields not matching the current set + + @type items: list + @param items: the list of fields to check + @rtype: list + @return: list of non-matching fields + + """ + return [val for val in items if not self.Matches(val)]