Commit 23963422 authored by Stavros Sachtouris's avatar Stavros Sachtouris
Browse files

Complete cli.utils unittests

Refs: #4058
parent 46d130c9
......@@ -67,6 +67,16 @@ def _write(w):
stdout.write(w)
def _flush():
"""stdout.flush wrapper is used to help unittests check what is called"""
stdout.flush()
def _readline():
"""stdout.readline wrapper is used to help unittests"""
return stdout.readline()
def suggest_missing(miss=None, exclude=[]):
global suggest
sgs = dict(suggest)
......@@ -295,17 +305,22 @@ def print_items(
page_hold(i + 1, page_size, len(items))
def format_size(size):
units = ('B', 'KiB', 'MiB', 'GiB', 'TiB')
def format_size(size, decimal_factors=False):
units = ('B', 'KB', 'MB', 'GB', 'TB') if decimal_factors else (
'B', 'KiB', 'MiB', 'GiB', 'TiB')
step = 1000 if decimal_factors else 1024
fstep = float(step)
try:
size = float(size)
except ValueError as err:
raiseCLIError(err, 'Cannot format %s in bytes' % size)
for unit in units:
if size < 1024:
except (ValueError, TypeError) as err:
raiseCLIError(err, 'Cannot format %s in bytes' % (
','.join(size) if isinstance(size, tuple) else size))
for i, unit in enumerate(units):
if size < step or i + 1 == len(units):
break
size /= 1024.0
size /= fstep
s = ('%.2f' % size)
s = s.replace('%s' % step, '%s.99' % (step - 1)) if size <= fstep else s
while '.' in s and s[-1] in ('0', '.'):
s = s[:-1]
return s + unit
......@@ -317,6 +332,9 @@ def to_bytes(size, format):
:param format: (case insensitive) KiB, KB, MiB, MB, GiB, GB, TiB, TB
:returns: (int) the size in bytes
:raises ValueError: if invalid size or format
:raises AttributeError: if format is not str
:raises TypeError: if size is not arithmetic or convertible to arithmetic
"""
format = format.upper()
if format == 'B':
......@@ -337,25 +355,25 @@ def to_bytes(size, format):
def dict2file(d, f, depth=0):
for k, v in d.items():
f.write('%s%s: ' % ('\t' * depth, k))
f.write('%s%s: ' % (' ' * INDENT_TAB * depth, k))
if isinstance(v, dict):
f.write('\n')
dict2file(v, f, depth + 1)
elif isinstance(v, list):
elif isinstance(v, list) or isinstance(v, tuple):
f.write('\n')
list2file(v, f, depth + 1)
else:
f.write(' %s\n' % v)
f.write('%s\n' % v)
def list2file(l, f, depth=1):
for item in l:
if isinstance(item, dict):
dict2file(item, f, depth + 1)
elif isinstance(item, list):
elif isinstance(item, list) or isinstance(item, tuple):
list2file(item, f, depth + 1)
else:
f.write('%s%s\n' % ('\t' * depth, item))
f.write('%s%s\n' % (' ' * INDENT_TAB * depth, item))
# Split input auxiliary
......@@ -365,38 +383,14 @@ def _parse_with_regex(line, regex):
return (re_parser.split(line), re_parser.findall(line))
def _sub_split(line):
terms = []
(sub_trivials, sub_interesting) = _parse_with_regex(line, ' ".*?" ')
for subi, subipart in enumerate(sub_interesting):
terms += sub_trivials[subi].split()
terms.append(subipart[2:-2])
terms += sub_trivials[-1].split()
return terms
def old_split_input(line):
"""Use regular expressions to split a line correctly"""
line = ' %s ' % line
(trivial_parts, interesting_parts) = _parse_with_regex(line, ' \'.*?\' ')
terms = []
for i, ipart in enumerate(interesting_parts):
terms += _sub_split(trivial_parts[i])
terms.append(ipart[2:-2])
terms += _sub_split(trivial_parts[-1])
return terms
def _get_from_parsed(parsed_str):
try:
parsed_str = parsed_str.strip()
except:
return None
if parsed_str:
if parsed_str[0] == parsed_str[-1] and parsed_str[0] in ("'", '"'):
return [parsed_str[1:-1]]
return parsed_str.split(' ')
return None
return ([parsed_str[1:-1]] if (
parsed_str[0] == parsed_str[-1] and parsed_str[0] in ("'", '"')) else (
parsed_str.split(' '))) if parsed_str else None
def split_input(line):
......@@ -405,8 +399,6 @@ def split_input(line):
reg_expr = '\'.*?\'|".*?"|^[\S]*$'
(trivial_parts, interesting_parts) = _parse_with_regex(line, reg_expr)
assert(len(trivial_parts) == 1 + len(interesting_parts))
#print(' [split_input] trivial_parts %s are' % trivial_parts)
#print(' [split_input] interesting_parts %s are' % interesting_parts)
terms = []
for i, tpart in enumerate(trivial_parts):
part = _get_from_parsed(tpart)
......@@ -428,53 +420,25 @@ def ask_user(msg, true_resp=('y', )):
:returns: (bool) True if reponse in true responses, False otherwise
"""
stdout.write('%s [%s/N]: ' % (msg, ', '.join(true_resp)))
stdout.flush()
user_response = stdin.readline()
_write('%s [%s/N]: ' % (msg, ', '.join(true_resp)))
_flush()
user_response = _readline()
return user_response[0].lower() in true_resp
def spiner(size=None):
spins = ('/', '-', '\\', '|')
stdout.write(' ')
_write(' ')
size = size or -1
i = 0
while size - i:
stdout.write('\b%s' % spins[i % len(spins)])
stdout.flush()
_write('\b%s' % spins[i % len(spins)])
_flush()
i += 1
sleep(0.1)
yield
yield
if __name__ == '__main__':
examples = [
'la_la le_le li_li',
'\'la la\' \'le le\' \'li li\'',
'\'la la\' le_le \'li li\'',
'la_la \'le le\' li_li',
'la_la \'le le\' \'li li\'',
'"la la" "le le" "li li"',
'"la la" le_le "li li"',
'la_la "le le" li_li',
'"la_la" "le le" "li li"',
'\'la la\' "le le" \'li li\'',
'la_la \'le le\' "li li"',
'la_la \'le le\' li_li',
'\'la la\' le_le "li li"',
'"la la" le_le \'li li\'',
'"la la" \'le le\' li_li',
'la_la \'le\'le\' "li\'li"',
'"la \'le le\' la"',
'\'la "le le" la\'',
'\'la "la" la\' "le \'le\' le" li_"li"_li',
'\'\' \'L\' "" "A"']
for i, example in enumerate(examples):
print('%s. Split this: (%s)' % (i + 1, example))
ret = old_split_input(example)
print('\t(%s) of size %s' % (ret, len(ret)))
def get_path_size(testpath):
if path.isfile(testpath):
......
......@@ -32,7 +32,7 @@
# or implied, of GRNET S.A.
from unittest import TestCase
#from tempfile import NamedTemporaryFile
from tempfile import NamedTemporaryFile
from mock import patch, call
from itertools import product
......@@ -286,6 +286,197 @@ class UtilsMethods(TestCase):
call(i + 1, page_size, len(items)))
ph_counter += 1
def test_format_size(self):
from kamaki.cli.utils import format_size
from kamaki.cli import CLIError
for v in ('wrong', {1: '1', 2: '2'}, ('tuples', 'not OK'), [1, 2]):
self.assertRaises(CLIError, format_size, v)
for step, B, K, M, G, T in (
(1000, 'B', 'KB', 'MB', 'GB', 'TB'),
(1024, 'B', 'KiB', 'MiB', 'GiB', 'TiB')):
Ki, Mi, Gi = step, step * step, step * step * step
for before, after in (
(0, '0' + B), (512, '512' + B), (
Ki - 1, '%s%s' % (step - 1, B)),
(Ki, '1' + K), (42 * Ki, '42' + K), (
Mi - 1, '%s.99%s' % (step - 1, K)),
(Mi, '1' + M), (42 * Mi, '42' + M), (
Ki * Mi - 1, '%s.99%s' % (step - 1, M)),
(Gi, '1' + G), (42 * Gi, '42' + G), (
Mi * Mi - 1, '%s.99%s' % (step - 1, G)),
(Mi * Mi, '1' + T), (42 * Mi * Mi, '42' + T), (
Mi * Gi - 1, '%s.99%s' % (step - 1, T)), (
42 * Mi * Gi, '%s%s' % (42 * Ki, T))):
self.assertEqual(format_size(before, step == 1000), after)
def test_to_bytes(self):
from kamaki.cli.utils import to_bytes
for v in ('wrong', 'KABUM', 'kbps', 'kibps'):
self.assertRaises(ValueError, to_bytes, v, 'B')
self.assertRaises(ValueError, to_bytes, 42, v)
for v in ([1, 2, 3], ('kb', 'mb'), {'kb': 1, 'byte': 2}):
self.assertRaises(TypeError, to_bytes, v, 'B')
self.assertRaises(AttributeError, to_bytes, 42, v)
kl, ki = 1000, 1024
for size, (unit, factor) in product(
(0, 42, 3.14, 1023, 10000),
(
('B', 1), ('b', 1),
('KB', kl), ('KiB', ki),
('mb', kl * kl), ('mIb', ki * ki),
('gB', kl * kl * kl), ('GIB', ki * ki * ki),
('TB', kl * kl * kl * kl), ('tiB', ki * ki * ki * ki))):
self.assertEqual(to_bytes(size, unit), int(size * factor))
def test_dict2file(self):
from kamaki.cli.utils import dict2file, INDENT_TAB
for d, depth in product((
{'k': 42},
{'k1': 'v1', 'k2': [1, 2, 3], 'k3': {'k': 'v'}},
{'k1': {
'k1.1': 'v1.1',
'k1.2': [1, 2, 3],
'k1.3': {'k': 'v'}}}),
(-42, 0, 42)):
exp = ''
exp_d = []
exp_l = []
exp, exp_d, exp_l = '', [], []
with NamedTemporaryFile() as f:
for k, v in d.items():
sfx = '\n'
if isinstance(v, dict):
exp_d.append(call(v, f, depth + 1))
elif isinstance(v, tuple) or isinstance(v, list):
exp_l.append(call(v, f, depth + 1))
else:
sfx = '%s\n' % v
exp += '%s%s: %s' % (
' ' * (depth * INDENT_TAB), k, sfx)
with patch('kamaki.cli.utils.dict2file') as D2F:
with patch('kamaki.cli.utils.list2file') as L2F:
dict2file(d, f, depth)
f.seek(0)
self.assertEqual(f.read(), exp)
self.assertEqual(L2F.mock_calls, exp_l)
self.assertEqual(D2F.mock_calls, exp_d)
def test_list2file(self):
from kamaki.cli.utils import list2file, INDENT_TAB
for l, depth in product(
(
(1, 2, 3),
[1, 2, 3],
('v', [1, 2, 3], (1, 2, 3), {'1': 1, 2: '2', 3: 3}),
['v', {'k1': 'v1', 'k2': [1, 2, 3], 'k3': {1: '1'}}]),
(-42, 0, 42)):
with NamedTemporaryFile() as f:
exp, exp_d, exp_l = '', [], []
for v in l:
if isinstance(v, dict):
exp_d.append(call(v, f, depth + 1))
elif isinstance(v, list) or isinstance(v, tuple):
exp_l.append(call(v, f, depth + 1))
else:
exp += '%s%s\n' % (' ' * INDENT_TAB * depth, v)
with patch('kamaki.cli.utils.dict2file') as D2F:
with patch('kamaki.cli.utils.list2file') as L2F:
list2file(l, f, depth)
f.seek(0)
self.assertEqual(f.read(), exp)
self.assertEqual(L2F.mock_calls, exp_l)
self.assertEqual(D2F.mock_calls, exp_d)
def test__parse_with_regex(self):
from re import compile as r_compile
from kamaki.cli.utils import _parse_with_regex
for args in product(
(
'this is a line',
'this_is_also_a_line',
'This "text" is quoted',
'This "quoted" "text" is more "complicated"',
'Is this \'quoted\' text "double \'quoted\' or not?"',
'"What \'about\' the" oposite?',
' Try with a " single double quote',
'Go "down \'deep " deeper \'bottom \' up" go\' up" !'),
(
'\'.*?\'|".*?"|^[\S]*$',
r'"([A-Za-z0-9_\./\\-]*)"',
r'\"(.+?)\"',
'\\^a\\.\\*\\$')):
r_parser = r_compile(args[1])
self.assertEqual(
_parse_with_regex(*args),
(r_parser.split(args[0]), r_parser.findall(args[0])))
def test_split_input(self):
from kamaki.cli.utils import split_input
for line, expected in (
('unparsable', ['unparsable']),
('"parsable"', ['parsable']),
('"parse" out', ['parse', 'out']),
('"one', ['"one']),
('two" or" more"', ['two', ' or', 'more"']),
('Go "down \'deep " deeper \'bottom \' up" go\' up" !', [
'Go', "down 'deep ", 'deeper', 'bottom ',
'up', " go' up", '!']),
('Is "this" a \'parsed\' string?', [
'Is', 'this', 'a', 'parsed', 'string?'])):
self.assertEqual(split_input(line), expected)
@patch('kamaki.cli.utils._readline', return_value='read line')
@patch('kamaki.cli.utils._flush')
@patch('kamaki.cli.utils._write')
def test_ask_user(self, WR, FL, RL):
from kamaki.cli.utils import ask_user
msg = 'some question'
self.assertFalse(ask_user(msg))
WR.assert_called_once_with('%s [y/N]: ' % msg)
FL.assert_called_once_with()
RL.assert_called_once_with()
self.assertTrue(ask_user(msg, ('r', )))
self.assertEqual(WR.mock_calls[-1], call('%s [r/N]: ' % msg))
self.assertEqual(FL.mock_calls, 2 * [call()])
self.assertEqual(RL.mock_calls, 2 * [call()])
self.assertTrue(ask_user(msg, ('Z', 'r', 'k')))
self.assertEqual(WR.mock_calls[-1], call('%s [Z, r, k/N]: ' % msg))
self.assertEqual(FL.mock_calls, 3 * [call()])
self.assertEqual(RL.mock_calls, 3 * [call()])
@patch('kamaki.cli.utils._flush')
@patch('kamaki.cli.utils._write')
def test_spiner(self, WR, FL):
from kamaki.cli.utils import spiner
spins = ('/', '-', '\\', '|')
prev = 1
for i, SP in enumerate(spiner(6)):
if not i:
self.assertEqual(WR.mock_calls[-2], call(' '))
elif i > 5:
break
self.assertEqual(SP, None)
self.assertEqual(WR.mock_calls[-1], call('\b%s' % spins[i % 4]))
self.assertEqual(FL.mock_calls, prev * [call()])
prev += 1
def test_remove_from_items(self):
from kamaki.cli.utils import remove_from_items
for v in ('wrong', [1, 2, 3], [{}, 2, {}]):
self.assertRaises(AssertionError, remove_from_items, v, 'none')
d = dict(k1=1, k2=dict(k2=2, k3=3), k3=3, k4=4)
for k in (d.keys() + ['kN']):
tmp1, tmp2 = dict(d), dict(d)
remove_from_items([tmp1, ], k)
tmp1.pop(k, None)
self.assert_dicts_are_equal(tmp1, tmp2)
for k in (d.keys() + ['kN']):
tmp1, tmp2 = dict(d), dict(d)
remove_from_items([tmp1, tmp2], k)
self.assert_dicts_are_equal(tmp1, tmp2)
if __name__ == '__main__':
from sys import argv
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment