X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/8218713518c25baa5aeb97e7cbb0a78c1c292fa7..bb579a7dc676d7d0a3d24ee4f07883c74c726a9d:/lib/utils.py diff --git a/lib/utils.py b/lib/utils.py index 1abec0e..d0d07bc 100644 --- a/lib/utils.py +++ b/lib/utils.py @@ -41,7 +41,11 @@ import select import fcntl import resource import logging +import logging.handlers import signal +import datetime +import calendar +import collections from cStringIO import StringIO @@ -117,13 +121,13 @@ class RunResult(object): output = property(_GetOutput, None, None, "Return full output") -def RunCmd(cmd, env=None, output=None, cwd='/'): +def RunCmd(cmd, env=None, output=None, cwd='/', reset_env=False): """Execute a (shell) command. The command should not read from its standard input, as it will be closed. - @type cmd: string or list + @type cmd: string or list @param cmd: Command to run @type env: dict @param env: Additional environment @@ -134,6 +138,8 @@ def RunCmd(cmd, env=None, output=None, cwd='/'): @type cwd: string @param cwd: if specified, will be used as the working directory for the command; the default will be / + @type reset_env: boolean + @param reset_env: whether to reset or keep the default os environment @rtype: L{RunResult} @return: RunResult instance @raise errors.ProgrammerError: if we call this when forks are disabled @@ -151,8 +157,12 @@ def RunCmd(cmd, env=None, output=None, cwd='/'): shell = True logging.debug("RunCmd '%s'", strcmd) - cmd_env = os.environ.copy() - cmd_env["LC_ALL"] = "C" + if not reset_env: + cmd_env = os.environ.copy() + cmd_env["LC_ALL"] = "C" + else: + cmd_env = {} + if env is not None: cmd_env.update(env) @@ -281,6 +291,43 @@ def _RunCmdFile(cmd, env, via_shell, output, cwd): return status +def RunParts(dir_name, env=None, reset_env=False): + """Run Scripts or programs in a directory + + @type dir_name: string + @param dir_name: absolute path to a directory + @type env: dict + @param env: The environment to use + @type reset_env: boolean + @param reset_env: whether to reset or keep the default os environment + @rtype: list of tuples + @return: list of (name, (one of RUNDIR_STATUS), RunResult) + + """ + rr = [] + + try: + dir_contents = ListVisibleFiles(dir_name) + except OSError, err: + logging.warning("RunParts: skipping %s (cannot list: %s)", dir_name, err) + return rr + + for relname in sorted(dir_contents): + fname = PathJoin(dir_name, relname) + if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and + constants.EXT_PLUGIN_MASK.match(relname) is not None): + rr.append((relname, constants.RUNPARTS_SKIP, None)) + else: + try: + result = RunCmd([fname], env=env, reset_env=reset_env) + except Exception, err: # pylint: disable-msg=W0703 + rr.append((relname, constants.RUNPARTS_ERR, str(err))) + else: + rr.append((relname, constants.RUNPARTS_RUN, result)) + + return rr + + def RemoveFile(filename): """Remove a file ignoring some errors. @@ -319,20 +366,52 @@ def RenameFile(old, new, mkdir=False, mkdir_mode=0750): # as efficient. if mkdir and err.errno == errno.ENOENT: # Create directory and try again - dirname = os.path.dirname(new) - try: - os.makedirs(dirname, mode=mkdir_mode) - except OSError, err: - # Ignore EEXIST. This is only handled in os.makedirs as included in - # Python 2.5 and above. - if err.errno != errno.EEXIST or not os.path.exists(dirname): - raise + Makedirs(os.path.dirname(new), mode=mkdir_mode) return os.rename(old, new) raise +def Makedirs(path, mode=0750): + """Super-mkdir; create a leaf directory and all intermediate ones. + + This is a wrapper around C{os.makedirs} adding error handling not implemented + before Python 2.5. + + """ + try: + os.makedirs(path, mode) + except OSError, err: + # Ignore EEXIST. This is only handled in os.makedirs as included in + # Python 2.5 and above. + if err.errno != errno.EEXIST or not os.path.exists(path): + raise + + +def ResetTempfileModule(): + """Resets the random name generator of the tempfile module. + + This function should be called after C{os.fork} in the child process to + ensure it creates a newly seeded random generator. Otherwise it would + generate the same random parts as the parent process. If several processes + race for the creation of a temporary file, this could lead to one not getting + a temporary name. + + """ + # pylint: disable-msg=W0212 + if hasattr(tempfile, "_once_lock") and hasattr(tempfile, "_name_sequence"): + tempfile._once_lock.acquire() + try: + # Reset random name generator + tempfile._name_sequence = None + finally: + tempfile._once_lock.release() + else: + logging.critical("The tempfile module misses at least one of the" + " '_once_lock' and '_name_sequence' attributes") + + def _FingerprintFile(filename): """Compute the fingerprint of a file. @@ -491,7 +570,7 @@ def ReadPidFile(pidfile): try: pid = int(raw_data) - except ValueError, err: + except (TypeError, ValueError), err: logging.info("Can't parse pid file contents", exc_info=True) return 0 @@ -548,6 +627,8 @@ class HostInfo: """Class implementing resolver and hostname functionality """ + _VALID_NAME_RE = re.compile("^[a-z0-9._-]{1,255}$") + def __init__(self, name=None): """Initialize the host name object. @@ -598,6 +679,27 @@ class HostInfo: return result + @classmethod + def NormalizeName(cls, hostname): + """Validate and normalize the given hostname. + + @attention: the validation is a bit more relaxed than the standards + require; most importantly, we allow underscores in names + @raise errors.OpPrereqError: when the name is not valid + + """ + hostname = hostname.lower() + if (not cls._VALID_NAME_RE.match(hostname) or + # double-dots, meaning empty label + ".." in hostname or + # empty initial label + hostname.startswith(".")): + raise errors.OpPrereqError("Invalid hostname '%s'" % hostname, + errors.ECODE_INVAL) + if hostname.endswith("."): + hostname = hostname.rstrip(".") + return hostname + def GetHostInfo(name=None): """Lookup host name and raise an OpPrereqError for failures""" @@ -1027,6 +1129,16 @@ def RemoveHostFromEtcHosts(hostname): RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName()) +def TimestampForFilename(): + """Returns the current time formatted for filenames. + + The format doesn't contain colons as some shells and applications them as + separators. + + """ + return time.strftime("%Y-%m-%d_%H_%M_%S") + + def CreateBackup(file_name): """Creates a backup of a file. @@ -1041,7 +1153,8 @@ def CreateBackup(file_name): raise errors.ProgrammerError("Can't make a backup of a non-file '%s'" % file_name) - prefix = '%s.backup-%d.' % (os.path.basename(file_name), int(time.time())) + prefix = ("%s.backup-%s." % + (os.path.basename(file_name), TimestampForFilename())) dir_name = os.path.dirname(file_name) fsrc = open(file_name, 'rb') @@ -1049,6 +1162,7 @@ def CreateBackup(file_name): (fd, backup_name) = tempfile.mkstemp(prefix=prefix, dir=dir_name) fdst = os.fdopen(fd, 'wb') try: + logging.debug("Backing up %s at %s", file_name, backup_name) shutil.copyfileobj(fsrc, fdst) finally: fdst.close() @@ -1154,8 +1268,12 @@ def ListVisibleFiles(path): @param path: the directory to enumerate @rtype: list @return: the list of all files not starting with a dot + @raise ProgrammerError: if L{path} is not an absolue and normalized path """ + if not IsNormAbsPath(path): + raise errors.ProgrammerError("Path passed to ListVisibleFiles is not" + " absolute/normalized: '%s'" % path) files = [i for i in os.listdir(path) if not i.startswith(".")] files.sort() return files @@ -1383,6 +1501,103 @@ def any(seq, pred=bool): # pylint: disable-msg=W0622 return False +def SingleWaitForFdCondition(fdobj, event, timeout): + """Waits for a condition to occur on the socket. + + Immediately returns at the first interruption. + + @type fdobj: integer or object supporting a fileno() method + @param fdobj: entity to wait for events on + @type event: integer + @param event: ORed condition (see select module) + @type timeout: float or None + @param timeout: Timeout in seconds + @rtype: int or None + @return: None for timeout, otherwise occured conditions + + """ + check = (event | select.POLLPRI | + select.POLLNVAL | select.POLLHUP | select.POLLERR) + + if timeout is not None: + # Poller object expects milliseconds + timeout *= 1000 + + poller = select.poll() + poller.register(fdobj, event) + try: + # TODO: If the main thread receives a signal and we have no timeout, we + # could wait forever. This should check a global "quit" flag or something + # every so often. + io_events = poller.poll(timeout) + except select.error, err: + if err[0] != errno.EINTR: + raise + io_events = [] + if io_events and io_events[0][1] & check: + return io_events[0][1] + else: + return None + + +class FdConditionWaiterHelper(object): + """Retry helper for WaitForFdCondition. + + This class contains the retried and wait functions that make sure + WaitForFdCondition can continue waiting until the timeout is actually + expired. + + """ + + def __init__(self, timeout): + self.timeout = timeout + + def Poll(self, fdobj, event): + result = SingleWaitForFdCondition(fdobj, event, self.timeout) + if result is None: + raise RetryAgain() + else: + return result + + def UpdateTimeout(self, timeout): + self.timeout = timeout + + +def WaitForFdCondition(fdobj, event, timeout): + """Waits for a condition to occur on the socket. + + Retries until the timeout is expired, even if interrupted. + + @type fdobj: integer or object supporting a fileno() method + @param fdobj: entity to wait for events on + @type event: integer + @param event: ORed condition (see select module) + @type timeout: float or None + @param timeout: Timeout in seconds + @rtype: int or None + @return: None for timeout, otherwise occured conditions + + """ + if timeout is not None: + retrywaiter = FdConditionWaiterHelper(timeout) + try: + result = Retry(retrywaiter.Poll, RETRY_REMAINING_TIME, timeout, + args=(fdobj, event), wait_fn=retrywaiter.UpdateTimeout) + except RetryTimeout: + result = None + else: + result = None + while result is None: + result = SingleWaitForFdCondition(fdobj, event, timeout) + return result + + +def partition(seq, pred=bool): # # pylint: disable-msg=W0622 + "Partition a list in two, based on the given predicate" + return (list(itertools.ifilter(pred, seq)), + list(itertools.ifilterfalse(pred, seq))) + + def UniqueSequence(seq): """Returns a list with unique elements. @@ -1540,7 +1755,20 @@ def DaemonPidFileName(name): daemon name """ - return os.path.join(constants.RUN_GANETI_DIR, "%s.pid" % name) + return PathJoin(constants.RUN_GANETI_DIR, "%s.pid" % name) + + +def EnsureDaemon(name): + """Check for and start daemon if not alive. + + """ + result = RunCmd([constants.DAEMON_UTIL, "check-and-start", name]) + if result.failed: + logging.error("Can't start daemon '%s', failure %s, output: %s", + name, result.fail_reason, result.output) + return False + + return True def WritePidFile(name): @@ -1668,6 +1896,7 @@ def FindFile(name, search_path, test=os.path.exists): return None for dir_name in search_path: + # FIXME: investigate switch to PathJoin item_name = os.path.sep.join([dir_name, name]) # check the user test and that we're indeed resolving to the given # basename @@ -1761,14 +1990,14 @@ def GetDaemonPort(daemon_name): return port -def SetupLogging(logfile, debug=False, stderr_logging=False, program="", - multithreaded=False): +def SetupLogging(logfile, debug=0, stderr_logging=False, program="", + multithreaded=False, syslog=constants.SYSLOG_USAGE): """Configures the logging module. @type logfile: str @param logfile: the filename to which we should log - @type debug: boolean - @param debug: whether to enable debug messages too or + @type debug: integer + @param debug: if greater than zero, enable debug messages, otherwise only those at C{INFO} and above level @type stderr_logging: boolean @param stderr_logging: whether we should also log to the standard error @@ -1776,17 +2005,29 @@ def SetupLogging(logfile, debug=False, stderr_logging=False, program="", @param program: the name under which we should log messages @type multithreaded: boolean @param multithreaded: if True, will add the thread name to the log file + @type syslog: string + @param syslog: one of 'no', 'yes', 'only': + - if no, syslog is not used + - if yes, syslog is used (in addition to file-logging) + - if only, only syslog is used @raise EnvironmentError: if we can't open the log file and - stderr logging is disabled + syslog/stderr logging is disabled """ fmt = "%(asctime)s: " + program + " pid=%(process)d" + sft = program + "[%(process)d]:" if multithreaded: fmt += "/%(threadName)s" + sft += " (%(threadName)s)" if debug: fmt += " %(module)s:%(lineno)s" + # no debug info for syslog loggers fmt += " %(levelname)s %(message)s" + # yes, we do want the textual level, as remote syslog will probably + # lose the error level, and it's easier to grep for it + sft += " %(levelname)s %(message)s" formatter = logging.Formatter(fmt) + sys_fmt = logging.Formatter(sft) root_logger = logging.getLogger("") root_logger.setLevel(logging.NOTSET) @@ -1805,24 +2046,34 @@ def SetupLogging(logfile, debug=False, stderr_logging=False, program="", stderr_handler.setLevel(logging.CRITICAL) root_logger.addHandler(stderr_handler) - # this can fail, if the logging directories are not setup or we have - # a permisssion problem; in this case, it's best to log but ignore - # the error if stderr_logging is True, and if false we re-raise the - # exception since otherwise we could run but without any logs at all - try: - logfile_handler = logging.FileHandler(logfile) - logfile_handler.setFormatter(formatter) - if debug: - logfile_handler.setLevel(logging.DEBUG) - else: - logfile_handler.setLevel(logging.INFO) - root_logger.addHandler(logfile_handler) - except EnvironmentError: - if stderr_logging: - logging.exception("Failed to enable logging to file '%s'", logfile) - else: - # we need to re-raise the exception - raise + if syslog in (constants.SYSLOG_YES, constants.SYSLOG_ONLY): + facility = logging.handlers.SysLogHandler.LOG_DAEMON + syslog_handler = logging.handlers.SysLogHandler(constants.SYSLOG_SOCKET, + facility) + syslog_handler.setFormatter(sys_fmt) + # Never enable debug over syslog + syslog_handler.setLevel(logging.INFO) + root_logger.addHandler(syslog_handler) + + if syslog != constants.SYSLOG_ONLY: + # this can fail, if the logging directories are not setup or we have + # a permisssion problem; in this case, it's best to log but ignore + # the error if stderr_logging is True, and if false we re-raise the + # exception since otherwise we could run but without any logs at all + try: + logfile_handler = logging.FileHandler(logfile) + logfile_handler.setFormatter(formatter) + if debug: + logfile_handler.setLevel(logging.DEBUG) + else: + logfile_handler.setLevel(logging.INFO) + root_logger.addHandler(logfile_handler) + except EnvironmentError: + if stderr_logging or syslog == constants.SYSLOG_YES: + logging.exception("Failed to enable logging to file '%s'", logfile) + else: + # we need to re-raise the exception + raise def IsNormAbsPath(path): @@ -1834,6 +2085,36 @@ def IsNormAbsPath(path): return os.path.normpath(path) == path and os.path.isabs(path) +def PathJoin(*args): + """Safe-join a list of path components. + + Requirements: + - the first argument must be an absolute path + - no component in the path must have backtracking (e.g. /../), + since we check for normalization at the end + + @param args: the path components to be joined + @raise ValueError: for invalid paths + + """ + # ensure we're having at least one path passed in + assert args + # ensure the first component is an absolute and normalized path name + root = args[0] + if not IsNormAbsPath(root): + raise ValueError("Invalid parameter to PathJoin: '%s'" % str(args[0])) + result = os.path.join(*args) + # ensure that the whole path is normalized + if not IsNormAbsPath(result): + raise ValueError("Invalid parameters to PathJoin: '%s'" % str(args)) + # check that we're still under the original prefix + prefix = os.path.commonprefix([root, result]) + if prefix != root: + raise ValueError("Error: path joining resulted in different prefix" + " (%s != %s)" % (prefix, root)) + return result + + def TailFile(fname, lines=20): """Return the last lines from a file. @@ -1860,6 +2141,69 @@ def TailFile(fname, lines=20): return rows[-lines:] +def _ParseAsn1Generalizedtime(value): + """Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL. + + @type value: string + @param value: ASN1 GENERALIZEDTIME timestamp + + """ + m = re.match(r"^(\d+)([-+]\d\d)(\d\d)$", value) + if m: + # We have an offset + asn1time = m.group(1) + hours = int(m.group(2)) + minutes = int(m.group(3)) + utcoffset = (60 * hours) + minutes + else: + if not value.endswith("Z"): + raise ValueError("Missing timezone") + asn1time = value[:-1] + utcoffset = 0 + + parsed = time.strptime(asn1time, "%Y%m%d%H%M%S") + + tt = datetime.datetime(*(parsed[:7])) - datetime.timedelta(minutes=utcoffset) + + return calendar.timegm(tt.utctimetuple()) + + +def GetX509CertValidity(cert): + """Returns the validity period of the certificate. + + @type cert: OpenSSL.crypto.X509 + @param cert: X509 certificate object + + """ + # The get_notBefore and get_notAfter functions are only supported in + # pyOpenSSL 0.7 and above. + try: + get_notbefore_fn = cert.get_notBefore + except AttributeError: + not_before = None + else: + not_before_asn1 = get_notbefore_fn() + + if not_before_asn1 is None: + not_before = None + else: + not_before = _ParseAsn1Generalizedtime(not_before_asn1) + + try: + get_notafter_fn = cert.get_notAfter + except AttributeError: + not_after = None + else: + not_after_asn1 = get_notafter_fn() + + if not_after_asn1 is None: + not_after = None + else: + not_after = _ParseAsn1Generalizedtime(not_after_asn1) + + return (not_before, not_after) + + def SafeEncode(text): """Return a 'safe' version of a source string. @@ -1897,6 +2241,48 @@ def SafeEncode(text): return resu +def UnescapeAndSplit(text, sep=","): + """Split and unescape a string based on a given separator. + + This function splits a string based on a separator where the + separator itself can be escape in order to be an element of the + elements. The escaping rules are (assuming coma being the + separator): + - a plain , separates the elements + - a sequence \\\\, (double backslash plus comma) is handled as a + backslash plus a separator comma + - a sequence \, (backslash plus comma) is handled as a + non-separator comma + + @type text: string + @param text: the string to split + @type sep: string + @param text: the separator + @rtype: string + @return: a list of strings + + """ + # we split the list by sep (with no escaping at this stage) + slist = text.split(sep) + # next, we revisit the elements and if any of them ended with an odd + # number of backslashes, then we join it with the next + rlist = [] + while slist: + e1 = slist.pop(0) + if e1.endswith("\\"): + num_b = len(e1) - len(e1.rstrip("\\")) + if num_b % 2 == 1: + e2 = slist.pop(0) + # here the backslashes remain (all), and will be reduced in + # the next step + rlist.append(e1 + sep + e2) + continue + rlist.append(e1) + # finally, replace backslash-something with something + rlist = [re.sub(r"\\(.)", r"\1", v) for v in rlist] + return rlist + + def CommaJoin(names): """Nicely join a set of identifiers. @@ -1932,7 +2318,7 @@ def CalculateDirectorySize(path): for (curpath, _, files) in os.walk(path): for filename in files: - st = os.lstat(os.path.join(curpath, filename)) + st = os.lstat(PathJoin(curpath, filename)) size += st.st_size return BytesToMebibyte(size) @@ -1954,6 +2340,53 @@ def GetFilesystemStats(path): return (tsize, fsize) +def RunInSeparateProcess(fn, *args): + """Runs a function in a separate process. + + Note: Only boolean return values are supported. + + @type fn: callable + @param fn: Function to be called + @rtype: bool + @return: Function's result + + """ + pid = os.fork() + if pid == 0: + # Child process + try: + # In case the function uses temporary files + ResetTempfileModule() + + # Call function + result = int(bool(fn(*args))) + assert result in (0, 1) + except: # pylint: disable-msg=W0702 + logging.exception("Error while calling function in separate process") + # 0 and 1 are reserved for the return value + result = 33 + + os._exit(result) # pylint: disable-msg=W0212 + + # Parent process + + # Avoid zombies and check exit code + (_, status) = os.waitpid(pid, 0) + + if os.WIFSIGNALED(status): + exitcode = None + signum = os.WTERMSIG(status) + else: + exitcode = os.WEXITSTATUS(status) + signum = None + + if not (exitcode in (0, 1) and signum is None): + raise errors.GenericError("Child program failed (code=%s, signal=%s)" % + (exitcode, signum)) + + return bool(exitcode) + + def LockedMethod(fn): """Synchronized object access decorator. @@ -2107,7 +2540,7 @@ class _RetryDelayCalculator(object): # Update for next run if self._limit is None or self._next < self._limit: - self._next = max(self._limit, self._next * self._factor) + self._next = min(self._limit, self._next * self._factor) return current @@ -2177,6 +2610,9 @@ def Retry(fn, delay, timeout, args=None, wait_fn=time.sleep, return fn(*args) except RetryAgain: pass + except RetryTimeout: + raise errors.ProgrammerError("Nested retry loop detected that didn't" + " handle RetryTimeout") remaining_time = end_time - _time_fn() @@ -2197,17 +2633,31 @@ class FileLock(object): """Utility class for file locks. """ - def __init__(self, filename): + def __init__(self, fd, filename): """Constructor for FileLock. - This will open the file denoted by the I{filename} argument. - + @type fd: file + @param fd: File object @type filename: str - @param filename: path to the file to be locked + @param filename: Path of the file opened at I{fd} """ + self.fd = fd self.filename = filename - self.fd = open(self.filename, "w") + + @classmethod + def Open(cls, filename): + """Creates and opens a file to be used as a file-based lock. + + @type filename: string + @param filename: path to the file to be locked + + """ + # Using "os.open" is necessary to allow both opening existing file + # read/write and creating if not existing. Vanilla "open" will truncate an + # existing file -or- allow creating if not existing. + return cls(os.fdopen(os.open(filename, os.O_RDWR | os.O_CREAT), "w+"), + filename) def __del__(self): self.Close() @@ -2216,7 +2666,7 @@ class FileLock(object): """Close the file and release the lock. """ - if self.fd: + if hasattr(self, "fd") and self.fd: self.fd.close() self.fd = None @@ -2237,33 +2687,31 @@ class FileLock(object): assert self.fd, "Lock was closed" assert timeout is None or timeout >= 0, \ "If specified, timeout must be positive" + assert not (flag & fcntl.LOCK_NB), "LOCK_NB must not be set" - if timeout is not None: + # When a timeout is used, LOCK_NB must always be set + if not (timeout is None and blocking): flag |= fcntl.LOCK_NB - timeout_end = time.time() + timeout - # Blocking doesn't have effect with timeout - elif not blocking: - flag |= fcntl.LOCK_NB - timeout_end = None + if timeout is None: + self._Lock(self.fd, flag, timeout) + else: + try: + Retry(self._Lock, (0.1, 1.2, 1.0), timeout, + args=(self.fd, flag, timeout)) + except RetryTimeout: + raise errors.LockError(errmsg) - # TODO: Convert to utils.Retry + @staticmethod + def _Lock(fd, flag, timeout): + try: + fcntl.flock(fd, flag) + except IOError, err: + if timeout is not None and err.errno == errno.EAGAIN: + raise RetryAgain() - retry = True - while retry: - try: - fcntl.flock(self.fd, flag) - retry = False - except IOError, err: - if err.errno in (errno.EAGAIN, ): - if timeout_end is not None and time.time() < timeout_end: - # Wait before trying again - time.sleep(max(0.1, min(1.0, timeout))) - else: - raise errors.LockError(errmsg) - else: - logging.exception("fcntl.flock failed") - raise + logging.exception("fcntl.flock failed") + raise def Exclusive(self, blocking=False, timeout=None): """Locks the file in exclusive mode. @@ -2314,6 +2762,47 @@ class FileLock(object): "Failed to unlock %s" % self.filename) +class LineSplitter: + """Splits data chunks into lines separated by newline. + + Instances provide a file-like interface. + + """ + def __init__(self, line_fn, *args): + """Initializes this class. + + @type line_fn: callable + @param line_fn: Function called for each line, first parameter is line + @param args: Extra arguments for L{line_fn} + + """ + assert callable(line_fn) + + if args: + # Python 2.4 doesn't have functools.partial yet + self._line_fn = \ + lambda line: line_fn(line, *args) # pylint: disable-msg=W0142 + else: + self._line_fn = line_fn + + self._lines = collections.deque() + self._buffer = "" + + def write(self, data): + parts = (self._buffer + data).split("\n") + self._buffer = parts.pop() + self._lines.extend(parts) + + def flush(self): + while self._lines: + self._line_fn(self._lines.popleft().rstrip("\r\n")) + + def close(self): + self.flush() + if self._buffer: + self._line_fn(self._buffer) + + def SignalHandled(signums): """Signal Handled decoration.