utils_PYTHON = \
lib/utils/__init__.py \
lib/utils/algo.py \
- lib/utils/retry.py
+ lib/utils/retry.py \
+ lib/utils/text.py
docrst = \
doc/admin.rst \
test/ganeti.uidpool_unittest.py \
test/ganeti.utils.algo_unittest.py \
test/ganeti.utils.retry_unittest.py \
+ test/ganeti.utils.text_unittest.py \
test/ganeti.utils_mlockall_unittest.py \
test/ganeti.utils_unittest.py \
test/ganeti.workerpool_unittest.py \
import datetime
import calendar
import hmac
-import collections
from cStringIO import StringIO
from ganeti.utils.algo import * # pylint: disable-msg=W0401
from ganeti.utils.retry import * # pylint: disable-msg=W0401
+from ganeti.utils.text import * # pylint: disable-msg=W0401
_locksheld = []
-_re_shell_unquoted = re.compile('^[-.,=:/_+@A-Za-z0-9]+$')
debug_locks = False
_MCL_CURRENT = 1
_MCL_FUTURE = 2
-#: MAC checker regexp
-_MAC_CHECK = re.compile("^([0-9a-f]{2}:){5}[0-9a-f]{2}$", re.I)
-
(_TIMEOUT_NONE,
_TIMEOUT_TERM,
_TIMEOUT_KILL) = range(3)
#: Shell param checker regexp
_SHELLPARAM_REGEX = re.compile(r"^[-a-zA-Z0-9._+/:%@]+$")
-#: Unit checker regexp
-_PARSEUNIT_REGEX = re.compile(r"^([.\d]+)\s*([a-zA-Z]+)?$")
-
#: ASN1 time regexp
_ASN1_TIME_REGEX = re.compile(r"^(\d+)([-+]\d\d)(\d\d)$")
return None
-def MatchNameComponent(key, name_list, case_sensitive=True):
- """Try to match a name against a list.
-
- This function will try to match a name like test1 against a list
- like C{['test1.example.com', 'test2.example.com', ...]}. Against
- this list, I{'test1'} as well as I{'test1.example'} will match, but
- not I{'test1.ex'}. A multiple match will be considered as no match
- at all (e.g. I{'test1'} against C{['test1.example.com',
- 'test1.example.org']}), except when the key fully matches an entry
- (e.g. I{'test1'} against C{['test1', 'test1.example.com']}).
-
- @type key: str
- @param key: the name to be searched
- @type name_list: list
- @param name_list: the list of strings against which to search the key
- @type case_sensitive: boolean
- @param case_sensitive: whether to provide a case-sensitive match
-
- @rtype: None or str
- @return: None if there is no match I{or} if there are multiple matches,
- otherwise the element from the list which matches
-
- """
- if key in name_list:
- return key
-
- re_flags = 0
- if not case_sensitive:
- re_flags |= re.IGNORECASE
- key = key.upper()
- mo = re.compile("^%s(\..*)?$" % re.escape(key), re_flags)
- names_filtered = []
- string_matches = []
- for name in name_list:
- if mo.match(name) is not None:
- names_filtered.append(name)
- if not case_sensitive and key == name.upper():
- string_matches.append(name)
-
- if len(string_matches) == 1:
- return string_matches[0]
- if len(names_filtered) == 1:
- return names_filtered[0]
- return None
-
-
def ValidateServiceName(name):
"""Validate the given service name.
return template % args
-def FormatUnit(value, units):
- """Formats an incoming number of MiB with the appropriate unit.
-
- @type value: int
- @param value: integer representing the value in MiB (1048576)
- @type units: char
- @param units: the type of formatting we should do:
- - 'h' for automatic scaling
- - 'm' for MiBs
- - 'g' for GiBs
- - 't' for TiBs
- @rtype: str
- @return: the formatted value (with suffix)
-
- """
- if units not in ('m', 'g', 't', 'h'):
- raise errors.ProgrammerError("Invalid unit specified '%s'" % str(units))
-
- suffix = ''
-
- if units == 'm' or (units == 'h' and value < 1024):
- if units == 'h':
- suffix = 'M'
- return "%d%s" % (round(value, 0), suffix)
-
- elif units == 'g' or (units == 'h' and value < (1024 * 1024)):
- if units == 'h':
- suffix = 'G'
- return "%0.1f%s" % (round(float(value) / 1024, 1), suffix)
-
- else:
- if units == 'h':
- suffix = 'T'
- return "%0.1f%s" % (round(float(value) / 1024 / 1024, 1), suffix)
-
-
-def ParseUnit(input_string):
- """Tries to extract number and scale from the given string.
-
- Input must be in the format C{NUMBER+ [DOT NUMBER+] SPACE*
- [UNIT]}. If no unit is specified, it defaults to MiB. Return value
- is always an int in MiB.
-
- """
- m = _PARSEUNIT_REGEX.match(str(input_string))
- if not m:
- raise errors.UnitParseError("Invalid format")
-
- value = float(m.groups()[0])
-
- unit = m.groups()[1]
- if unit:
- lcunit = unit.lower()
- else:
- lcunit = 'm'
-
- if lcunit in ('m', 'mb', 'mib'):
- # Value already in MiB
- pass
-
- elif lcunit in ('g', 'gb', 'gib'):
- value *= 1024
-
- elif lcunit in ('t', 'tb', 'tib'):
- value *= 1024 * 1024
-
- else:
- raise errors.UnitParseError("Unknown unit: %s" % unit)
-
- # Make sure we round up
- if int(value) < value:
- value += 1
-
- # Round up to the next multiple of 4
- value = int(value)
- if value % 4:
- value += 4 - value % 4
-
- return value
-
-
def ParseCpuMask(cpu_mask):
"""Parse a CPU mask definition and return the list of CPU IDs.
return backup_name
-def ShellQuote(value):
- """Quotes shell argument according to POSIX.
-
- @type value: str
- @param value: the argument to be quoted
- @rtype: str
- @return: the quoted value
-
- """
- if _re_shell_unquoted.match(value):
- return value
- else:
- return "'%s'" % value.replace("'", "'\\''")
-
-
-def ShellQuoteArgs(args):
- """Quotes a list of shell arguments.
-
- @type args: list
- @param args: list of arguments to be quoted
- @rtype: str
- @return: the quoted arguments concatenated with spaces
-
- """
- return ' '.join([ShellQuote(i) for i in args])
-
-
-class ShellWriter:
- """Helper class to write scripts with indentation.
-
- """
- INDENT_STR = " "
-
- def __init__(self, fh):
- """Initializes this class.
-
- """
- self._fh = fh
- self._indent = 0
-
- def IncIndent(self):
- """Increase indentation level by 1.
-
- """
- self._indent += 1
-
- def DecIndent(self):
- """Decrease indentation level by 1.
-
- """
- assert self._indent > 0
- self._indent -= 1
-
- def Write(self, txt, *args):
- """Write line to output file.
-
- """
- assert self._indent >= 0
-
- self._fh.write(self._indent * self.INDENT_STR)
-
- if args:
- self._fh.write(txt % args)
- else:
- self._fh.write(txt)
-
- self._fh.write("\n")
-
-
def ListVisibleFiles(path):
"""Returns a list of visible files in a directory.
return ReadFile(_RANDOM_UUID_FILE, size=128).rstrip("\n")
-def GenerateSecret(numbytes=20):
- """Generates a random secret.
-
- This will generate a pseudo-random secret returning an hex string
- (so that it can be used where an ASCII string is needed).
-
- @param numbytes: the number of bytes which will be represented by the returned
- string (defaulting to 20, the length of a SHA1 hash)
- @rtype: str
- @return: an hex representation of the pseudo-random sequence
-
- """
- return os.urandom(numbytes).encode('hex')
-
-
def EnsureDirs(dirs):
"""Make required directories, if they don't exist.
return result
-def NormalizeAndValidateMac(mac):
- """Normalizes and check if a MAC address is valid.
-
- Checks whether the supplied MAC address is formally correct, only
- accepts colon separated format. Normalize it to all lower.
-
- @type mac: str
- @param mac: the MAC to be validated
- @rtype: str
- @return: returns the normalized and validated MAC.
-
- @raise errors.OpPrereqError: If the MAC isn't valid
-
- """
- if not _MAC_CHECK.match(mac):
- raise errors.OpPrereqError("Invalid MAC address specified: %s" %
- mac, errors.ECODE_INVAL)
-
- return mac.lower()
-
-
def TestDelay(duration):
"""Sleep for a fixed amount of time.
return digest.lower() == Sha1Hmac(key, text, salt=salt).lower()
-def SafeEncode(text):
- """Return a 'safe' version of a source string.
-
- This function mangles the input string and returns a version that
- should be safe to display/encode as ASCII. To this end, we first
- convert it to ASCII using the 'backslashreplace' encoding which
- should get rid of any non-ASCII chars, and then we process it
- through a loop copied from the string repr sources in the python; we
- don't use string_escape anymore since that escape single quotes and
- backslashes too, and that is too much; and that escaping is not
- stable, i.e. string_escape(string_escape(x)) != string_escape(x).
-
- @type text: str or unicode
- @param text: input data
- @rtype: str
- @return: a safe version of text
-
- """
- if isinstance(text, unicode):
- # only if unicode; if str already, we handle it below
- text = text.encode('ascii', 'backslashreplace')
- resu = ""
- for char in text:
- c = ord(char)
- if char == '\t':
- resu += r'\t'
- elif char == '\n':
- resu += r'\n'
- elif char == '\r':
- resu += r'\'r'
- elif c < 32 or c >= 127: # non-printable
- resu += "\\x%02x" % (c & 0xff)
- else:
- resu += char
- return resu
-
-
-def UnescapeAndSplit(text, sep=","):
- """Split and unescape a string based on a given separator.
-
- This function splits a string based on a separator where the
- separator itself can be escape in order to be an element of the
- elements. The escaping rules are (assuming coma being the
- separator):
- - a plain , separates the elements
- - a sequence \\\\, (double backslash plus comma) is handled as a
- backslash plus a separator comma
- - a sequence \, (backslash plus comma) is handled as a
- non-separator comma
-
- @type text: string
- @param text: the string to split
- @type sep: string
- @param text: the separator
- @rtype: string
- @return: a list of strings
-
- """
- # we split the list by sep (with no escaping at this stage)
- slist = text.split(sep)
- # next, we revisit the elements and if any of them ended with an odd
- # number of backslashes, then we join it with the next
- rlist = []
- while slist:
- e1 = slist.pop(0)
- if e1.endswith("\\"):
- num_b = len(e1) - len(e1.rstrip("\\"))
- if num_b % 2 == 1:
- e2 = slist.pop(0)
- # here the backslashes remain (all), and will be reduced in
- # the next step
- rlist.append(e1 + sep + e2)
- continue
- rlist.append(e1)
- # finally, replace backslash-something with something
- rlist = [re.sub(r"\\(.)", r"\1", v) for v in rlist]
- return rlist
-
-
-def CommaJoin(names):
- """Nicely join a set of identifiers.
-
- @param names: set, list or tuple
- @return: a string with the formatted results
-
- """
- return ", ".join([str(val) for val in names])
-
-
def FindMatch(data, name):
"""Tries to find an item in a dictionary matching a name.
raise
-def FormatTime(val):
- """Formats a time value.
-
- @type val: float or None
- @param val: Timestamp as returned by time.time() (seconds since Epoch,
- 1970-01-01 00:00:00 UTC)
- @return: a string value or N/A if we don't have a valid timestamp
-
- """
- if val is None or not isinstance(val, (int, float)):
- return "N/A"
- # these two codes works on Linux, but they are not guaranteed on all
- # platforms
- return time.strftime("%F %T", time.localtime(val))
-
-
-def FormatSeconds(secs):
- """Formats seconds for easier reading.
-
- @type secs: number
- @param secs: Number of seconds
- @rtype: string
- @return: Formatted seconds (e.g. "2d 9h 19m 49s")
-
- """
- parts = []
-
- secs = round(secs, 0)
-
- if secs > 0:
- # Negative values would be a bit tricky
- for unit, one in [("d", 24 * 60 * 60), ("h", 60 * 60), ("m", 60)]:
- (complete, secs) = divmod(secs, one)
- if complete or parts:
- parts.append("%d%s" % (complete, unit))
-
- parts.append("%ds" % secs)
-
- return " ".join(parts)
-
-
def ReadWatcherPauseFile(filename, now=None, remove_after=3600):
"""Reads the watcher pause file.
"Failed to unlock %s" % self.filename)
-class LineSplitter:
- """Splits data chunks into lines separated by newline.
-
- Instances provide a file-like interface.
-
- """
- def __init__(self, line_fn, *args):
- """Initializes this class.
-
- @type line_fn: callable
- @param line_fn: Function called for each line, first parameter is line
- @param args: Extra arguments for L{line_fn}
-
- """
- assert callable(line_fn)
-
- if args:
- # Python 2.4 doesn't have functools.partial yet
- self._line_fn = \
- lambda line: line_fn(line, *args) # pylint: disable-msg=W0142
- else:
- self._line_fn = line_fn
-
- self._lines = collections.deque()
- self._buffer = ""
-
- def write(self, data):
- parts = (self._buffer + data).split("\n")
- self._buffer = parts.pop()
- self._lines.extend(parts)
-
- def flush(self):
- while self._lines:
- self._line_fn(self._lines.popleft().rstrip("\r\n"))
-
- def close(self):
- self.flush()
- if self._buffer:
- self._line_fn(self._buffer)
-
-
def SignalHandled(signums):
"""Signal Handled decoration.
--- /dev/null
+#
+#
+
+# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+"""Utility functions for manipulating or working with text.
+
+"""
+
+
+import re
+import os
+import time
+import collections
+
+from ganeti import errors
+
+
+#: Unit checker regexp
+_PARSEUNIT_REGEX = re.compile(r"^([.\d]+)\s*([a-zA-Z]+)?$")
+
+#: Characters which don't need to be quoted for shell commands
+_SHELL_UNQUOTED_RE = re.compile('^[-.,=:/_+@A-Za-z0-9]+$')
+
+#: MAC checker regexp
+_MAC_CHECK_RE = re.compile("^([0-9a-f]{2}:){5}[0-9a-f]{2}$", re.I)
+
+
+def MatchNameComponent(key, name_list, case_sensitive=True):
+ """Try to match a name against a list.
+
+ This function will try to match a name like test1 against a list
+ like C{['test1.example.com', 'test2.example.com', ...]}. Against
+ this list, I{'test1'} as well as I{'test1.example'} will match, but
+ not I{'test1.ex'}. A multiple match will be considered as no match
+ at all (e.g. I{'test1'} against C{['test1.example.com',
+ 'test1.example.org']}), except when the key fully matches an entry
+ (e.g. I{'test1'} against C{['test1', 'test1.example.com']}).
+
+ @type key: str
+ @param key: the name to be searched
+ @type name_list: list
+ @param name_list: the list of strings against which to search the key
+ @type case_sensitive: boolean
+ @param case_sensitive: whether to provide a case-sensitive match
+
+ @rtype: None or str
+ @return: None if there is no match I{or} if there are multiple matches,
+ otherwise the element from the list which matches
+
+ """
+ if key in name_list:
+ return key
+
+ re_flags = 0
+ if not case_sensitive:
+ re_flags |= re.IGNORECASE
+ key = key.upper()
+ mo = re.compile("^%s(\..*)?$" % re.escape(key), re_flags)
+ names_filtered = []
+ string_matches = []
+ for name in name_list:
+ if mo.match(name) is not None:
+ names_filtered.append(name)
+ if not case_sensitive and key == name.upper():
+ string_matches.append(name)
+
+ if len(string_matches) == 1:
+ return string_matches[0]
+ if len(names_filtered) == 1:
+ return names_filtered[0]
+ return None
+
+
+def FormatUnit(value, units):
+ """Formats an incoming number of MiB with the appropriate unit.
+
+ @type value: int
+ @param value: integer representing the value in MiB (1048576)
+ @type units: char
+ @param units: the type of formatting we should do:
+ - 'h' for automatic scaling
+ - 'm' for MiBs
+ - 'g' for GiBs
+ - 't' for TiBs
+ @rtype: str
+ @return: the formatted value (with suffix)
+
+ """
+ if units not in ('m', 'g', 't', 'h'):
+ raise errors.ProgrammerError("Invalid unit specified '%s'" % str(units))
+
+ suffix = ''
+
+ if units == 'm' or (units == 'h' and value < 1024):
+ if units == 'h':
+ suffix = 'M'
+ return "%d%s" % (round(value, 0), suffix)
+
+ elif units == 'g' or (units == 'h' and value < (1024 * 1024)):
+ if units == 'h':
+ suffix = 'G'
+ return "%0.1f%s" % (round(float(value) / 1024, 1), suffix)
+
+ else:
+ if units == 'h':
+ suffix = 'T'
+ return "%0.1f%s" % (round(float(value) / 1024 / 1024, 1), suffix)
+
+
+def ParseUnit(input_string):
+ """Tries to extract number and scale from the given string.
+
+ Input must be in the format C{NUMBER+ [DOT NUMBER+] SPACE*
+ [UNIT]}. If no unit is specified, it defaults to MiB. Return value
+ is always an int in MiB.
+
+ """
+ m = _PARSEUNIT_REGEX.match(str(input_string))
+ if not m:
+ raise errors.UnitParseError("Invalid format")
+
+ value = float(m.groups()[0])
+
+ unit = m.groups()[1]
+ if unit:
+ lcunit = unit.lower()
+ else:
+ lcunit = 'm'
+
+ if lcunit in ('m', 'mb', 'mib'):
+ # Value already in MiB
+ pass
+
+ elif lcunit in ('g', 'gb', 'gib'):
+ value *= 1024
+
+ elif lcunit in ('t', 'tb', 'tib'):
+ value *= 1024 * 1024
+
+ else:
+ raise errors.UnitParseError("Unknown unit: %s" % unit)
+
+ # Make sure we round up
+ if int(value) < value:
+ value += 1
+
+ # Round up to the next multiple of 4
+ value = int(value)
+ if value % 4:
+ value += 4 - value % 4
+
+ return value
+
+
+def ShellQuote(value):
+ """Quotes shell argument according to POSIX.
+
+ @type value: str
+ @param value: the argument to be quoted
+ @rtype: str
+ @return: the quoted value
+
+ """
+ if _SHELL_UNQUOTED_RE.match(value):
+ return value
+ else:
+ return "'%s'" % value.replace("'", "'\\''")
+
+
+def ShellQuoteArgs(args):
+ """Quotes a list of shell arguments.
+
+ @type args: list
+ @param args: list of arguments to be quoted
+ @rtype: str
+ @return: the quoted arguments concatenated with spaces
+
+ """
+ return " ".join([ShellQuote(i) for i in args])
+
+
+class ShellWriter:
+ """Helper class to write scripts with indentation.
+
+ """
+ INDENT_STR = " "
+
+ def __init__(self, fh):
+ """Initializes this class.
+
+ """
+ self._fh = fh
+ self._indent = 0
+
+ def IncIndent(self):
+ """Increase indentation level by 1.
+
+ """
+ self._indent += 1
+
+ def DecIndent(self):
+ """Decrease indentation level by 1.
+
+ """
+ assert self._indent > 0
+ self._indent -= 1
+
+ def Write(self, txt, *args):
+ """Write line to output file.
+
+ """
+ assert self._indent >= 0
+
+ self._fh.write(self._indent * self.INDENT_STR)
+
+ if args:
+ self._fh.write(txt % args)
+ else:
+ self._fh.write(txt)
+
+ self._fh.write("\n")
+
+
+def GenerateSecret(numbytes=20):
+ """Generates a random secret.
+
+ This will generate a pseudo-random secret returning an hex string
+ (so that it can be used where an ASCII string is needed).
+
+ @param numbytes: the number of bytes which will be represented by the returned
+ string (defaulting to 20, the length of a SHA1 hash)
+ @rtype: str
+ @return: an hex representation of the pseudo-random sequence
+
+ """
+ return os.urandom(numbytes).encode("hex")
+
+
+def NormalizeAndValidateMac(mac):
+ """Normalizes and check if a MAC address is valid.
+
+ Checks whether the supplied MAC address is formally correct, only
+ accepts colon separated format. Normalize it to all lower.
+
+ @type mac: str
+ @param mac: the MAC to be validated
+ @rtype: str
+ @return: returns the normalized and validated MAC.
+
+ @raise errors.OpPrereqError: If the MAC isn't valid
+
+ """
+ if not _MAC_CHECK_RE.match(mac):
+ raise errors.OpPrereqError("Invalid MAC address '%s'" % mac,
+ errors.ECODE_INVAL)
+
+ return mac.lower()
+
+
+def SafeEncode(text):
+ """Return a 'safe' version of a source string.
+
+ This function mangles the input string and returns a version that
+ should be safe to display/encode as ASCII. To this end, we first
+ convert it to ASCII using the 'backslashreplace' encoding which
+ should get rid of any non-ASCII chars, and then we process it
+ through a loop copied from the string repr sources in the python; we
+ don't use string_escape anymore since that escape single quotes and
+ backslashes too, and that is too much; and that escaping is not
+ stable, i.e. string_escape(string_escape(x)) != string_escape(x).
+
+ @type text: str or unicode
+ @param text: input data
+ @rtype: str
+ @return: a safe version of text
+
+ """
+ if isinstance(text, unicode):
+ # only if unicode; if str already, we handle it below
+ text = text.encode('ascii', 'backslashreplace')
+ resu = ""
+ for char in text:
+ c = ord(char)
+ if char == '\t':
+ resu += r'\t'
+ elif char == '\n':
+ resu += r'\n'
+ elif char == '\r':
+ resu += r'\'r'
+ elif c < 32 or c >= 127: # non-printable
+ resu += "\\x%02x" % (c & 0xff)
+ else:
+ resu += char
+ return resu
+
+
+def UnescapeAndSplit(text, sep=","):
+ """Split and unescape a string based on a given separator.
+
+ This function splits a string based on a separator where the
+ separator itself can be escape in order to be an element of the
+ elements. The escaping rules are (assuming coma being the
+ separator):
+ - a plain , separates the elements
+ - a sequence \\\\, (double backslash plus comma) is handled as a
+ backslash plus a separator comma
+ - a sequence \, (backslash plus comma) is handled as a
+ non-separator comma
+
+ @type text: string
+ @param text: the string to split
+ @type sep: string
+ @param text: the separator
+ @rtype: string
+ @return: a list of strings
+
+ """
+ # we split the list by sep (with no escaping at this stage)
+ slist = text.split(sep)
+ # next, we revisit the elements and if any of them ended with an odd
+ # number of backslashes, then we join it with the next
+ rlist = []
+ while slist:
+ e1 = slist.pop(0)
+ if e1.endswith("\\"):
+ num_b = len(e1) - len(e1.rstrip("\\"))
+ if num_b % 2 == 1:
+ e2 = slist.pop(0)
+ # here the backslashes remain (all), and will be reduced in
+ # the next step
+ rlist.append(e1 + sep + e2)
+ continue
+ rlist.append(e1)
+ # finally, replace backslash-something with something
+ rlist = [re.sub(r"\\(.)", r"\1", v) for v in rlist]
+ return rlist
+
+
+def CommaJoin(names):
+ """Nicely join a set of identifiers.
+
+ @param names: set, list or tuple
+ @return: a string with the formatted results
+
+ """
+ return ", ".join([str(val) for val in names])
+
+
+def FormatTime(val):
+ """Formats a time value.
+
+ @type val: float or None
+ @param val: Timestamp as returned by time.time() (seconds since Epoch,
+ 1970-01-01 00:00:00 UTC)
+ @return: a string value or N/A if we don't have a valid timestamp
+
+ """
+ if val is None or not isinstance(val, (int, float)):
+ return "N/A"
+ # these two codes works on Linux, but they are not guaranteed on all
+ # platforms
+ return time.strftime("%F %T", time.localtime(val))
+
+
+def FormatSeconds(secs):
+ """Formats seconds for easier reading.
+
+ @type secs: number
+ @param secs: Number of seconds
+ @rtype: string
+ @return: Formatted seconds (e.g. "2d 9h 19m 49s")
+
+ """
+ parts = []
+
+ secs = round(secs, 0)
+
+ if secs > 0:
+ # Negative values would be a bit tricky
+ for unit, one in [("d", 24 * 60 * 60), ("h", 60 * 60), ("m", 60)]:
+ (complete, secs) = divmod(secs, one)
+ if complete or parts:
+ parts.append("%d%s" % (complete, unit))
+
+ parts.append("%ds" % secs)
+
+ return " ".join(parts)
+
+
+class LineSplitter:
+ """Splits data chunks into lines separated by newline.
+
+ Instances provide a file-like interface.
+
+ """
+ def __init__(self, line_fn, *args):
+ """Initializes this class.
+
+ @type line_fn: callable
+ @param line_fn: Function called for each line, first parameter is line
+ @param args: Extra arguments for L{line_fn}
+
+ """
+ assert callable(line_fn)
+
+ if args:
+ # Python 2.4 doesn't have functools.partial yet
+ self._line_fn = \
+ lambda line: line_fn(line, *args) # pylint: disable-msg=W0142
+ else:
+ self._line_fn = line_fn
+
+ self._lines = collections.deque()
+ self._buffer = ""
+
+ def write(self, data):
+ parts = (self._buffer + data).split("\n")
+ self._buffer = parts.pop()
+ self._lines.extend(parts)
+
+ def flush(self):
+ while self._lines:
+ self._line_fn(self._lines.popleft().rstrip("\r\n"))
+
+ def close(self):
+ self.flush()
+ if self._buffer:
+ self._line_fn(self._buffer)
--- /dev/null
+#!/usr/bin/python
+#
+
+# Copyright (C) 2011 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+
+"""Script for testing ganeti.utils.text"""
+
+import re
+import string
+import time
+import unittest
+import os
+
+from cStringIO import StringIO
+
+from ganeti import constants
+from ganeti import utils
+from ganeti import errors
+
+import testutils
+
+
+class TestMatchNameComponent(unittest.TestCase):
+ """Test case for the MatchNameComponent function"""
+
+ def testEmptyList(self):
+ """Test that there is no match against an empty list"""
+ self.failUnlessEqual(utils.MatchNameComponent("", []), None)
+ self.failUnlessEqual(utils.MatchNameComponent("test", []), None)
+
+ def testSingleMatch(self):
+ """Test that a single match is performed correctly"""
+ mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
+ for key in "test2", "test2.example", "test2.example.com":
+ self.failUnlessEqual(utils.MatchNameComponent(key, mlist), mlist[1])
+
+ def testMultipleMatches(self):
+ """Test that a multiple match is returned as None"""
+ mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
+ for key in "test1", "test1.example":
+ self.failUnlessEqual(utils.MatchNameComponent(key, mlist), None)
+
+ def testFullMatch(self):
+ """Test that a full match is returned correctly"""
+ key1 = "test1"
+ key2 = "test1.example"
+ mlist = [key2, key2 + ".com"]
+ self.failUnlessEqual(utils.MatchNameComponent(key1, mlist), None)
+ self.failUnlessEqual(utils.MatchNameComponent(key2, mlist), key2)
+
+ def testCaseInsensitivePartialMatch(self):
+ """Test for the case_insensitive keyword"""
+ mlist = ["test1.example.com", "test2.example.net"]
+ self.assertEqual(utils.MatchNameComponent("test2", mlist,
+ case_sensitive=False),
+ "test2.example.net")
+ self.assertEqual(utils.MatchNameComponent("Test2", mlist,
+ case_sensitive=False),
+ "test2.example.net")
+ self.assertEqual(utils.MatchNameComponent("teSt2", mlist,
+ case_sensitive=False),
+ "test2.example.net")
+ self.assertEqual(utils.MatchNameComponent("TeSt2", mlist,
+ case_sensitive=False),
+ "test2.example.net")
+
+ def testCaseInsensitiveFullMatch(self):
+ mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
+
+ # Between the two ts1 a full string match non-case insensitive should work
+ self.assertEqual(utils.MatchNameComponent("Ts1", mlist,
+ case_sensitive=False),
+ None)
+ self.assertEqual(utils.MatchNameComponent("Ts1.ex", mlist,
+ case_sensitive=False),
+ "ts1.ex")
+ self.assertEqual(utils.MatchNameComponent("ts1.ex", mlist,
+ case_sensitive=False),
+ "ts1.ex")
+
+ # Between the two ts2 only case differs, so only case-match works
+ self.assertEqual(utils.MatchNameComponent("ts2.ex", mlist,
+ case_sensitive=False),
+ "ts2.ex")
+ self.assertEqual(utils.MatchNameComponent("Ts2.ex", mlist,
+ case_sensitive=False),
+ "Ts2.ex")
+ self.assertEqual(utils.MatchNameComponent("TS2.ex", mlist,
+ case_sensitive=False),
+ None)
+
+
+class TestFormatUnit(unittest.TestCase):
+ """Test case for the FormatUnit function"""
+
+ def testMiB(self):
+ self.assertEqual(utils.FormatUnit(1, "h"), "1M")
+ self.assertEqual(utils.FormatUnit(100, "h"), "100M")
+ self.assertEqual(utils.FormatUnit(1023, "h"), "1023M")
+
+ self.assertEqual(utils.FormatUnit(1, "m"), "1")
+ self.assertEqual(utils.FormatUnit(100, "m"), "100")
+ self.assertEqual(utils.FormatUnit(1023, "m"), "1023")
+
+ self.assertEqual(utils.FormatUnit(1024, "m"), "1024")
+ self.assertEqual(utils.FormatUnit(1536, "m"), "1536")
+ self.assertEqual(utils.FormatUnit(17133, "m"), "17133")
+ self.assertEqual(utils.FormatUnit(1024 * 1024 - 1, "m"), "1048575")
+
+ def testGiB(self):
+ self.assertEqual(utils.FormatUnit(1024, "h"), "1.0G")
+ self.assertEqual(utils.FormatUnit(1536, "h"), "1.5G")
+ self.assertEqual(utils.FormatUnit(17133, "h"), "16.7G")
+ self.assertEqual(utils.FormatUnit(1024 * 1024 - 1, "h"), "1024.0G")
+
+ self.assertEqual(utils.FormatUnit(1024, "g"), "1.0")
+ self.assertEqual(utils.FormatUnit(1536, "g"), "1.5")
+ self.assertEqual(utils.FormatUnit(17133, "g"), "16.7")
+ self.assertEqual(utils.FormatUnit(1024 * 1024 - 1, "g"), "1024.0")
+
+ self.assertEqual(utils.FormatUnit(1024 * 1024, "g"), "1024.0")
+ self.assertEqual(utils.FormatUnit(5120 * 1024, "g"), "5120.0")
+ self.assertEqual(utils.FormatUnit(29829 * 1024, "g"), "29829.0")
+
+ def testTiB(self):
+ self.assertEqual(utils.FormatUnit(1024 * 1024, "h"), "1.0T")
+ self.assertEqual(utils.FormatUnit(5120 * 1024, "h"), "5.0T")
+ self.assertEqual(utils.FormatUnit(29829 * 1024, "h"), "29.1T")
+
+ self.assertEqual(utils.FormatUnit(1024 * 1024, "t"), "1.0")
+ self.assertEqual(utils.FormatUnit(5120 * 1024, "t"), "5.0")
+ self.assertEqual(utils.FormatUnit(29829 * 1024, "t"), "29.1")
+
+ def testErrors(self):
+ self.assertRaises(errors.ProgrammerError, utils.FormatUnit, 1, "a")
+
+
+class TestParseUnit(unittest.TestCase):
+ """Test case for the ParseUnit function"""
+
+ SCALES = (("", 1),
+ ("M", 1), ("G", 1024), ("T", 1024 * 1024),
+ ("MB", 1), ("GB", 1024), ("TB", 1024 * 1024),
+ ("MiB", 1), ("GiB", 1024), ("TiB", 1024 * 1024))
+
+ def testRounding(self):
+ self.assertEqual(utils.ParseUnit("0"), 0)
+ self.assertEqual(utils.ParseUnit("1"), 4)
+ self.assertEqual(utils.ParseUnit("2"), 4)
+ self.assertEqual(utils.ParseUnit("3"), 4)
+
+ self.assertEqual(utils.ParseUnit("124"), 124)
+ self.assertEqual(utils.ParseUnit("125"), 128)
+ self.assertEqual(utils.ParseUnit("126"), 128)
+ self.assertEqual(utils.ParseUnit("127"), 128)
+ self.assertEqual(utils.ParseUnit("128"), 128)
+ self.assertEqual(utils.ParseUnit("129"), 132)
+ self.assertEqual(utils.ParseUnit("130"), 132)
+
+ def testFloating(self):
+ self.assertEqual(utils.ParseUnit("0"), 0)
+ self.assertEqual(utils.ParseUnit("0.5"), 4)
+ self.assertEqual(utils.ParseUnit("1.75"), 4)
+ self.assertEqual(utils.ParseUnit("1.99"), 4)
+ self.assertEqual(utils.ParseUnit("2.00"), 4)
+ self.assertEqual(utils.ParseUnit("2.01"), 4)
+ self.assertEqual(utils.ParseUnit("3.99"), 4)
+ self.assertEqual(utils.ParseUnit("4.00"), 4)
+ self.assertEqual(utils.ParseUnit("4.01"), 8)
+ self.assertEqual(utils.ParseUnit("1.5G"), 1536)
+ self.assertEqual(utils.ParseUnit("1.8G"), 1844)
+ self.assertEqual(utils.ParseUnit("8.28T"), 8682212)
+
+ def testSuffixes(self):
+ for sep in ("", " ", " ", "\t", "\t "):
+ for suffix, scale in self.SCALES:
+ for func in (lambda x: x, str.lower, str.upper):
+ self.assertEqual(utils.ParseUnit("1024" + sep + func(suffix)),
+ 1024 * scale)
+
+ def testInvalidInput(self):
+ for sep in ("-", "_", ",", "a"):
+ for suffix, _ in self.SCALES:
+ self.assertRaises(errors.UnitParseError, utils.ParseUnit,
+ "1" + sep + suffix)
+
+ for suffix, _ in self.SCALES:
+ self.assertRaises(errors.UnitParseError, utils.ParseUnit,
+ "1,3" + suffix)
+
+
+class TestShellQuoting(unittest.TestCase):
+ """Test case for shell quoting functions"""
+
+ def testShellQuote(self):
+ self.assertEqual(utils.ShellQuote('abc'), "abc")
+ self.assertEqual(utils.ShellQuote('ab"c'), "'ab\"c'")
+ self.assertEqual(utils.ShellQuote("a'bc"), "'a'\\''bc'")
+ self.assertEqual(utils.ShellQuote("a b c"), "'a b c'")
+ self.assertEqual(utils.ShellQuote("a b\\ c"), "'a b\\ c'")
+
+ def testShellQuoteArgs(self):
+ self.assertEqual(utils.ShellQuoteArgs(['a', 'b', 'c']), "a b c")
+ self.assertEqual(utils.ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
+ self.assertEqual(utils.ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
+
+
+class TestShellWriter(unittest.TestCase):
+ def test(self):
+ buf = StringIO()
+ sw = utils.ShellWriter(buf)
+ sw.Write("#!/bin/bash")
+ sw.Write("if true; then")
+ sw.IncIndent()
+ try:
+ sw.Write("echo true")
+
+ sw.Write("for i in 1 2 3")
+ sw.Write("do")
+ sw.IncIndent()
+ try:
+ self.assertEqual(sw._indent, 2)
+ sw.Write("date")
+ finally:
+ sw.DecIndent()
+ sw.Write("done")
+ finally:
+ sw.DecIndent()
+ sw.Write("echo %s", utils.ShellQuote("Hello World"))
+ sw.Write("exit 0")
+
+ self.assertEqual(sw._indent, 0)
+
+ output = buf.getvalue()
+
+ self.assert_(output.endswith("\n"))
+
+ lines = output.splitlines()
+ self.assertEqual(len(lines), 9)
+ self.assertEqual(lines[0], "#!/bin/bash")
+ self.assert_(re.match(r"^\s+date$", lines[5]))
+ self.assertEqual(lines[7], "echo 'Hello World'")
+
+ def testEmpty(self):
+ buf = StringIO()
+ sw = utils.ShellWriter(buf)
+ sw = None
+ self.assertEqual(buf.getvalue(), "")
+
+
+class TestNormalizeAndValidateMac(unittest.TestCase):
+ def testInvalid(self):
+ self.assertRaises(errors.OpPrereqError,
+ utils.NormalizeAndValidateMac, "xxx")
+
+ def testNormalization(self):
+ for mac in ["aa:bb:cc:dd:ee:ff", "00:AA:11:bB:22:cc"]:
+ self.assertEqual(utils.NormalizeAndValidateMac(mac), mac.lower())
+
+
+class TestSafeEncode(unittest.TestCase):
+ """Test case for SafeEncode"""
+
+ def testAscii(self):
+ for txt in [string.digits, string.letters, string.punctuation]:
+ self.failUnlessEqual(txt, utils.SafeEncode(txt))
+
+ def testDoubleEncode(self):
+ for i in range(255):
+ txt = utils.SafeEncode(chr(i))
+ self.failUnlessEqual(txt, utils.SafeEncode(txt))
+
+ def testUnicode(self):
+ # 1024 is high enough to catch non-direct ASCII mappings
+ for i in range(1024):
+ txt = utils.SafeEncode(unichr(i))
+ self.failUnlessEqual(txt, utils.SafeEncode(txt))
+
+
+class TestUnescapeAndSplit(unittest.TestCase):
+ """Testing case for UnescapeAndSplit"""
+
+ def setUp(self):
+ # testing more that one separator for regexp safety
+ self._seps = [",", "+", "."]
+
+ def testSimple(self):
+ a = ["a", "b", "c", "d"]
+ for sep in self._seps:
+ self.failUnlessEqual(utils.UnescapeAndSplit(sep.join(a), sep=sep), a)
+
+ def testEscape(self):
+ for sep in self._seps:
+ a = ["a", "b\\" + sep + "c", "d"]
+ b = ["a", "b" + sep + "c", "d"]
+ self.failUnlessEqual(utils.UnescapeAndSplit(sep.join(a), sep=sep), b)
+
+ def testDoubleEscape(self):
+ for sep in self._seps:
+ a = ["a", "b\\\\", "c", "d"]
+ b = ["a", "b\\", "c", "d"]
+ self.failUnlessEqual(utils.UnescapeAndSplit(sep.join(a), sep=sep), b)
+
+ def testThreeEscape(self):
+ for sep in self._seps:
+ a = ["a", "b\\\\\\" + sep + "c", "d"]
+ b = ["a", "b\\" + sep + "c", "d"]
+ self.failUnlessEqual(utils.UnescapeAndSplit(sep.join(a), sep=sep), b)
+
+
+class TestCommaJoin(unittest.TestCase):
+ def test(self):
+ self.assertEqual(utils.CommaJoin([]), "")
+ self.assertEqual(utils.CommaJoin([1, 2, 3]), "1, 2, 3")
+ self.assertEqual(utils.CommaJoin(["Hello"]), "Hello")
+ self.assertEqual(utils.CommaJoin(["Hello", "World"]), "Hello, World")
+ self.assertEqual(utils.CommaJoin(["Hello", "World", 99]),
+ "Hello, World, 99")
+
+
+class TestFormatTime(unittest.TestCase):
+ """Testing case for FormatTime"""
+
+ @staticmethod
+ def _TestInProcess(tz, timestamp, expected):
+ os.environ["TZ"] = tz
+ time.tzset()
+ return utils.FormatTime(timestamp) == expected
+
+ def _Test(self, *args):
+ # Need to use separate process as we want to change TZ
+ self.assert_(utils.RunInSeparateProcess(self._TestInProcess, *args))
+
+ def test(self):
+ self._Test("UTC", 0, "1970-01-01 00:00:00")
+ self._Test("America/Sao_Paulo", 1292606926, "2010-12-17 15:28:46")
+ self._Test("Europe/London", 1292606926, "2010-12-17 17:28:46")
+ self._Test("Europe/Zurich", 1292606926, "2010-12-17 18:28:46")
+ self._Test("Australia/Sydney", 1292606926, "2010-12-18 04:28:46")
+
+ def testNone(self):
+ self.failUnlessEqual(utils.FormatTime(None), "N/A")
+
+ def testInvalid(self):
+ self.failUnlessEqual(utils.FormatTime(()), "N/A")
+
+ def testNow(self):
+ # tests that we accept time.time input
+ utils.FormatTime(time.time())
+ # tests that we accept int input
+ utils.FormatTime(int(time.time()))
+
+
+class TestFormatSeconds(unittest.TestCase):
+ def test(self):
+ self.assertEqual(utils.FormatSeconds(1), "1s")
+ self.assertEqual(utils.FormatSeconds(3600), "1h 0m 0s")
+ self.assertEqual(utils.FormatSeconds(3599), "59m 59s")
+ self.assertEqual(utils.FormatSeconds(7200), "2h 0m 0s")
+ self.assertEqual(utils.FormatSeconds(7201), "2h 0m 1s")
+ self.assertEqual(utils.FormatSeconds(7281), "2h 1m 21s")
+ self.assertEqual(utils.FormatSeconds(29119), "8h 5m 19s")
+ self.assertEqual(utils.FormatSeconds(19431228), "224d 21h 33m 48s")
+ self.assertEqual(utils.FormatSeconds(-1), "-1s")
+ self.assertEqual(utils.FormatSeconds(-282), "-282s")
+ self.assertEqual(utils.FormatSeconds(-29119), "-29119s")
+
+ def testFloat(self):
+ self.assertEqual(utils.FormatSeconds(1.3), "1s")
+ self.assertEqual(utils.FormatSeconds(1.9), "2s")
+ self.assertEqual(utils.FormatSeconds(3912.12311), "1h 5m 12s")
+ self.assertEqual(utils.FormatSeconds(3912.8), "1h 5m 13s")
+
+
+class TestLineSplitter(unittest.TestCase):
+ def test(self):
+ lines = []
+ ls = utils.LineSplitter(lines.append)
+ ls.write("Hello World\n")
+ self.assertEqual(lines, [])
+ ls.write("Foo\n Bar\r\n ")
+ ls.write("Baz")
+ ls.write("Moo")
+ self.assertEqual(lines, [])
+ ls.flush()
+ self.assertEqual(lines, ["Hello World", "Foo", " Bar"])
+ ls.close()
+ self.assertEqual(lines, ["Hello World", "Foo", " Bar", " BazMoo"])
+
+ def _testExtra(self, line, all_lines, p1, p2):
+ self.assertEqual(p1, 999)
+ self.assertEqual(p2, "extra")
+ all_lines.append(line)
+
+ def testExtraArgsNoFlush(self):
+ lines = []
+ ls = utils.LineSplitter(self._testExtra, lines, 999, "extra")
+ ls.write("\n\nHello World\n")
+ ls.write("Foo\n Bar\r\n ")
+ ls.write("")
+ ls.write("Baz")
+ ls.write("Moo\n\nx\n")
+ self.assertEqual(lines, [])
+ ls.close()
+ self.assertEqual(lines, ["", "", "Hello World", "Foo", " Bar", " BazMoo",
+ "", "x"])
+
+
+if __name__ == "__main__":
+ testutils.GanetiTestProgram()
import OpenSSL
import random
import operator
-from cStringIO import StringIO
import testutils
from ganeti import constants
from ganeti import compat
from ganeti import utils
from ganeti import errors
-from ganeti.utils import RunCmd, RemoveFile, MatchNameComponent, FormatUnit, \
- ParseUnit, ShellQuote, ShellQuoteArgs, ListVisibleFiles, FirstFree, \
- TailFile, SafeEncode, FormatTime, UnescapeAndSplit, RunParts, PathJoin, \
+from ganeti.utils import RunCmd, RemoveFile, \
+ ListVisibleFiles, FirstFree, \
+ TailFile, RunParts, PathJoin, \
ReadOneLineFile, SetEtcHostsEntry, RemoveEtcHostsEntry
self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
-class TestMatchNameComponent(unittest.TestCase):
- """Test case for the MatchNameComponent function"""
-
- def testEmptyList(self):
- """Test that there is no match against an empty list"""
-
- self.failUnlessEqual(MatchNameComponent("", []), None)
- self.failUnlessEqual(MatchNameComponent("test", []), None)
-
- def testSingleMatch(self):
- """Test that a single match is performed correctly"""
- mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
- for key in "test2", "test2.example", "test2.example.com":
- self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
-
- def testMultipleMatches(self):
- """Test that a multiple match is returned as None"""
- mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
- for key in "test1", "test1.example":
- self.failUnlessEqual(MatchNameComponent(key, mlist), None)
-
- def testFullMatch(self):
- """Test that a full match is returned correctly"""
- key1 = "test1"
- key2 = "test1.example"
- mlist = [key2, key2 + ".com"]
- self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
- self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
-
- def testCaseInsensitivePartialMatch(self):
- """Test for the case_insensitive keyword"""
- mlist = ["test1.example.com", "test2.example.net"]
- self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
- "test2.example.net")
- self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
- "test2.example.net")
- self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
- "test2.example.net")
- self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
- "test2.example.net")
-
-
- def testCaseInsensitiveFullMatch(self):
- mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
- # Between the two ts1 a full string match non-case insensitive should work
- self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
- None)
- self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
- "ts1.ex")
- self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
- "ts1.ex")
- # Between the two ts2 only case differs, so only case-match works
- self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
- "ts2.ex")
- self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
- "Ts2.ex")
- self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
- None)
-
-
class TestReadFile(testutils.GanetiTestCase):
def testReadAll(self):
self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
-class TestFormatUnit(unittest.TestCase):
- """Test case for the FormatUnit function"""
-
- def testMiB(self):
- self.assertEqual(FormatUnit(1, 'h'), '1M')
- self.assertEqual(FormatUnit(100, 'h'), '100M')
- self.assertEqual(FormatUnit(1023, 'h'), '1023M')
-
- self.assertEqual(FormatUnit(1, 'm'), '1')
- self.assertEqual(FormatUnit(100, 'm'), '100')
- self.assertEqual(FormatUnit(1023, 'm'), '1023')
-
- self.assertEqual(FormatUnit(1024, 'm'), '1024')
- self.assertEqual(FormatUnit(1536, 'm'), '1536')
- self.assertEqual(FormatUnit(17133, 'm'), '17133')
- self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
-
- def testGiB(self):
- self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
- self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
- self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
- self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
-
- self.assertEqual(FormatUnit(1024, 'g'), '1.0')
- self.assertEqual(FormatUnit(1536, 'g'), '1.5')
- self.assertEqual(FormatUnit(17133, 'g'), '16.7')
- self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
-
- self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
- self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
- self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
-
- def testTiB(self):
- self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
- self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
- self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
-
- self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
- self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
- self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
-
- def testErrors(self):
- self.assertRaises(errors.ProgrammerError, FormatUnit, 1, "a")
-
-
-class TestParseUnit(unittest.TestCase):
- """Test case for the ParseUnit function"""
-
- SCALES = (('', 1),
- ('M', 1), ('G', 1024), ('T', 1024 * 1024),
- ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
- ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
-
- def testRounding(self):
- self.assertEqual(ParseUnit('0'), 0)
- self.assertEqual(ParseUnit('1'), 4)
- self.assertEqual(ParseUnit('2'), 4)
- self.assertEqual(ParseUnit('3'), 4)
-
- self.assertEqual(ParseUnit('124'), 124)
- self.assertEqual(ParseUnit('125'), 128)
- self.assertEqual(ParseUnit('126'), 128)
- self.assertEqual(ParseUnit('127'), 128)
- self.assertEqual(ParseUnit('128'), 128)
- self.assertEqual(ParseUnit('129'), 132)
- self.assertEqual(ParseUnit('130'), 132)
-
- def testFloating(self):
- self.assertEqual(ParseUnit('0'), 0)
- self.assertEqual(ParseUnit('0.5'), 4)
- self.assertEqual(ParseUnit('1.75'), 4)
- self.assertEqual(ParseUnit('1.99'), 4)
- self.assertEqual(ParseUnit('2.00'), 4)
- self.assertEqual(ParseUnit('2.01'), 4)
- self.assertEqual(ParseUnit('3.99'), 4)
- self.assertEqual(ParseUnit('4.00'), 4)
- self.assertEqual(ParseUnit('4.01'), 8)
- self.assertEqual(ParseUnit('1.5G'), 1536)
- self.assertEqual(ParseUnit('1.8G'), 1844)
- self.assertEqual(ParseUnit('8.28T'), 8682212)
-
- def testSuffixes(self):
- for sep in ('', ' ', ' ', "\t", "\t "):
- for suffix, scale in TestParseUnit.SCALES:
- for func in (lambda x: x, str.lower, str.upper):
- self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
- 1024 * scale)
-
- def testInvalidInput(self):
- for sep in ('-', '_', ',', 'a'):
- for suffix, _ in TestParseUnit.SCALES:
- self.assertRaises(errors.UnitParseError, ParseUnit, '1' + sep + suffix)
-
- for suffix, _ in TestParseUnit.SCALES:
- self.assertRaises(errors.UnitParseError, ParseUnit, '1,3' + suffix)
-
-
class TestParseCpuMask(unittest.TestCase):
"""Test case for the ParseCpuMask function."""
])
-class TestShellQuoting(unittest.TestCase):
- """Test case for shell quoting functions"""
-
- def testShellQuote(self):
- self.assertEqual(ShellQuote('abc'), "abc")
- self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
- self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
- self.assertEqual(ShellQuote("a b c"), "'a b c'")
- self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
-
- def testShellQuoteArgs(self):
- self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
- self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
- self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
-
-
class TestListVisibleFiles(unittest.TestCase):
"""Test case for ListVisibleFiles"""
self._pathTestHelper('/etc/', False)
-class TestSafeEncode(unittest.TestCase):
- """Test case for SafeEncode"""
-
- def testAscii(self):
- for txt in [string.digits, string.letters, string.punctuation]:
- self.failUnlessEqual(txt, SafeEncode(txt))
-
- def testDoubleEncode(self):
- for i in range(255):
- txt = SafeEncode(chr(i))
- self.failUnlessEqual(txt, SafeEncode(txt))
-
- def testUnicode(self):
- # 1024 is high enough to catch non-direct ASCII mappings
- for i in range(1024):
- txt = SafeEncode(unichr(i))
- self.failUnlessEqual(txt, SafeEncode(txt))
-
-
-class TestFormatTime(unittest.TestCase):
- """Testing case for FormatTime"""
-
- @staticmethod
- def _TestInProcess(tz, timestamp, expected):
- os.environ["TZ"] = tz
- time.tzset()
- return utils.FormatTime(timestamp) == expected
-
- def _Test(self, *args):
- # Need to use separate process as we want to change TZ
- self.assert_(utils.RunInSeparateProcess(self._TestInProcess, *args))
-
- def test(self):
- self._Test("UTC", 0, "1970-01-01 00:00:00")
- self._Test("America/Sao_Paulo", 1292606926, "2010-12-17 15:28:46")
- self._Test("Europe/London", 1292606926, "2010-12-17 17:28:46")
- self._Test("Europe/Zurich", 1292606926, "2010-12-17 18:28:46")
- self._Test("Australia/Sydney", 1292606926, "2010-12-18 04:28:46")
-
- def testNone(self):
- self.failUnlessEqual(FormatTime(None), "N/A")
-
- def testInvalid(self):
- self.failUnlessEqual(FormatTime(()), "N/A")
-
- def testNow(self):
- # tests that we accept time.time input
- FormatTime(time.time())
- # tests that we accept int input
- FormatTime(int(time.time()))
-
-
class RunInSeparateProcess(unittest.TestCase):
def test(self):
for exp in [True, False]:
self.assertEqual(utils.FingerprintFiles(self.results.keys()), self.results)
-class TestUnescapeAndSplit(unittest.TestCase):
- """Testing case for UnescapeAndSplit"""
-
- def setUp(self):
- # testing more that one separator for regexp safety
- self._seps = [",", "+", "."]
-
- def testSimple(self):
- a = ["a", "b", "c", "d"]
- for sep in self._seps:
- self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)
-
- def testEscape(self):
- for sep in self._seps:
- a = ["a", "b\\" + sep + "c", "d"]
- b = ["a", "b" + sep + "c", "d"]
- self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
-
- def testDoubleEscape(self):
- for sep in self._seps:
- a = ["a", "b\\\\", "c", "d"]
- b = ["a", "b\\", "c", "d"]
- self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
-
- def testThreeEscape(self):
- for sep in self._seps:
- a = ["a", "b\\\\\\" + sep + "c", "d"]
- b = ["a", "b\\" + sep + "c", "d"]
- self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
-
-
class TestGenerateSelfSignedX509Cert(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.assert_(os.path.isdir(path))
-class TestLineSplitter(unittest.TestCase):
- def test(self):
- lines = []
- ls = utils.LineSplitter(lines.append)
- ls.write("Hello World\n")
- self.assertEqual(lines, [])
- ls.write("Foo\n Bar\r\n ")
- ls.write("Baz")
- ls.write("Moo")
- self.assertEqual(lines, [])
- ls.flush()
- self.assertEqual(lines, ["Hello World", "Foo", " Bar"])
- ls.close()
- self.assertEqual(lines, ["Hello World", "Foo", " Bar", " BazMoo"])
-
- def _testExtra(self, line, all_lines, p1, p2):
- self.assertEqual(p1, 999)
- self.assertEqual(p2, "extra")
- all_lines.append(line)
-
- def testExtraArgsNoFlush(self):
- lines = []
- ls = utils.LineSplitter(self._testExtra, lines, 999, "extra")
- ls.write("\n\nHello World\n")
- ls.write("Foo\n Bar\r\n ")
- ls.write("")
- ls.write("Baz")
- ls.write("Moo\n\nx\n")
- self.assertEqual(lines, [])
- ls.close()
- self.assertEqual(lines, ["", "", "Hello World", "Foo", " Bar", " BazMoo",
- "", "x"])
-
-
class TestReadLockedPidFile(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
os.umask(self.old_umask)
-class TestFormatSeconds(unittest.TestCase):
- def test(self):
- self.assertEqual(utils.FormatSeconds(1), "1s")
- self.assertEqual(utils.FormatSeconds(3600), "1h 0m 0s")
- self.assertEqual(utils.FormatSeconds(3599), "59m 59s")
- self.assertEqual(utils.FormatSeconds(7200), "2h 0m 0s")
- self.assertEqual(utils.FormatSeconds(7201), "2h 0m 1s")
- self.assertEqual(utils.FormatSeconds(7281), "2h 1m 21s")
- self.assertEqual(utils.FormatSeconds(29119), "8h 5m 19s")
- self.assertEqual(utils.FormatSeconds(19431228), "224d 21h 33m 48s")
- self.assertEqual(utils.FormatSeconds(-1), "-1s")
- self.assertEqual(utils.FormatSeconds(-282), "-282s")
- self.assertEqual(utils.FormatSeconds(-29119), "-29119s")
-
- def testFloat(self):
- self.assertEqual(utils.FormatSeconds(1.3), "1s")
- self.assertEqual(utils.FormatSeconds(1.9), "2s")
- self.assertEqual(utils.FormatSeconds(3912.12311), "1h 5m 12s")
- self.assertEqual(utils.FormatSeconds(3912.8), "1h 5m 13s")
-
-
class TestIgnoreProcessNotFound(unittest.TestCase):
@staticmethod
def _WritePid(fd):
self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))
-class TestShellWriter(unittest.TestCase):
- def test(self):
- buf = StringIO()
- sw = utils.ShellWriter(buf)
- sw.Write("#!/bin/bash")
- sw.Write("if true; then")
- sw.IncIndent()
- try:
- sw.Write("echo true")
-
- sw.Write("for i in 1 2 3")
- sw.Write("do")
- sw.IncIndent()
- try:
- self.assertEqual(sw._indent, 2)
- sw.Write("date")
- finally:
- sw.DecIndent()
- sw.Write("done")
- finally:
- sw.DecIndent()
- sw.Write("echo %s", utils.ShellQuote("Hello World"))
- sw.Write("exit 0")
-
- self.assertEqual(sw._indent, 0)
-
- output = buf.getvalue()
-
- self.assert_(output.endswith("\n"))
-
- lines = output.splitlines()
- self.assertEqual(len(lines), 9)
- self.assertEqual(lines[0], "#!/bin/bash")
- self.assert_(re.match(r"^\s+date$", lines[5]))
- self.assertEqual(lines[7], "echo 'Hello World'")
-
- def testEmpty(self):
- buf = StringIO()
- sw = utils.ShellWriter(buf)
- sw = None
- self.assertEqual(buf.getvalue(), "")
-
-
-class TestCommaJoin(unittest.TestCase):
- def test(self):
- self.assertEqual(utils.CommaJoin([]), "")
- self.assertEqual(utils.CommaJoin([1, 2, 3]), "1, 2, 3")
- self.assertEqual(utils.CommaJoin(["Hello"]), "Hello")
- self.assertEqual(utils.CommaJoin(["Hello", "World"]), "Hello, World")
- self.assertEqual(utils.CommaJoin(["Hello", "World", 99]),
- "Hello, World, 99")
-
-
class TestFindMatch(unittest.TestCase):
def test(self):
data = {
os.close(fd)
-class TestNormalizeAndValidateMac(unittest.TestCase):
- def testInvalid(self):
- self.assertRaises(errors.OpPrereqError,
- utils.NormalizeAndValidateMac, "xxx")
-
- def testNormalization(self):
- for mac in ["aa:bb:cc:dd:ee:ff", "00:AA:11:bB:22:cc"]:
- self.assertEqual(utils.NormalizeAndValidateMac(mac), mac.lower())
-
-
if __name__ == '__main__':
testutils.GanetiTestProgram()