From 7fcffe273870eb75842f365554061d1c71fcf81b Mon Sep 17 00:00:00 2001 From: Michael Hanselmann <hansmi@google.com> Date: Mon, 10 Jan 2011 15:48:00 +0100 Subject: [PATCH] utils: Move text-related functions into separate file Signed-off-by: Michael Hanselmann <hansmi@google.com> Reviewed-by: Iustin Pop <iustin@google.com> --- Makefile.am | 4 +- lib/utils/__init__.py | 412 +------------------------- lib/utils/text.py | 444 +++++++++++++++++++++++++++++ test/ganeti.utils.text_unittest.py | 426 +++++++++++++++++++++++++++ test/ganeti.utils_unittest.py | 381 +------------------------ 5 files changed, 877 insertions(+), 790 deletions(-) create mode 100644 lib/utils/text.py create mode 100755 test/ganeti.utils.text_unittest.py diff --git a/Makefile.am b/Makefile.am index 73ebcfe6c..6815392fd 100644 --- a/Makefile.am +++ b/Makefile.am @@ -214,7 +214,8 @@ server_PYTHON = \ utils_PYTHON = \ lib/utils/__init__.py \ lib/utils/algo.py \ - lib/utils/retry.py + lib/utils/retry.py \ + lib/utils/text.py docrst = \ doc/admin.rst \ @@ -482,6 +483,7 @@ python_tests = \ test/ganeti.uidpool_unittest.py \ test/ganeti.utils.algo_unittest.py \ test/ganeti.utils.retry_unittest.py \ + test/ganeti.utils.text_unittest.py \ test/ganeti.utils_mlockall_unittest.py \ test/ganeti.utils_unittest.py \ test/ganeti.workerpool_unittest.py \ diff --git a/lib/utils/__init__.py b/lib/utils/__init__.py index 1f2e0b7cb..417df597d 100644 --- a/lib/utils/__init__.py +++ b/lib/utils/__init__.py @@ -48,7 +48,6 @@ import OpenSSL import datetime import calendar import hmac -import collections from cStringIO import StringIO @@ -64,9 +63,9 @@ from ganeti import compat from ganeti.utils.algo import * # pylint: disable-msg=W0401 from ganeti.utils.retry import * # pylint: disable-msg=W0401 +from ganeti.utils.text import * # pylint: disable-msg=W0401 _locksheld = [] -_re_shell_unquoted = re.compile('^[-.,=:/_+@A-Za-z0-9]+$') debug_locks = False @@ -95,9 +94,6 @@ UUID_RE = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-' _MCL_CURRENT = 1 _MCL_FUTURE = 2 -#: MAC checker regexp -_MAC_CHECK = re.compile("^([0-9a-f]{2}:){5}[0-9a-f]{2}$", re.I) - (_TIMEOUT_NONE, _TIMEOUT_TERM, _TIMEOUT_KILL) = range(3) @@ -105,9 +101,6 @@ _MAC_CHECK = re.compile("^([0-9a-f]{2}:){5}[0-9a-f]{2}$", re.I) #: Shell param checker regexp _SHELLPARAM_REGEX = re.compile(r"^[-a-zA-Z0-9._+/:%@]+$") -#: Unit checker regexp -_PARSEUNIT_REGEX = re.compile(r"^([.\d]+)\s*([a-zA-Z]+)?$") - #: ASN1 time regexp _ASN1_TIME_REGEX = re.compile(r"^(\d+)([-+]\d\d)(\d\d)$") @@ -1171,52 +1164,6 @@ def ReadLockedPidFile(path): return None -def MatchNameComponent(key, name_list, case_sensitive=True): - """Try to match a name against a list. - - This function will try to match a name like test1 against a list - like C{['test1.example.com', 'test2.example.com', ...]}. Against - this list, I{'test1'} as well as I{'test1.example'} will match, but - not I{'test1.ex'}. A multiple match will be considered as no match - at all (e.g. I{'test1'} against C{['test1.example.com', - 'test1.example.org']}), except when the key fully matches an entry - (e.g. I{'test1'} against C{['test1', 'test1.example.com']}). - - @type key: str - @param key: the name to be searched - @type name_list: list - @param name_list: the list of strings against which to search the key - @type case_sensitive: boolean - @param case_sensitive: whether to provide a case-sensitive match - - @rtype: None or str - @return: None if there is no match I{or} if there are multiple matches, - otherwise the element from the list which matches - - """ - if key in name_list: - return key - - re_flags = 0 - if not case_sensitive: - re_flags |= re.IGNORECASE - key = key.upper() - mo = re.compile("^%s(\..*)?$" % re.escape(key), re_flags) - names_filtered = [] - string_matches = [] - for name in name_list: - if mo.match(name) is not None: - names_filtered.append(name) - if not case_sensitive and key == name.upper(): - string_matches.append(name) - - if len(string_matches) == 1: - return string_matches[0] - if len(names_filtered) == 1: - return names_filtered[0] - return None - - def ValidateServiceName(name): """Validate the given service name. @@ -1344,87 +1291,6 @@ def BuildShellCmd(template, *args): return template % args -def FormatUnit(value, units): - """Formats an incoming number of MiB with the appropriate unit. - - @type value: int - @param value: integer representing the value in MiB (1048576) - @type units: char - @param units: the type of formatting we should do: - - 'h' for automatic scaling - - 'm' for MiBs - - 'g' for GiBs - - 't' for TiBs - @rtype: str - @return: the formatted value (with suffix) - - """ - if units not in ('m', 'g', 't', 'h'): - raise errors.ProgrammerError("Invalid unit specified '%s'" % str(units)) - - suffix = '' - - if units == 'm' or (units == 'h' and value < 1024): - if units == 'h': - suffix = 'M' - return "%d%s" % (round(value, 0), suffix) - - elif units == 'g' or (units == 'h' and value < (1024 * 1024)): - if units == 'h': - suffix = 'G' - return "%0.1f%s" % (round(float(value) / 1024, 1), suffix) - - else: - if units == 'h': - suffix = 'T' - return "%0.1f%s" % (round(float(value) / 1024 / 1024, 1), suffix) - - -def ParseUnit(input_string): - """Tries to extract number and scale from the given string. - - Input must be in the format C{NUMBER+ [DOT NUMBER+] SPACE* - [UNIT]}. If no unit is specified, it defaults to MiB. Return value - is always an int in MiB. - - """ - m = _PARSEUNIT_REGEX.match(str(input_string)) - if not m: - raise errors.UnitParseError("Invalid format") - - value = float(m.groups()[0]) - - unit = m.groups()[1] - if unit: - lcunit = unit.lower() - else: - lcunit = 'm' - - if lcunit in ('m', 'mb', 'mib'): - # Value already in MiB - pass - - elif lcunit in ('g', 'gb', 'gib'): - value *= 1024 - - elif lcunit in ('t', 'tb', 'tib'): - value *= 1024 * 1024 - - else: - raise errors.UnitParseError("Unknown unit: %s" % unit) - - # Make sure we round up - if int(value) < value: - value += 1 - - # Round up to the next multiple of 4 - value = int(value) - if value % 4: - value += 4 - value % 4 - - return value - - def ParseCpuMask(cpu_mask): """Parse a CPU mask definition and return the list of CPU IDs. @@ -1673,75 +1539,6 @@ def CreateBackup(file_name): return backup_name -def ShellQuote(value): - """Quotes shell argument according to POSIX. - - @type value: str - @param value: the argument to be quoted - @rtype: str - @return: the quoted value - - """ - if _re_shell_unquoted.match(value): - return value - else: - return "'%s'" % value.replace("'", "'\\''") - - -def ShellQuoteArgs(args): - """Quotes a list of shell arguments. - - @type args: list - @param args: list of arguments to be quoted - @rtype: str - @return: the quoted arguments concatenated with spaces - - """ - return ' '.join([ShellQuote(i) for i in args]) - - -class ShellWriter: - """Helper class to write scripts with indentation. - - """ - INDENT_STR = " " - - def __init__(self, fh): - """Initializes this class. - - """ - self._fh = fh - self._indent = 0 - - def IncIndent(self): - """Increase indentation level by 1. - - """ - self._indent += 1 - - def DecIndent(self): - """Decrease indentation level by 1. - - """ - assert self._indent > 0 - self._indent -= 1 - - def Write(self, txt, *args): - """Write line to output file. - - """ - assert self._indent >= 0 - - self._fh.write(self._indent * self.INDENT_STR) - - if args: - self._fh.write(txt % args) - else: - self._fh.write(txt) - - self._fh.write("\n") - - def ListVisibleFiles(path): """Returns a list of visible files in a directory. @@ -1791,21 +1588,6 @@ def NewUUID(): return ReadFile(_RANDOM_UUID_FILE, size=128).rstrip("\n") -def GenerateSecret(numbytes=20): - """Generates a random secret. - - This will generate a pseudo-random secret returning an hex string - (so that it can be used where an ASCII string is needed). - - @param numbytes: the number of bytes which will be represented by the returned - string (defaulting to 20, the length of a SHA1 hash) - @rtype: str - @return: an hex representation of the pseudo-random sequence - - """ - return os.urandom(numbytes).encode('hex') - - def EnsureDirs(dirs): """Make required directories, if they don't exist. @@ -2146,27 +1928,6 @@ def WaitForFdCondition(fdobj, event, timeout): return result -def NormalizeAndValidateMac(mac): - """Normalizes and check if a MAC address is valid. - - Checks whether the supplied MAC address is formally correct, only - accepts colon separated format. Normalize it to all lower. - - @type mac: str - @param mac: the MAC to be validated - @rtype: str - @return: returns the normalized and validated MAC. - - @raise errors.OpPrereqError: If the MAC isn't valid - - """ - if not _MAC_CHECK.match(mac): - raise errors.OpPrereqError("Invalid MAC address specified: %s" % - mac, errors.ECODE_INVAL) - - return mac.lower() - - def TestDelay(duration): """Sleep for a fixed amount of time. @@ -2985,95 +2746,6 @@ def VerifySha1Hmac(key, text, digest, salt=None): return digest.lower() == Sha1Hmac(key, text, salt=salt).lower() -def SafeEncode(text): - """Return a 'safe' version of a source string. - - This function mangles the input string and returns a version that - should be safe to display/encode as ASCII. To this end, we first - convert it to ASCII using the 'backslashreplace' encoding which - should get rid of any non-ASCII chars, and then we process it - through a loop copied from the string repr sources in the python; we - don't use string_escape anymore since that escape single quotes and - backslashes too, and that is too much; and that escaping is not - stable, i.e. string_escape(string_escape(x)) != string_escape(x). - - @type text: str or unicode - @param text: input data - @rtype: str - @return: a safe version of text - - """ - if isinstance(text, unicode): - # only if unicode; if str already, we handle it below - text = text.encode('ascii', 'backslashreplace') - resu = "" - for char in text: - c = ord(char) - if char == '\t': - resu += r'\t' - elif char == '\n': - resu += r'\n' - elif char == '\r': - resu += r'\'r' - elif c < 32 or c >= 127: # non-printable - resu += "\\x%02x" % (c & 0xff) - else: - resu += char - return resu - - -def UnescapeAndSplit(text, sep=","): - """Split and unescape a string based on a given separator. - - This function splits a string based on a separator where the - separator itself can be escape in order to be an element of the - elements. The escaping rules are (assuming coma being the - separator): - - a plain , separates the elements - - a sequence \\\\, (double backslash plus comma) is handled as a - backslash plus a separator comma - - a sequence \, (backslash plus comma) is handled as a - non-separator comma - - @type text: string - @param text: the string to split - @type sep: string - @param text: the separator - @rtype: string - @return: a list of strings - - """ - # we split the list by sep (with no escaping at this stage) - slist = text.split(sep) - # next, we revisit the elements and if any of them ended with an odd - # number of backslashes, then we join it with the next - rlist = [] - while slist: - e1 = slist.pop(0) - if e1.endswith("\\"): - num_b = len(e1) - len(e1.rstrip("\\")) - if num_b % 2 == 1: - e2 = slist.pop(0) - # here the backslashes remain (all), and will be reduced in - # the next step - rlist.append(e1 + sep + e2) - continue - rlist.append(e1) - # finally, replace backslash-something with something - rlist = [re.sub(r"\\(.)", r"\1", v) for v in rlist] - return rlist - - -def CommaJoin(names): - """Nicely join a set of identifiers. - - @param names: set, list or tuple - @return: a string with the formatted results - - """ - return ", ".join([str(val) for val in names]) - - def FindMatch(data, name): """Tries to find an item in a dictionary matching a name. @@ -3270,47 +2942,6 @@ def LockFile(fd): raise -def FormatTime(val): - """Formats a time value. - - @type val: float or None - @param val: Timestamp as returned by time.time() (seconds since Epoch, - 1970-01-01 00:00:00 UTC) - @return: a string value or N/A if we don't have a valid timestamp - - """ - if val is None or not isinstance(val, (int, float)): - return "N/A" - # these two codes works on Linux, but they are not guaranteed on all - # platforms - return time.strftime("%F %T", time.localtime(val)) - - -def FormatSeconds(secs): - """Formats seconds for easier reading. - - @type secs: number - @param secs: Number of seconds - @rtype: string - @return: Formatted seconds (e.g. "2d 9h 19m 49s") - - """ - parts = [] - - secs = round(secs, 0) - - if secs > 0: - # Negative values would be a bit tricky - for unit, one in [("d", 24 * 60 * 60), ("h", 60 * 60), ("m", 60)]: - (complete, secs) = divmod(secs, one) - if complete or parts: - parts.append("%d%s" % (complete, unit)) - - parts.append("%ds" % secs) - - return " ".join(parts) - - def ReadWatcherPauseFile(filename, now=None, remove_after=3600): """Reads the watcher pause file. @@ -3547,47 +3178,6 @@ class FileLock(object): "Failed to unlock %s" % self.filename) -class LineSplitter: - """Splits data chunks into lines separated by newline. - - Instances provide a file-like interface. - - """ - def __init__(self, line_fn, *args): - """Initializes this class. - - @type line_fn: callable - @param line_fn: Function called for each line, first parameter is line - @param args: Extra arguments for L{line_fn} - - """ - assert callable(line_fn) - - if args: - # Python 2.4 doesn't have functools.partial yet - self._line_fn = \ - lambda line: line_fn(line, *args) # pylint: disable-msg=W0142 - else: - self._line_fn = line_fn - - self._lines = collections.deque() - self._buffer = "" - - def write(self, data): - parts = (self._buffer + data).split("\n") - self._buffer = parts.pop() - self._lines.extend(parts) - - def flush(self): - while self._lines: - self._line_fn(self._lines.popleft().rstrip("\r\n")) - - def close(self): - self.flush() - if self._buffer: - self._line_fn(self._buffer) - - def SignalHandled(signums): """Signal Handled decoration. diff --git a/lib/utils/text.py b/lib/utils/text.py new file mode 100644 index 000000000..af42d3680 --- /dev/null +++ b/lib/utils/text.py @@ -0,0 +1,444 @@ +# +# + +# Copyright (C) 2006, 2007, 2010, 2011 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + +"""Utility functions for manipulating or working with text. + +""" + + +import re +import os +import time +import collections + +from ganeti import errors + + +#: Unit checker regexp +_PARSEUNIT_REGEX = re.compile(r"^([.\d]+)\s*([a-zA-Z]+)?$") + +#: Characters which don't need to be quoted for shell commands +_SHELL_UNQUOTED_RE = re.compile('^[-.,=:/_+@A-Za-z0-9]+$') + +#: MAC checker regexp +_MAC_CHECK_RE = re.compile("^([0-9a-f]{2}:){5}[0-9a-f]{2}$", re.I) + + +def MatchNameComponent(key, name_list, case_sensitive=True): + """Try to match a name against a list. + + This function will try to match a name like test1 against a list + like C{['test1.example.com', 'test2.example.com', ...]}. Against + this list, I{'test1'} as well as I{'test1.example'} will match, but + not I{'test1.ex'}. A multiple match will be considered as no match + at all (e.g. I{'test1'} against C{['test1.example.com', + 'test1.example.org']}), except when the key fully matches an entry + (e.g. I{'test1'} against C{['test1', 'test1.example.com']}). + + @type key: str + @param key: the name to be searched + @type name_list: list + @param name_list: the list of strings against which to search the key + @type case_sensitive: boolean + @param case_sensitive: whether to provide a case-sensitive match + + @rtype: None or str + @return: None if there is no match I{or} if there are multiple matches, + otherwise the element from the list which matches + + """ + if key in name_list: + return key + + re_flags = 0 + if not case_sensitive: + re_flags |= re.IGNORECASE + key = key.upper() + mo = re.compile("^%s(\..*)?$" % re.escape(key), re_flags) + names_filtered = [] + string_matches = [] + for name in name_list: + if mo.match(name) is not None: + names_filtered.append(name) + if not case_sensitive and key == name.upper(): + string_matches.append(name) + + if len(string_matches) == 1: + return string_matches[0] + if len(names_filtered) == 1: + return names_filtered[0] + return None + + +def FormatUnit(value, units): + """Formats an incoming number of MiB with the appropriate unit. + + @type value: int + @param value: integer representing the value in MiB (1048576) + @type units: char + @param units: the type of formatting we should do: + - 'h' for automatic scaling + - 'm' for MiBs + - 'g' for GiBs + - 't' for TiBs + @rtype: str + @return: the formatted value (with suffix) + + """ + if units not in ('m', 'g', 't', 'h'): + raise errors.ProgrammerError("Invalid unit specified '%s'" % str(units)) + + suffix = '' + + if units == 'm' or (units == 'h' and value < 1024): + if units == 'h': + suffix = 'M' + return "%d%s" % (round(value, 0), suffix) + + elif units == 'g' or (units == 'h' and value < (1024 * 1024)): + if units == 'h': + suffix = 'G' + return "%0.1f%s" % (round(float(value) / 1024, 1), suffix) + + else: + if units == 'h': + suffix = 'T' + return "%0.1f%s" % (round(float(value) / 1024 / 1024, 1), suffix) + + +def ParseUnit(input_string): + """Tries to extract number and scale from the given string. + + Input must be in the format C{NUMBER+ [DOT NUMBER+] SPACE* + [UNIT]}. If no unit is specified, it defaults to MiB. Return value + is always an int in MiB. + + """ + m = _PARSEUNIT_REGEX.match(str(input_string)) + if not m: + raise errors.UnitParseError("Invalid format") + + value = float(m.groups()[0]) + + unit = m.groups()[1] + if unit: + lcunit = unit.lower() + else: + lcunit = 'm' + + if lcunit in ('m', 'mb', 'mib'): + # Value already in MiB + pass + + elif lcunit in ('g', 'gb', 'gib'): + value *= 1024 + + elif lcunit in ('t', 'tb', 'tib'): + value *= 1024 * 1024 + + else: + raise errors.UnitParseError("Unknown unit: %s" % unit) + + # Make sure we round up + if int(value) < value: + value += 1 + + # Round up to the next multiple of 4 + value = int(value) + if value % 4: + value += 4 - value % 4 + + return value + + +def ShellQuote(value): + """Quotes shell argument according to POSIX. + + @type value: str + @param value: the argument to be quoted + @rtype: str + @return: the quoted value + + """ + if _SHELL_UNQUOTED_RE.match(value): + return value + else: + return "'%s'" % value.replace("'", "'\\''") + + +def ShellQuoteArgs(args): + """Quotes a list of shell arguments. + + @type args: list + @param args: list of arguments to be quoted + @rtype: str + @return: the quoted arguments concatenated with spaces + + """ + return " ".join([ShellQuote(i) for i in args]) + + +class ShellWriter: + """Helper class to write scripts with indentation. + + """ + INDENT_STR = " " + + def __init__(self, fh): + """Initializes this class. + + """ + self._fh = fh + self._indent = 0 + + def IncIndent(self): + """Increase indentation level by 1. + + """ + self._indent += 1 + + def DecIndent(self): + """Decrease indentation level by 1. + + """ + assert self._indent > 0 + self._indent -= 1 + + def Write(self, txt, *args): + """Write line to output file. + + """ + assert self._indent >= 0 + + self._fh.write(self._indent * self.INDENT_STR) + + if args: + self._fh.write(txt % args) + else: + self._fh.write(txt) + + self._fh.write("\n") + + +def GenerateSecret(numbytes=20): + """Generates a random secret. + + This will generate a pseudo-random secret returning an hex string + (so that it can be used where an ASCII string is needed). + + @param numbytes: the number of bytes which will be represented by the returned + string (defaulting to 20, the length of a SHA1 hash) + @rtype: str + @return: an hex representation of the pseudo-random sequence + + """ + return os.urandom(numbytes).encode("hex") + + +def NormalizeAndValidateMac(mac): + """Normalizes and check if a MAC address is valid. + + Checks whether the supplied MAC address is formally correct, only + accepts colon separated format. Normalize it to all lower. + + @type mac: str + @param mac: the MAC to be validated + @rtype: str + @return: returns the normalized and validated MAC. + + @raise errors.OpPrereqError: If the MAC isn't valid + + """ + if not _MAC_CHECK_RE.match(mac): + raise errors.OpPrereqError("Invalid MAC address '%s'" % mac, + errors.ECODE_INVAL) + + return mac.lower() + + +def SafeEncode(text): + """Return a 'safe' version of a source string. + + This function mangles the input string and returns a version that + should be safe to display/encode as ASCII. To this end, we first + convert it to ASCII using the 'backslashreplace' encoding which + should get rid of any non-ASCII chars, and then we process it + through a loop copied from the string repr sources in the python; we + don't use string_escape anymore since that escape single quotes and + backslashes too, and that is too much; and that escaping is not + stable, i.e. string_escape(string_escape(x)) != string_escape(x). + + @type text: str or unicode + @param text: input data + @rtype: str + @return: a safe version of text + + """ + if isinstance(text, unicode): + # only if unicode; if str already, we handle it below + text = text.encode('ascii', 'backslashreplace') + resu = "" + for char in text: + c = ord(char) + if char == '\t': + resu += r'\t' + elif char == '\n': + resu += r'\n' + elif char == '\r': + resu += r'\'r' + elif c < 32 or c >= 127: # non-printable + resu += "\\x%02x" % (c & 0xff) + else: + resu += char + return resu + + +def UnescapeAndSplit(text, sep=","): + """Split and unescape a string based on a given separator. + + This function splits a string based on a separator where the + separator itself can be escape in order to be an element of the + elements. The escaping rules are (assuming coma being the + separator): + - a plain , separates the elements + - a sequence \\\\, (double backslash plus comma) is handled as a + backslash plus a separator comma + - a sequence \, (backslash plus comma) is handled as a + non-separator comma + + @type text: string + @param text: the string to split + @type sep: string + @param text: the separator + @rtype: string + @return: a list of strings + + """ + # we split the list by sep (with no escaping at this stage) + slist = text.split(sep) + # next, we revisit the elements and if any of them ended with an odd + # number of backslashes, then we join it with the next + rlist = [] + while slist: + e1 = slist.pop(0) + if e1.endswith("\\"): + num_b = len(e1) - len(e1.rstrip("\\")) + if num_b % 2 == 1: + e2 = slist.pop(0) + # here the backslashes remain (all), and will be reduced in + # the next step + rlist.append(e1 + sep + e2) + continue + rlist.append(e1) + # finally, replace backslash-something with something + rlist = [re.sub(r"\\(.)", r"\1", v) for v in rlist] + return rlist + + +def CommaJoin(names): + """Nicely join a set of identifiers. + + @param names: set, list or tuple + @return: a string with the formatted results + + """ + return ", ".join([str(val) for val in names]) + + +def FormatTime(val): + """Formats a time value. + + @type val: float or None + @param val: Timestamp as returned by time.time() (seconds since Epoch, + 1970-01-01 00:00:00 UTC) + @return: a string value or N/A if we don't have a valid timestamp + + """ + if val is None or not isinstance(val, (int, float)): + return "N/A" + # these two codes works on Linux, but they are not guaranteed on all + # platforms + return time.strftime("%F %T", time.localtime(val)) + + +def FormatSeconds(secs): + """Formats seconds for easier reading. + + @type secs: number + @param secs: Number of seconds + @rtype: string + @return: Formatted seconds (e.g. "2d 9h 19m 49s") + + """ + parts = [] + + secs = round(secs, 0) + + if secs > 0: + # Negative values would be a bit tricky + for unit, one in [("d", 24 * 60 * 60), ("h", 60 * 60), ("m", 60)]: + (complete, secs) = divmod(secs, one) + if complete or parts: + parts.append("%d%s" % (complete, unit)) + + parts.append("%ds" % secs) + + return " ".join(parts) + + +class LineSplitter: + """Splits data chunks into lines separated by newline. + + Instances provide a file-like interface. + + """ + def __init__(self, line_fn, *args): + """Initializes this class. + + @type line_fn: callable + @param line_fn: Function called for each line, first parameter is line + @param args: Extra arguments for L{line_fn} + + """ + assert callable(line_fn) + + if args: + # Python 2.4 doesn't have functools.partial yet + self._line_fn = \ + lambda line: line_fn(line, *args) # pylint: disable-msg=W0142 + else: + self._line_fn = line_fn + + self._lines = collections.deque() + self._buffer = "" + + def write(self, data): + parts = (self._buffer + data).split("\n") + self._buffer = parts.pop() + self._lines.extend(parts) + + def flush(self): + while self._lines: + self._line_fn(self._lines.popleft().rstrip("\r\n")) + + def close(self): + self.flush() + if self._buffer: + self._line_fn(self._buffer) diff --git a/test/ganeti.utils.text_unittest.py b/test/ganeti.utils.text_unittest.py new file mode 100755 index 000000000..9af51b390 --- /dev/null +++ b/test/ganeti.utils.text_unittest.py @@ -0,0 +1,426 @@ +#!/usr/bin/python +# + +# Copyright (C) 2011 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + + +"""Script for testing ganeti.utils.text""" + +import re +import string +import time +import unittest +import os + +from cStringIO import StringIO + +from ganeti import constants +from ganeti import utils +from ganeti import errors + +import testutils + + +class TestMatchNameComponent(unittest.TestCase): + """Test case for the MatchNameComponent function""" + + def testEmptyList(self): + """Test that there is no match against an empty list""" + self.failUnlessEqual(utils.MatchNameComponent("", []), None) + self.failUnlessEqual(utils.MatchNameComponent("test", []), None) + + def testSingleMatch(self): + """Test that a single match is performed correctly""" + mlist = ["test1.example.com", "test2.example.com", "test3.example.com"] + for key in "test2", "test2.example", "test2.example.com": + self.failUnlessEqual(utils.MatchNameComponent(key, mlist), mlist[1]) + + def testMultipleMatches(self): + """Test that a multiple match is returned as None""" + mlist = ["test1.example.com", "test1.example.org", "test1.example.net"] + for key in "test1", "test1.example": + self.failUnlessEqual(utils.MatchNameComponent(key, mlist), None) + + def testFullMatch(self): + """Test that a full match is returned correctly""" + key1 = "test1" + key2 = "test1.example" + mlist = [key2, key2 + ".com"] + self.failUnlessEqual(utils.MatchNameComponent(key1, mlist), None) + self.failUnlessEqual(utils.MatchNameComponent(key2, mlist), key2) + + def testCaseInsensitivePartialMatch(self): + """Test for the case_insensitive keyword""" + mlist = ["test1.example.com", "test2.example.net"] + self.assertEqual(utils.MatchNameComponent("test2", mlist, + case_sensitive=False), + "test2.example.net") + self.assertEqual(utils.MatchNameComponent("Test2", mlist, + case_sensitive=False), + "test2.example.net") + self.assertEqual(utils.MatchNameComponent("teSt2", mlist, + case_sensitive=False), + "test2.example.net") + self.assertEqual(utils.MatchNameComponent("TeSt2", mlist, + case_sensitive=False), + "test2.example.net") + + def testCaseInsensitiveFullMatch(self): + mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"] + + # Between the two ts1 a full string match non-case insensitive should work + self.assertEqual(utils.MatchNameComponent("Ts1", mlist, + case_sensitive=False), + None) + self.assertEqual(utils.MatchNameComponent("Ts1.ex", mlist, + case_sensitive=False), + "ts1.ex") + self.assertEqual(utils.MatchNameComponent("ts1.ex", mlist, + case_sensitive=False), + "ts1.ex") + + # Between the two ts2 only case differs, so only case-match works + self.assertEqual(utils.MatchNameComponent("ts2.ex", mlist, + case_sensitive=False), + "ts2.ex") + self.assertEqual(utils.MatchNameComponent("Ts2.ex", mlist, + case_sensitive=False), + "Ts2.ex") + self.assertEqual(utils.MatchNameComponent("TS2.ex", mlist, + case_sensitive=False), + None) + + +class TestFormatUnit(unittest.TestCase): + """Test case for the FormatUnit function""" + + def testMiB(self): + self.assertEqual(utils.FormatUnit(1, "h"), "1M") + self.assertEqual(utils.FormatUnit(100, "h"), "100M") + self.assertEqual(utils.FormatUnit(1023, "h"), "1023M") + + self.assertEqual(utils.FormatUnit(1, "m"), "1") + self.assertEqual(utils.FormatUnit(100, "m"), "100") + self.assertEqual(utils.FormatUnit(1023, "m"), "1023") + + self.assertEqual(utils.FormatUnit(1024, "m"), "1024") + self.assertEqual(utils.FormatUnit(1536, "m"), "1536") + self.assertEqual(utils.FormatUnit(17133, "m"), "17133") + self.assertEqual(utils.FormatUnit(1024 * 1024 - 1, "m"), "1048575") + + def testGiB(self): + self.assertEqual(utils.FormatUnit(1024, "h"), "1.0G") + self.assertEqual(utils.FormatUnit(1536, "h"), "1.5G") + self.assertEqual(utils.FormatUnit(17133, "h"), "16.7G") + self.assertEqual(utils.FormatUnit(1024 * 1024 - 1, "h"), "1024.0G") + + self.assertEqual(utils.FormatUnit(1024, "g"), "1.0") + self.assertEqual(utils.FormatUnit(1536, "g"), "1.5") + self.assertEqual(utils.FormatUnit(17133, "g"), "16.7") + self.assertEqual(utils.FormatUnit(1024 * 1024 - 1, "g"), "1024.0") + + self.assertEqual(utils.FormatUnit(1024 * 1024, "g"), "1024.0") + self.assertEqual(utils.FormatUnit(5120 * 1024, "g"), "5120.0") + self.assertEqual(utils.FormatUnit(29829 * 1024, "g"), "29829.0") + + def testTiB(self): + self.assertEqual(utils.FormatUnit(1024 * 1024, "h"), "1.0T") + self.assertEqual(utils.FormatUnit(5120 * 1024, "h"), "5.0T") + self.assertEqual(utils.FormatUnit(29829 * 1024, "h"), "29.1T") + + self.assertEqual(utils.FormatUnit(1024 * 1024, "t"), "1.0") + self.assertEqual(utils.FormatUnit(5120 * 1024, "t"), "5.0") + self.assertEqual(utils.FormatUnit(29829 * 1024, "t"), "29.1") + + def testErrors(self): + self.assertRaises(errors.ProgrammerError, utils.FormatUnit, 1, "a") + + +class TestParseUnit(unittest.TestCase): + """Test case for the ParseUnit function""" + + SCALES = (("", 1), + ("M", 1), ("G", 1024), ("T", 1024 * 1024), + ("MB", 1), ("GB", 1024), ("TB", 1024 * 1024), + ("MiB", 1), ("GiB", 1024), ("TiB", 1024 * 1024)) + + def testRounding(self): + self.assertEqual(utils.ParseUnit("0"), 0) + self.assertEqual(utils.ParseUnit("1"), 4) + self.assertEqual(utils.ParseUnit("2"), 4) + self.assertEqual(utils.ParseUnit("3"), 4) + + self.assertEqual(utils.ParseUnit("124"), 124) + self.assertEqual(utils.ParseUnit("125"), 128) + self.assertEqual(utils.ParseUnit("126"), 128) + self.assertEqual(utils.ParseUnit("127"), 128) + self.assertEqual(utils.ParseUnit("128"), 128) + self.assertEqual(utils.ParseUnit("129"), 132) + self.assertEqual(utils.ParseUnit("130"), 132) + + def testFloating(self): + self.assertEqual(utils.ParseUnit("0"), 0) + self.assertEqual(utils.ParseUnit("0.5"), 4) + self.assertEqual(utils.ParseUnit("1.75"), 4) + self.assertEqual(utils.ParseUnit("1.99"), 4) + self.assertEqual(utils.ParseUnit("2.00"), 4) + self.assertEqual(utils.ParseUnit("2.01"), 4) + self.assertEqual(utils.ParseUnit("3.99"), 4) + self.assertEqual(utils.ParseUnit("4.00"), 4) + self.assertEqual(utils.ParseUnit("4.01"), 8) + self.assertEqual(utils.ParseUnit("1.5G"), 1536) + self.assertEqual(utils.ParseUnit("1.8G"), 1844) + self.assertEqual(utils.ParseUnit("8.28T"), 8682212) + + def testSuffixes(self): + for sep in ("", " ", " ", "\t", "\t "): + for suffix, scale in self.SCALES: + for func in (lambda x: x, str.lower, str.upper): + self.assertEqual(utils.ParseUnit("1024" + sep + func(suffix)), + 1024 * scale) + + def testInvalidInput(self): + for sep in ("-", "_", ",", "a"): + for suffix, _ in self.SCALES: + self.assertRaises(errors.UnitParseError, utils.ParseUnit, + "1" + sep + suffix) + + for suffix, _ in self.SCALES: + self.assertRaises(errors.UnitParseError, utils.ParseUnit, + "1,3" + suffix) + + +class TestShellQuoting(unittest.TestCase): + """Test case for shell quoting functions""" + + def testShellQuote(self): + self.assertEqual(utils.ShellQuote('abc'), "abc") + self.assertEqual(utils.ShellQuote('ab"c'), "'ab\"c'") + self.assertEqual(utils.ShellQuote("a'bc"), "'a'\\''bc'") + self.assertEqual(utils.ShellQuote("a b c"), "'a b c'") + self.assertEqual(utils.ShellQuote("a b\\ c"), "'a b\\ c'") + + def testShellQuoteArgs(self): + self.assertEqual(utils.ShellQuoteArgs(['a', 'b', 'c']), "a b c") + self.assertEqual(utils.ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c") + self.assertEqual(utils.ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c") + + +class TestShellWriter(unittest.TestCase): + def test(self): + buf = StringIO() + sw = utils.ShellWriter(buf) + sw.Write("#!/bin/bash") + sw.Write("if true; then") + sw.IncIndent() + try: + sw.Write("echo true") + + sw.Write("for i in 1 2 3") + sw.Write("do") + sw.IncIndent() + try: + self.assertEqual(sw._indent, 2) + sw.Write("date") + finally: + sw.DecIndent() + sw.Write("done") + finally: + sw.DecIndent() + sw.Write("echo %s", utils.ShellQuote("Hello World")) + sw.Write("exit 0") + + self.assertEqual(sw._indent, 0) + + output = buf.getvalue() + + self.assert_(output.endswith("\n")) + + lines = output.splitlines() + self.assertEqual(len(lines), 9) + self.assertEqual(lines[0], "#!/bin/bash") + self.assert_(re.match(r"^\s+date$", lines[5])) + self.assertEqual(lines[7], "echo 'Hello World'") + + def testEmpty(self): + buf = StringIO() + sw = utils.ShellWriter(buf) + sw = None + self.assertEqual(buf.getvalue(), "") + + +class TestNormalizeAndValidateMac(unittest.TestCase): + def testInvalid(self): + self.assertRaises(errors.OpPrereqError, + utils.NormalizeAndValidateMac, "xxx") + + def testNormalization(self): + for mac in ["aa:bb:cc:dd:ee:ff", "00:AA:11:bB:22:cc"]: + self.assertEqual(utils.NormalizeAndValidateMac(mac), mac.lower()) + + +class TestSafeEncode(unittest.TestCase): + """Test case for SafeEncode""" + + def testAscii(self): + for txt in [string.digits, string.letters, string.punctuation]: + self.failUnlessEqual(txt, utils.SafeEncode(txt)) + + def testDoubleEncode(self): + for i in range(255): + txt = utils.SafeEncode(chr(i)) + self.failUnlessEqual(txt, utils.SafeEncode(txt)) + + def testUnicode(self): + # 1024 is high enough to catch non-direct ASCII mappings + for i in range(1024): + txt = utils.SafeEncode(unichr(i)) + self.failUnlessEqual(txt, utils.SafeEncode(txt)) + + +class TestUnescapeAndSplit(unittest.TestCase): + """Testing case for UnescapeAndSplit""" + + def setUp(self): + # testing more that one separator for regexp safety + self._seps = [",", "+", "."] + + def testSimple(self): + a = ["a", "b", "c", "d"] + for sep in self._seps: + self.failUnlessEqual(utils.UnescapeAndSplit(sep.join(a), sep=sep), a) + + def testEscape(self): + for sep in self._seps: + a = ["a", "b\\" + sep + "c", "d"] + b = ["a", "b" + sep + "c", "d"] + self.failUnlessEqual(utils.UnescapeAndSplit(sep.join(a), sep=sep), b) + + def testDoubleEscape(self): + for sep in self._seps: + a = ["a", "b\\\\", "c", "d"] + b = ["a", "b\\", "c", "d"] + self.failUnlessEqual(utils.UnescapeAndSplit(sep.join(a), sep=sep), b) + + def testThreeEscape(self): + for sep in self._seps: + a = ["a", "b\\\\\\" + sep + "c", "d"] + b = ["a", "b\\" + sep + "c", "d"] + self.failUnlessEqual(utils.UnescapeAndSplit(sep.join(a), sep=sep), b) + + +class TestCommaJoin(unittest.TestCase): + def test(self): + self.assertEqual(utils.CommaJoin([]), "") + self.assertEqual(utils.CommaJoin([1, 2, 3]), "1, 2, 3") + self.assertEqual(utils.CommaJoin(["Hello"]), "Hello") + self.assertEqual(utils.CommaJoin(["Hello", "World"]), "Hello, World") + self.assertEqual(utils.CommaJoin(["Hello", "World", 99]), + "Hello, World, 99") + + +class TestFormatTime(unittest.TestCase): + """Testing case for FormatTime""" + + @staticmethod + def _TestInProcess(tz, timestamp, expected): + os.environ["TZ"] = tz + time.tzset() + return utils.FormatTime(timestamp) == expected + + def _Test(self, *args): + # Need to use separate process as we want to change TZ + self.assert_(utils.RunInSeparateProcess(self._TestInProcess, *args)) + + def test(self): + self._Test("UTC", 0, "1970-01-01 00:00:00") + self._Test("America/Sao_Paulo", 1292606926, "2010-12-17 15:28:46") + self._Test("Europe/London", 1292606926, "2010-12-17 17:28:46") + self._Test("Europe/Zurich", 1292606926, "2010-12-17 18:28:46") + self._Test("Australia/Sydney", 1292606926, "2010-12-18 04:28:46") + + def testNone(self): + self.failUnlessEqual(utils.FormatTime(None), "N/A") + + def testInvalid(self): + self.failUnlessEqual(utils.FormatTime(()), "N/A") + + def testNow(self): + # tests that we accept time.time input + utils.FormatTime(time.time()) + # tests that we accept int input + utils.FormatTime(int(time.time())) + + +class TestFormatSeconds(unittest.TestCase): + def test(self): + self.assertEqual(utils.FormatSeconds(1), "1s") + self.assertEqual(utils.FormatSeconds(3600), "1h 0m 0s") + self.assertEqual(utils.FormatSeconds(3599), "59m 59s") + self.assertEqual(utils.FormatSeconds(7200), "2h 0m 0s") + self.assertEqual(utils.FormatSeconds(7201), "2h 0m 1s") + self.assertEqual(utils.FormatSeconds(7281), "2h 1m 21s") + self.assertEqual(utils.FormatSeconds(29119), "8h 5m 19s") + self.assertEqual(utils.FormatSeconds(19431228), "224d 21h 33m 48s") + self.assertEqual(utils.FormatSeconds(-1), "-1s") + self.assertEqual(utils.FormatSeconds(-282), "-282s") + self.assertEqual(utils.FormatSeconds(-29119), "-29119s") + + def testFloat(self): + self.assertEqual(utils.FormatSeconds(1.3), "1s") + self.assertEqual(utils.FormatSeconds(1.9), "2s") + self.assertEqual(utils.FormatSeconds(3912.12311), "1h 5m 12s") + self.assertEqual(utils.FormatSeconds(3912.8), "1h 5m 13s") + + +class TestLineSplitter(unittest.TestCase): + def test(self): + lines = [] + ls = utils.LineSplitter(lines.append) + ls.write("Hello World\n") + self.assertEqual(lines, []) + ls.write("Foo\n Bar\r\n ") + ls.write("Baz") + ls.write("Moo") + self.assertEqual(lines, []) + ls.flush() + self.assertEqual(lines, ["Hello World", "Foo", " Bar"]) + ls.close() + self.assertEqual(lines, ["Hello World", "Foo", " Bar", " BazMoo"]) + + def _testExtra(self, line, all_lines, p1, p2): + self.assertEqual(p1, 999) + self.assertEqual(p2, "extra") + all_lines.append(line) + + def testExtraArgsNoFlush(self): + lines = [] + ls = utils.LineSplitter(self._testExtra, lines, 999, "extra") + ls.write("\n\nHello World\n") + ls.write("Foo\n Bar\r\n ") + ls.write("") + ls.write("Baz") + ls.write("Moo\n\nx\n") + self.assertEqual(lines, []) + ls.close() + self.assertEqual(lines, ["", "", "Hello World", "Foo", " Bar", " BazMoo", + "", "x"]) + + +if __name__ == "__main__": + testutils.GanetiTestProgram() diff --git a/test/ganeti.utils_unittest.py b/test/ganeti.utils_unittest.py index 24deb078a..2f8b00c72 100755 --- a/test/ganeti.utils_unittest.py +++ b/test/ganeti.utils_unittest.py @@ -40,16 +40,15 @@ import warnings import OpenSSL import random import operator -from cStringIO import StringIO import testutils from ganeti import constants from ganeti import compat from ganeti import utils from ganeti import errors -from ganeti.utils import RunCmd, RemoveFile, MatchNameComponent, FormatUnit, \ - ParseUnit, ShellQuote, ShellQuoteArgs, ListVisibleFiles, FirstFree, \ - TailFile, SafeEncode, FormatTime, UnescapeAndSplit, RunParts, PathJoin, \ +from ganeti.utils import RunCmd, RemoveFile, \ + ListVisibleFiles, FirstFree, \ + TailFile, RunParts, PathJoin, \ ReadOneLineFile, SetEtcHostsEntry, RemoveEtcHostsEntry @@ -763,66 +762,6 @@ class TestRename(unittest.TestCase): self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz"))) -class TestMatchNameComponent(unittest.TestCase): - """Test case for the MatchNameComponent function""" - - def testEmptyList(self): - """Test that there is no match against an empty list""" - - self.failUnlessEqual(MatchNameComponent("", []), None) - self.failUnlessEqual(MatchNameComponent("test", []), None) - - def testSingleMatch(self): - """Test that a single match is performed correctly""" - mlist = ["test1.example.com", "test2.example.com", "test3.example.com"] - for key in "test2", "test2.example", "test2.example.com": - self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1]) - - def testMultipleMatches(self): - """Test that a multiple match is returned as None""" - mlist = ["test1.example.com", "test1.example.org", "test1.example.net"] - for key in "test1", "test1.example": - self.failUnlessEqual(MatchNameComponent(key, mlist), None) - - def testFullMatch(self): - """Test that a full match is returned correctly""" - key1 = "test1" - key2 = "test1.example" - mlist = [key2, key2 + ".com"] - self.failUnlessEqual(MatchNameComponent(key1, mlist), None) - self.failUnlessEqual(MatchNameComponent(key2, mlist), key2) - - def testCaseInsensitivePartialMatch(self): - """Test for the case_insensitive keyword""" - mlist = ["test1.example.com", "test2.example.net"] - self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False), - "test2.example.net") - self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False), - "test2.example.net") - self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False), - "test2.example.net") - self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False), - "test2.example.net") - - - def testCaseInsensitiveFullMatch(self): - mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"] - # Between the two ts1 a full string match non-case insensitive should work - self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False), - None) - self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False), - "ts1.ex") - self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False), - "ts1.ex") - # Between the two ts2 only case differs, so only case-match works - self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False), - "ts2.ex") - self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False), - "Ts2.ex") - self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False), - None) - - class TestReadFile(testutils.GanetiTestCase): def testReadAll(self): @@ -977,103 +916,6 @@ class TestCreateBackup(testutils.GanetiTestCase): self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount) -class TestFormatUnit(unittest.TestCase): - """Test case for the FormatUnit function""" - - def testMiB(self): - self.assertEqual(FormatUnit(1, 'h'), '1M') - self.assertEqual(FormatUnit(100, 'h'), '100M') - self.assertEqual(FormatUnit(1023, 'h'), '1023M') - - self.assertEqual(FormatUnit(1, 'm'), '1') - self.assertEqual(FormatUnit(100, 'm'), '100') - self.assertEqual(FormatUnit(1023, 'm'), '1023') - - self.assertEqual(FormatUnit(1024, 'm'), '1024') - self.assertEqual(FormatUnit(1536, 'm'), '1536') - self.assertEqual(FormatUnit(17133, 'm'), '17133') - self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575') - - def testGiB(self): - self.assertEqual(FormatUnit(1024, 'h'), '1.0G') - self.assertEqual(FormatUnit(1536, 'h'), '1.5G') - self.assertEqual(FormatUnit(17133, 'h'), '16.7G') - self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G') - - self.assertEqual(FormatUnit(1024, 'g'), '1.0') - self.assertEqual(FormatUnit(1536, 'g'), '1.5') - self.assertEqual(FormatUnit(17133, 'g'), '16.7') - self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0') - - self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0') - self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0') - self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0') - - def testTiB(self): - self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T') - self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T') - self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T') - - self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0') - self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0') - self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1') - - def testErrors(self): - self.assertRaises(errors.ProgrammerError, FormatUnit, 1, "a") - - -class TestParseUnit(unittest.TestCase): - """Test case for the ParseUnit function""" - - SCALES = (('', 1), - ('M', 1), ('G', 1024), ('T', 1024 * 1024), - ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024), - ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024)) - - def testRounding(self): - self.assertEqual(ParseUnit('0'), 0) - self.assertEqual(ParseUnit('1'), 4) - self.assertEqual(ParseUnit('2'), 4) - self.assertEqual(ParseUnit('3'), 4) - - self.assertEqual(ParseUnit('124'), 124) - self.assertEqual(ParseUnit('125'), 128) - self.assertEqual(ParseUnit('126'), 128) - self.assertEqual(ParseUnit('127'), 128) - self.assertEqual(ParseUnit('128'), 128) - self.assertEqual(ParseUnit('129'), 132) - self.assertEqual(ParseUnit('130'), 132) - - def testFloating(self): - self.assertEqual(ParseUnit('0'), 0) - self.assertEqual(ParseUnit('0.5'), 4) - self.assertEqual(ParseUnit('1.75'), 4) - self.assertEqual(ParseUnit('1.99'), 4) - self.assertEqual(ParseUnit('2.00'), 4) - self.assertEqual(ParseUnit('2.01'), 4) - self.assertEqual(ParseUnit('3.99'), 4) - self.assertEqual(ParseUnit('4.00'), 4) - self.assertEqual(ParseUnit('4.01'), 8) - self.assertEqual(ParseUnit('1.5G'), 1536) - self.assertEqual(ParseUnit('1.8G'), 1844) - self.assertEqual(ParseUnit('8.28T'), 8682212) - - def testSuffixes(self): - for sep in ('', ' ', ' ', "\t", "\t "): - for suffix, scale in TestParseUnit.SCALES: - for func in (lambda x: x, str.lower, str.upper): - self.assertEqual(ParseUnit('1024' + sep + func(suffix)), - 1024 * scale) - - def testInvalidInput(self): - for sep in ('-', '_', ',', 'a'): - for suffix, _ in TestParseUnit.SCALES: - self.assertRaises(errors.UnitParseError, ParseUnit, '1' + sep + suffix) - - for suffix, _ in TestParseUnit.SCALES: - self.assertRaises(errors.UnitParseError, ParseUnit, '1,3' + suffix) - - class TestParseCpuMask(unittest.TestCase): """Test case for the ParseCpuMask function.""" @@ -1253,22 +1095,6 @@ class TestGetMounts(unittest.TestCase): ]) -class TestShellQuoting(unittest.TestCase): - """Test case for shell quoting functions""" - - def testShellQuote(self): - self.assertEqual(ShellQuote('abc'), "abc") - self.assertEqual(ShellQuote('ab"c'), "'ab\"c'") - self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'") - self.assertEqual(ShellQuote("a b c"), "'a b c'") - self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'") - - def testShellQuoteArgs(self): - self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c") - self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c") - self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c") - - class TestListVisibleFiles(unittest.TestCase): """Test case for ListVisibleFiles""" @@ -1607,58 +1433,6 @@ class TestIsNormAbsPath(unittest.TestCase): self._pathTestHelper('/etc/', False) -class TestSafeEncode(unittest.TestCase): - """Test case for SafeEncode""" - - def testAscii(self): - for txt in [string.digits, string.letters, string.punctuation]: - self.failUnlessEqual(txt, SafeEncode(txt)) - - def testDoubleEncode(self): - for i in range(255): - txt = SafeEncode(chr(i)) - self.failUnlessEqual(txt, SafeEncode(txt)) - - def testUnicode(self): - # 1024 is high enough to catch non-direct ASCII mappings - for i in range(1024): - txt = SafeEncode(unichr(i)) - self.failUnlessEqual(txt, SafeEncode(txt)) - - -class TestFormatTime(unittest.TestCase): - """Testing case for FormatTime""" - - @staticmethod - def _TestInProcess(tz, timestamp, expected): - os.environ["TZ"] = tz - time.tzset() - return utils.FormatTime(timestamp) == expected - - def _Test(self, *args): - # Need to use separate process as we want to change TZ - self.assert_(utils.RunInSeparateProcess(self._TestInProcess, *args)) - - def test(self): - self._Test("UTC", 0, "1970-01-01 00:00:00") - self._Test("America/Sao_Paulo", 1292606926, "2010-12-17 15:28:46") - self._Test("Europe/London", 1292606926, "2010-12-17 17:28:46") - self._Test("Europe/Zurich", 1292606926, "2010-12-17 18:28:46") - self._Test("Australia/Sydney", 1292606926, "2010-12-18 04:28:46") - - def testNone(self): - self.failUnlessEqual(FormatTime(None), "N/A") - - def testInvalid(self): - self.failUnlessEqual(FormatTime(()), "N/A") - - def testNow(self): - # tests that we accept time.time input - FormatTime(time.time()) - # tests that we accept int input - FormatTime(int(time.time())) - - class RunInSeparateProcess(unittest.TestCase): def test(self): for exp in [True, False]: @@ -1725,37 +1499,6 @@ class TestFingerprintFiles(unittest.TestCase): self.assertEqual(utils.FingerprintFiles(self.results.keys()), self.results) -class TestUnescapeAndSplit(unittest.TestCase): - """Testing case for UnescapeAndSplit""" - - def setUp(self): - # testing more that one separator for regexp safety - self._seps = [",", "+", "."] - - def testSimple(self): - a = ["a", "b", "c", "d"] - for sep in self._seps: - self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a) - - def testEscape(self): - for sep in self._seps: - a = ["a", "b\\" + sep + "c", "d"] - b = ["a", "b" + sep + "c", "d"] - self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b) - - def testDoubleEscape(self): - for sep in self._seps: - a = ["a", "b\\\\", "c", "d"] - b = ["a", "b\\", "c", "d"] - self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b) - - def testThreeEscape(self): - for sep in self._seps: - a = ["a", "b\\\\\\" + sep + "c", "d"] - b = ["a", "b\\" + sep + "c", "d"] - self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b) - - class TestGenerateSelfSignedX509Cert(unittest.TestCase): def setUp(self): self.tmpdir = tempfile.mkdtemp() @@ -1995,40 +1738,6 @@ class TestMakedirs(unittest.TestCase): self.assert_(os.path.isdir(path)) -class TestLineSplitter(unittest.TestCase): - def test(self): - lines = [] - ls = utils.LineSplitter(lines.append) - ls.write("Hello World\n") - self.assertEqual(lines, []) - ls.write("Foo\n Bar\r\n ") - ls.write("Baz") - ls.write("Moo") - self.assertEqual(lines, []) - ls.flush() - self.assertEqual(lines, ["Hello World", "Foo", " Bar"]) - ls.close() - self.assertEqual(lines, ["Hello World", "Foo", " Bar", " BazMoo"]) - - def _testExtra(self, line, all_lines, p1, p2): - self.assertEqual(p1, 999) - self.assertEqual(p2, "extra") - all_lines.append(line) - - def testExtraArgsNoFlush(self): - lines = [] - ls = utils.LineSplitter(self._testExtra, lines, 999, "extra") - ls.write("\n\nHello World\n") - ls.write("Foo\n Bar\r\n ") - ls.write("") - ls.write("Baz") - ls.write("Moo\n\nx\n") - self.assertEqual(lines, []) - ls.close() - self.assertEqual(lines, ["", "", "Hello World", "Foo", " Bar", " BazMoo", - "", "x"]) - - class TestReadLockedPidFile(unittest.TestCase): def setUp(self): self.tmpdir = tempfile.mkdtemp() @@ -2225,27 +1934,6 @@ class TestEnsureDirs(unittest.TestCase): os.umask(self.old_umask) -class TestFormatSeconds(unittest.TestCase): - def test(self): - self.assertEqual(utils.FormatSeconds(1), "1s") - self.assertEqual(utils.FormatSeconds(3600), "1h 0m 0s") - self.assertEqual(utils.FormatSeconds(3599), "59m 59s") - self.assertEqual(utils.FormatSeconds(7200), "2h 0m 0s") - self.assertEqual(utils.FormatSeconds(7201), "2h 0m 1s") - self.assertEqual(utils.FormatSeconds(7281), "2h 1m 21s") - self.assertEqual(utils.FormatSeconds(29119), "8h 5m 19s") - self.assertEqual(utils.FormatSeconds(19431228), "224d 21h 33m 48s") - self.assertEqual(utils.FormatSeconds(-1), "-1s") - self.assertEqual(utils.FormatSeconds(-282), "-282s") - self.assertEqual(utils.FormatSeconds(-29119), "-29119s") - - def testFloat(self): - self.assertEqual(utils.FormatSeconds(1.3), "1s") - self.assertEqual(utils.FormatSeconds(1.9), "2s") - self.assertEqual(utils.FormatSeconds(3912.12311), "1h 5m 12s") - self.assertEqual(utils.FormatSeconds(3912.8), "1h 5m 13s") - - class TestIgnoreProcessNotFound(unittest.TestCase): @staticmethod def _WritePid(fd): @@ -2268,59 +1956,6 @@ class TestIgnoreProcessNotFound(unittest.TestCase): self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0)) -class TestShellWriter(unittest.TestCase): - def test(self): - buf = StringIO() - sw = utils.ShellWriter(buf) - sw.Write("#!/bin/bash") - sw.Write("if true; then") - sw.IncIndent() - try: - sw.Write("echo true") - - sw.Write("for i in 1 2 3") - sw.Write("do") - sw.IncIndent() - try: - self.assertEqual(sw._indent, 2) - sw.Write("date") - finally: - sw.DecIndent() - sw.Write("done") - finally: - sw.DecIndent() - sw.Write("echo %s", utils.ShellQuote("Hello World")) - sw.Write("exit 0") - - self.assertEqual(sw._indent, 0) - - output = buf.getvalue() - - self.assert_(output.endswith("\n")) - - lines = output.splitlines() - self.assertEqual(len(lines), 9) - self.assertEqual(lines[0], "#!/bin/bash") - self.assert_(re.match(r"^\s+date$", lines[5])) - self.assertEqual(lines[7], "echo 'Hello World'") - - def testEmpty(self): - buf = StringIO() - sw = utils.ShellWriter(buf) - sw = None - self.assertEqual(buf.getvalue(), "") - - -class TestCommaJoin(unittest.TestCase): - def test(self): - self.assertEqual(utils.CommaJoin([]), "") - self.assertEqual(utils.CommaJoin([1, 2, 3]), "1, 2, 3") - self.assertEqual(utils.CommaJoin(["Hello"]), "Hello") - self.assertEqual(utils.CommaJoin(["Hello", "World"]), "Hello, World") - self.assertEqual(utils.CommaJoin(["Hello", "World", 99]), - "Hello, World, 99") - - class TestFindMatch(unittest.TestCase): def test(self): data = { @@ -2516,15 +2151,5 @@ class TestWriteFile(unittest.TestCase): os.close(fd) -class TestNormalizeAndValidateMac(unittest.TestCase): - def testInvalid(self): - self.assertRaises(errors.OpPrereqError, - utils.NormalizeAndValidateMac, "xxx") - - def testNormalization(self): - for mac in ["aa:bb:cc:dd:ee:ff", "00:AA:11:bB:22:cc"]: - self.assertEqual(utils.NormalizeAndValidateMac(mac), mac.lower()) - - if __name__ == '__main__': testutils.GanetiTestProgram() -- GitLab