Commit fdcac7e2 authored by Stavros Sachtouris's avatar Stavros Sachtouris
Browse files

Merge branch 'feature-test-argument' into develop

parents 0b609d5a 264847a7
......@@ -41,16 +41,7 @@ from time import mktime
from logging import getLogger
from argparse import ArgumentParser, ArgumentError
from argparse import RawDescriptionHelpFormatter
from import ShadyBar as KamakiProgressBar
except ImportError:
from import Bar as KamakiProgressBar
except ImportError:
# progress not installed - pls, pip install progress
from import ShadyBar as KamakiProgressBar
log = getLogger(__name__)
......@@ -76,7 +67,7 @@ class Argument(object):
self, name)
assert name.startswith('-'), msg
self.default = default or (None if self.arity else False)
self.default = default if (default or self.arity) else False
def value(self):
......@@ -144,7 +135,7 @@ class ConfigArgument(Argument):
_config_arg = ConfigArgument('Path to config file')
class CmdLineConfigArgument(Argument):
class RuntimeConfigArgument(Argument):
"""Set a run-time setting option (not persistent)"""
def __init__(self, config_arg, help='', parsed_name=None, default=None):
......@@ -153,8 +144,7 @@ class CmdLineConfigArgument(Argument):
def value(self):
"""A key=val option"""
return super(self.__class__, self).value
return super(RuntimeConfigArgument, self).value
def value(self, options):
......@@ -223,11 +213,9 @@ class IntArgument(ValueArgument):
def value(self, newvalue):
if newvalue == self.default:
self._value = self.default
self._value = int(newvalue)
self._value = self.default if (
newvalue == self.default) else int(newvalue)
except ValueError:
'IntArgument Error',
......@@ -235,18 +223,10 @@ class IntArgument(ValueArgument):
class DateArgument(ValueArgument):
:value type: a string formated in an acceptable date format
:value returns: same date in first of DATE_FORMATS
DATE_FORMAT = '%a %b %d %H:%M:%S %Y'
"%a %b %d %H:%M:%S %Y",
"%A, %d-%b-%y %H:%M:%S GMT",
"%a, %d %b %Y %H:%M:%S GMT"]
INPUT_FORMATS = DATE_FORMATS + ["%d-%m-%Y", "%H:%M:%S %d-%m-%Y"]
INPUT_FORMATS = [DATE_FORMAT, '%d-%m-%Y', '%H:%M:%S %d-%m-%Y']
def timestamp(self):
......@@ -256,7 +236,7 @@ class DateArgument(ValueArgument):
def formated(self):
v = getattr(self, '_value', self.default)
return v.strftime(self.DATE_FORMATS[0]) if v else None
return v.strftime(self.DATE_FORMAT) if v else None
def value(self):
......@@ -264,8 +244,7 @@ class DateArgument(ValueArgument):
def value(self, newvalue):
if newvalue:
self._value = self.format_date(newvalue)
self._value = self.format_date(newvalue) if newvalue else self.default
def format_date(self, datestr):
for format in self.INPUT_FORMATS:
......@@ -273,12 +252,10 @@ class DateArgument(ValueArgument):
t = dtm.strptime(datestr, format)
except ValueError:
return t # .strftime(self.DATE_FORMATS[0])
'Date Argument Error',
details='%s not a valid date. correct formats:\n\t%s' % (
datestr, self.INPUT_FORMATS))
return t # .strftime(self.DATE_FORMAT)
raiseCLIError(None, 'Date Argument Error', details=[
'%s not a valid date' % datestr,
'Correct formats:\n\t%s' % self.INPUT_FORMATS])
class VersionArgument(FlagArgument):
......@@ -303,27 +280,29 @@ class KeyValueArgument(Argument):
:syntax: --<arg> key1=value1 --<arg> key2=value2 ...
def __init__(self, help='', parsed_name=None, default=[]):
def __init__(self, help='', parsed_name=None, default={}):
super(KeyValueArgument, self).__init__(-1, help, parsed_name, default)
def value(self):
:input: key=value
:output: {'key1':'value1', 'key2':'value2', ...}
:returns: (dict) {key1: val1, key2: val2, ...}
return super(KeyValueArgument, self).value
def value(self, keyvalue_pairs):
:param keyvalue_pairs: (str) ['key1=val1', 'key2=val2', ...]
self._value = {}
for pair in keyvalue_pairs:
key, sep, val = pair.partition('=')
if not sep:
CLISyntaxError('Argument syntax error '),
details='%s is missing a "=" (usage: key1=val1 )\n' % pair)
self._value[key.strip()] = val.strip()
for pair in keyvalue_pairs:
key, sep, val = pair.partition('=')
assert sep, ' %s misses a "=" (usage: key1=val1 )\n' % (pair)
self._value[key] = val
except Exception as e:
raiseCLIError(e, 'KeyValueArgument Syntax Error')
class ProgressBarArgument(FlagArgument):
......@@ -332,17 +311,11 @@ class ProgressBarArgument(FlagArgument):
def __init__(self, help='', parsed_name='', default=True):
self.suffix = '%(percent)d%%'
super(ProgressBarArgument, self).__init__(help, parsed_name, default)
except NameError:
log.warning('WARNING: no progress bar functionality')
def clone(self):
"""Get a modifiable copy of this bar"""
newarg = ProgressBarArgument(,
self.default), self.parsed_name, self.default)
newarg._value = self._value
return newarg
......@@ -384,7 +357,7 @@ _arguments = dict(
silent=FlagArgument('Do not output anything', ('-s', '--silent')),
verbose=FlagArgument('More info at response', ('-v', '--verbose')),
version=VersionArgument('Print current version', ('-V', '--version')),
_config_arg, 'Override a config value', ('-o', '--options'))
......@@ -395,12 +368,6 @@ _arguments = dict(
class ArgumentParseManager(object):
"""Manage (initialize and update) an ArgumentParser object"""
parser = None
_arguments = {}
_parser_modified = False
_parsed = None
_unparsed = None
def __init__(self, exe, arguments=None):
:param exe: (str) the basic command (e.g. 'kamaki')
......@@ -416,6 +383,7 @@ class ArgumentParseManager(object):
global _arguments
self.arguments = _arguments
self._parser_modified, self._parsed, self._unparsed = False, None, None
......@@ -429,13 +397,12 @@ class ArgumentParseManager(object):
def arguments(self):
"""(dict) arguments the parser should be aware of"""
""":returns: (dict) arguments the parser should be aware of"""
return self._arguments
def arguments(self, new_arguments):
if new_arguments:
assert isinstance(new_arguments, dict)
assert isinstance(new_arguments, dict), 'Arguments must be in a dict'
self._arguments = new_arguments
......@@ -458,8 +425,7 @@ class ArgumentParseManager(object):
:param arguments: if not given, update self.arguments instead
if not arguments:
arguments = self._arguments
arguments = arguments or self._arguments
for name, arg in arguments.items():
......@@ -31,16 +31,29 @@
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
#from mock import patch, call
from mock import patch, call, MagicMock
from unittest import TestCase
from StringIO import StringIO
from sys import stdin, stdout
from datetime import datetime
#from itertools import product
from kamaki.cli import argument
from kamaki.cli import argument, errors
from kamaki.cli.config import Config
def assert_dicts_are_equal(test_case, d1, d2):
for k, v in d1.items():
test_case.assertTrue(k in d2)
if isinstance(v, dict):
test_case.assert_dicts_are_equal(v, d2[k])
test_case.assertEqual(unicode(v), unicode(d2[k]))
cnf_path = 'kamaki.cli.config.Config'
arg_path = 'kamaki.cli.argument'
class Argument(TestCase):
def test___init__(self):
......@@ -63,7 +76,7 @@ class Argument(TestCase):
isinstance(parsed_name, list)) else [parsed_name, ]
self.assertEqual(exp_name, a.parsed_name)
exp_default = default or (None if arity else False)
exp_default = default if (default or arity) else False
self.assertEqual(exp_default, a.default)
def test_value(self):
......@@ -89,12 +102,6 @@ class Argument(TestCase):
class ConfigArgument(TestCase):
# A cloud name in config with a URL but no TOKEN
SEMI_CLOUD = 'production'
# A cloud name that is not configured in config
def setUp(self):
argument._config_arg = argument.ConfigArgument('Recovered Path')
......@@ -110,19 +117,33 @@ class ConfigArgument(TestCase):
def test_get(self):
c = argument._config_arg
c.value = None
self.assertEqual(c.value.get('global', 'config_cli'), 'config')
with patch('%s.get' % cnf_path, return_value='config') as get:
self.assertEqual(c.value.get('global', 'config_cli'), 'config')
self.assertEqual(get.mock_calls[-1], call('global', 'config_cli'))
def test_groups(self):
@patch('%s.keys' % cnf_path, return_value=(
'image_cli', 'config_cli', 'history_cli', 'file'))
def test_groups(self, keys):
c = argument._config_arg
c.value = None
'image', 'config', 'history']))
cset = set(c.groups)
self.assertTrue(cset.issuperset(['image', 'config', 'history']))
self.assertEqual(keys.mock_calls[-1], call('global'))
self.assertFalse('file' in cset)
self.assertEqual(keys.mock_calls[-1], call('global'))
def test_cli_specs(self):
@patch('%s.items' % cnf_path, return_value=(
('image_cli', 'image'), ('file', 'pithos'),
('config_cli', 'config'), ('history_cli', 'history')))
def test_cli_specs(self, items):
c = argument._config_arg
c.value = None
cset = set(c.cli_specs)
('image', 'image'), ('config', 'config'), ('history', 'history')]))
self.assertEqual(items.mock_calls[-1], call('global'))
self.assertFalse(cset.issuperset([('file', 'pithos'), ]))
self.assertEqual(items.mock_calls[-1], call('global'))
def test_get_global(self):
c = argument._config_arg
......@@ -131,23 +152,335 @@ class ConfigArgument(TestCase):
('config_cli', 'config'),
('image_cli', 'image'),
('history_cli', 'history')):
self.assertEqual(c.get_global(k), v)
with patch('%s.get_global' % cnf_path, return_value=v) as gg:
self.assertEqual(c.get_global(k), v)
self.assertEqual(gg.mock_calls[-1], call(k))
def test_get_cloud(self):
"""test_get_cloud (!! hard-set SEMI/INVALID_CLOUD to run this !!)"""
c = argument._config_arg
c.value = None
if not self.SEMI_CLOUD:
'\n\tA cloud name set in config file with URL but no TOKEN: ')
self.SEMI_CLOUD = stdin.readline()[:-1]
self.assertTrue(len(c.get_cloud(self.SEMI_CLOUD, 'url')) > 0)
self.assertRaises(KeyError, c.get_cloud, self.SEMI_CLOUD, 'token')
with patch(
'%s.get_cloud' % cnf_path,
return_value='http://cloud') as get_cloud:
self.assertTrue(len(c.get_cloud('mycloud', 'url')) > 0)
self.assertEqual(get_cloud.mock_calls[-1], call('mycloud', 'url'))
with patch(
'%s.get_cloud' % cnf_path,
side_effect=KeyError('no token')) as get_cloud:
self.assertRaises(KeyError, c.get_cloud, 'mycloud', 'token')
self.assertRaises(KeyError, c.get_cloud, invalidcloud, 'url')
class RuntimeConfigArgument(TestCase):
def setUp(self):
argument._config_arg = argument.ConfigArgument('Recovered Path')
@patch('%s.Argument.__init__' % arg_path)
def test___init__(self, arg):
config, help, pname, default = 'config', 'help', 'pname', 'default'
rca = argument.RuntimeConfigArgument(config, help, pname, default)
self.assertTrue(isinstance(rca, argument.RuntimeConfigArgument))
self.assertEqual(rca._config_arg, config)
self.assertEqual(arg.mock_calls[-1], call(1, help, pname, default))
@patch('%s.override' % cnf_path)
def test_value(self, override):
config, help, pname, default = argument._config_arg, 'help', '-n', 'df'
config.value = None
rca = argument.RuntimeConfigArgument(config, help, pname, default)
self.assertEqual(rca.value, default)
for options in ('grp', 'grp.opt', 'k v', '=nokey', 2.8, None, 42, ''):
self.assertRaises(TypeError, rca.value, options)
for options in ('key=val', 'grp.key=val', 'dotted.opt.key=val'):
rca.value = options
option, sep, val = options.partition('=')
grp, sep, key = option.partition('.')
grp, key = (grp, key) if key else ('global', grp)
self.assertEqual(override.mock_calls[-1], call(grp, key, val))
class FlagArgument(TestCase):
@patch('%s.Argument.__init__' % arg_path)
def test___init__(self, arg):
help, pname, default = 'help', 'pname', 'default'
fa = argument.FlagArgument(help, pname, default)
self.assertTrue(isinstance(fa, argument.FlagArgument))
arg.assert_called_once(0, help, pname, default)
class ValueArgument(TestCase):
@patch('%s.Argument.__init__' % arg_path)
def test___init__(self, arg):
help, pname, default = 'help', 'pname', 'default'
fa = argument.ValueArgument(help, pname, default)
self.assertTrue(isinstance(fa, argument.ValueArgument))
arg.assert_called_once(1, help, pname, default)
class IntArgument(TestCase):
def test_value(self):
ia = argument.IntArgument(parsed_name='--ia')
self.assertEqual(ia.value, None)
for v in (1, 0, -1, 923455555555555555555555555555555):
ia.value = v
self.assertEqual(ia.value, v)
for v in ('1', '-1', 2.8):
ia.value = v
self.assertEqual(ia.value, int(v))
for v, err in (
('invalid', errors.CLIError),
(None, TypeError), (False, TypeError), ([1, 2, 3], TypeError)):
ia.value = v
except Exception as e:
self.assertTrue(isinstance(e, err))
class DateArgument(TestCase):
def test_timestamp(self):
da = argument.DateArgument(parsed_name='--date')
self.assertEqual(da.timestamp, None)
date, format, exp = '24-10-1917', '%d-%m-%Y', -1646964000.0
da._value = argument.dtm.strptime(date, format)
self.assertEqual(da.timestamp, exp)
def test_formated(self):
da = argument.DateArgument(parsed_name='--date')
self.assertEqual(da.formated, None)
date, format, exp = (
'24-10-1917', '%d-%m-%Y', 'Wed Oct 24 00:00:00 1917')
da._value = argument.dtm.strptime(date, format)
self.assertEqual(da.formated, exp)
@patch('%s.DateArgument.timestamp' % arg_path)
@patch('%s.DateArgument.format_date' % arg_path)
def test_value(self, format_date, timestamp):
da = argument.DateArgument(parsed_name='--date')
self.assertTrue(isinstance(da.value, MagicMock))
da.value = 'Something'
def test_format_date(self):
da = argument.DateArgument(parsed_name='--date')
for datestr, exp in (
('Wed Oct 24 01:02:03 1917', datetime(1917, 10, 24, 1, 2, 3)),
('24-10-1917', datetime(1917, 10, 24)),
('01:02:03 24-10-1917', datetime(1917, 10, 24, 1, 2, 3))):
self.assertEqual(da.format_date(datestr), exp)
for datestr, err in (
('32-40-20134', errors.CLIError),
('Wednesday, 24 Oct 2017', errors.CLIError),
(None, TypeError), (0.8, TypeError)):
self.assertRaises(err, da.format_date, datestr)
class VersionArgument(TestCase):
def test_value(self):
va = argument.VersionArgument(parsed_name='--version')
self.assertTrue(va, argument.VersionArgument)
va.value = 'some value'
self.assertEqual(va.value, 'some value')
class KeyValueArgument(TestCase):
@patch('%s.Argument.__init__' % arg_path)
def test___init__(self, init):
help, pname, default = 'help', 'pname', 'default'
kva = argument.KeyValueArgument(help, pname, default)
self.assertTrue(isinstance(kva, argument.KeyValueArgument))
self.assertEqual(init.mock_calls[-1], call(-1, help, pname, default))
def test_value(self):
kva = argument.KeyValueArgument(parsed_name='--keyval')
self.assertEqual(kva.value, {})
for kvpairs in (
'strval', 'key=val', 2.8, 42, None,
('key', 'val'), ('key val'), ['=val', 'key=val'],
['key1=val1', 'key2 val2'], ('key1 = val1', )):
kva.value = kvpairs
except Exception as e:
self.assertTrue(isinstance(e, errors.CLIError))
for kvpairs, exp in (
(('key=val', ), {'key': 'val'}),
(['key1=val1', 'key2=val2'], {'key1': 'val1', 'key2': 'val2'}),
('k1=v1 v2', 'k3=', 'k 4=v4'),
{'k1': 'v1 v2', 'k3': '', 'k 4': 'v4'}),
(('k=v1', 'k=v2', 'k=v3'), {'k': 'v3'})
kva.value = kvpairs
assert_dicts_are_equal(self, kva.value, exp)
class ProgressBarArgument(TestCase):
class PseudoBar(object):
message = ''
suffix = ''
def start():
@patch('%s.FlagArgument.__init__' % arg_path)
def test___init__(self, init):
help, pname, default = 'help', '--progress', 'default'
pba = argument.ProgressBarArgument(help, pname, default)
self.assertTrue(isinstance(pba, argument.ProgressBarArgument))
self.assertEqual(pba.suffix, '%(percent)d%%')
init.assert_called_once(help, pname, default)
def test_clone(self):
pba = argument.ProgressBarArgument(parsed_name='--progress')
pba.value = None
pba_clone = pba.clone()
self.assertTrue(isinstance(pba, argument.ProgressBarArgument))
self.assertTrue(isinstance(pba_clone, argument.ProgressBarArgument))
self.assertNotEqual(pba, pba_clone)
self.assertEqual(pba.parsed_name, pba_clone.parsed_name)
def test_get_generator(self):
pba = argument.ProgressBarArgument(parsed_name='--progress')
pba.value = None
msg, msg_len = 'message', 40
with patch('%s.KamakiProgressBar.start' % arg_path) as start:
pba.get_generator(msg, msg_len)
self.assertTrue(isinstance(, argument.KamakiProgressBar))
self.assertNotEqual(, msg)
self.assertEqual(, '%s%s' % (msg, ' ' * (msg_len - len(msg))))
self.assertEqual(, '%(percent)d%% - %(eta)ds')
def test_finish(self):
pba = argument.ProgressBarArgument(parsed_name='--progress')
pba.value = None
self.assertEqual(pba.finish(), None) = argument.KamakiProgressBar()
with patch('%s.KamakiProgressBar.finish' % arg_path) as finish:
class ArgumentParseManager(TestCase):
@patch('%s.ArgumentParseManager.parse' % arg_path)
@patch('%s.ArgumentParseManager.update_parser' % arg_path)
def test___init__(self, parse, update_parser):
for arguments in (None, {'k1': 'v1', 'k2': 'v2'}):
apm = argument.ArgumentParseManager('exe', arguments)
self.assertTrue(isinstance(apm, argument.ArgumentParseManager))
self.assertTrue(isinstance(apm.parser, argument.ArgumentParser))
apm.syntax, 'exe <cmd_group> [<cmd_subbroup> ...] <cmd>')
self, apm.arguments,
arguments or argument._arguments)
self.assertEqual(apm._parsed, None)
self.assertEqual(apm._unparsed, None)
self.assertEqual(parse.mock_calls[-1], call())
if arguments:
def test_syntax(self):
apm = argument.ArgumentParseManager('exe')
apm.syntax, 'exe <cmd_group> [<cmd_subbroup> ...] <cmd>')
apm.syntax = 'some syntax'
self.assertEqual(apm.syntax, 'some syntax')
@patch('%s.ArgumentParseManager.update_parser' % arg_path)
def test_arguments(self, update_parser):
apm = argument.ArgumentParseManager('exe')
assert_dicts_are_equal(self, apm.arguments, argument._arguments)
exp = {'k1': 'v1', 'k2': 'v2'}
apm.arguments = exp
assert_dicts_are_equal(self, apm.arguments, exp)
self.assertEqual(update_parser.mock_calls[-1], call())
apm.arguments = None
except Exception as e:
self.assertTrue(isinstance(e, AssertionError))
@patch('%s.ArgumentParseManager.parse' % arg_path)
def test_parsed