Commit c5997814 authored by Stavros Sachtouris's avatar Stavros Sachtouris
Browse files

Implement tests for _cloud_name, _rescue_old_file

Refs: #4058
parent f767b407
......@@ -33,6 +33,7 @@
import os
from logging import getLogger
from sys import stdout, stderr
from collections import defaultdict
from ConfigParser import RawConfigParser, NoOptionError, NoSectionError, Error
......@@ -48,7 +49,7 @@ except ImportError:
class InvalidCloudNameError(Error):
"""A valid cloud name must pass through this regex: ([~@#$:-\w]+)"""
"""A valid cloud name must pass through this regex: ([~@#$:.-\w]+)"""
log = getLogger(__name__)
......@@ -89,7 +90,6 @@ DEFAULTS = {
# Optional command specs:
# 'livetest_cli': 'livetest',
# 'astakos_cli': 'snf-astakos'
# 'floating_cli': 'cyclades'
},
CLOUD_PREFIX: {
#'default': {
......@@ -116,6 +116,7 @@ except ImportError:
class Config(RawConfigParser):
def __init__(self, path=None, with_defaults=True):
RawConfigParser.__init__(self, dict_type=OrderedDict)
self.path = path or os.environ.get(CONFIG_ENV, CONFIG_PATH)
......@@ -135,14 +136,14 @@ class Config(RawConfigParser):
def _cloud_name(full_section_name):
if not full_section_name.startswith(CLOUD_PREFIX + ' '):
return None
matcher = match(CLOUD_PREFIX + ' "([~@#$:\-\w]+)"', full_section_name)
matcher = match(CLOUD_PREFIX + ' "([~@#$.:\-\w]+)"', full_section_name)
if matcher:
return matcher.groups()[0]
else:
icn = full_section_name[len(CLOUD_PREFIX) + 1:]
raise InvalidCloudNameError('Invalid Cloud Name %s' % icn)
def rescue_old_file(self):
def rescue_old_file(self, err=stderr):
lost_terms = []
global_terms = DEFAULTS['global'].keys()
translations = dict(
......@@ -165,7 +166,7 @@ class Config(RawConfigParser):
self.set('global', 'default_' + CLOUD_PREFIX, 'default')
for s in self.sections():
if s in ('global'):
if s in ('global', ):
# global.url, global.token -->
# cloud.default.url, cloud.default.token
for term in set(self.keys(s)).difference(global_terms):
......@@ -175,27 +176,34 @@ class Config(RawConfigParser):
self.remove_option(s, term)
continue
gval = self.get(s, term)
default_cloud = self.get(
'global', 'default_cloud') or 'default'
try:
cval = self.get_cloud('default', term)
cval = self.get_cloud(default_cloud, term)
except KeyError:
cval = ''
if gval and cval and (
gval.lower().strip('/') != cval.lower().strip('/')):
raise CLISyntaxError(
'Conflicting values for default %s' % term,
'Conflicting values for default %s' % (
term),
importance=2, details=[
' global.%s: %s' % (term, gval),
' %s.default.%s: %s' % (
CLOUD_PREFIX, term, cval),
' %s.%s.%s: %s' % (
CLOUD_PREFIX,
default_cloud,
term,
cval),
'Please remove one of them manually:',
' /config delete global.%s' % term,
' or'
' /config delete %s.default.%s' % (
CLOUD_PREFIX, term),
' /config delete %s.%s.%s' % (
CLOUD_PREFIX, default_cloud, term),
'and try again'])
elif gval:
print('... rescue %s.%s => %s.default.%s' % (
s, term, CLOUD_PREFIX, term))
err.write(u'... rescue %s.%s => %s.%s.%s\n' % (
s, term, CLOUD_PREFIX, default_cloud, term))
err.flush()
self.set_cloud('default', term, gval)
self.remove_option(s, term)
# translation for <service> or <command> settings
......@@ -206,20 +214,24 @@ class Config(RawConfigParser):
k = 'file'
v = self.get(s, k)
if v:
print('... rescue %s.%s => global.%s_%s' % (
err.write(u'... rescue %s.%s => global.%s_%s\n' % (
s, k, s, k))
err.flush()
self.set('global', '%s_%s' % (s, k), v)
self.remove_option(s, k)
trn = translations[s]
for k, v in self.items(s, False):
if v and k in ('cli',):
print('... rescue %s.%s => global.%s_cli' % (
err.write(u'... rescue %s.%s => global.%s_cli\n' % (
s, k, trn['cmd']))
err.flush()
self.set('global', '%s_cli' % trn['cmd'], v)
elif k in ('container',) and trn['serv'] in ('pithos',):
print('... rescue %s.%s => %s.default.pithos_%s' % (
s, k, CLOUD_PREFIX, k))
err.write(
u'... rescue %s.%s => %s.default.pithos_%s\n' % (
s, k, CLOUD_PREFIX, k))
err.flush()
self.set_cloud('default', 'pithos_%s' % k, v)
else:
lost_terms.append('%s.%s = %s' % (s, k, v))
......@@ -227,17 +239,21 @@ class Config(RawConfigParser):
# self.pretty_print()
return lost_terms
def pretty_print(self):
def pretty_print(self, out=stdout):
for s in self.sections():
print s
out.write(s)
out.flush()
for k, v in self.items(s):
if isinstance(v, dict):
print '\t', k, '=> {'
out.write(u'\t%s => {\n' % k)
out.flush()
for ki, vi in v.items():
print '\t\t', ki, '=>', vi
print('\t}')
out.write(u'\t\t%s => %s\n' % (ki, vi))
out.flush()
out.write(u'\t}\n')
else:
print '\t', k, '=>', v
out.write(u'\t %s => %s\n' % (k, v))
out.flush()
def guess_version(self):
"""
......
......@@ -35,6 +35,8 @@ from mock import patch, call
from unittest import TestCase
from itertools import product
import os
from tempfile import NamedTemporaryFile
from io import StringIO
def _2steps_gen(limit=2):
......@@ -92,8 +94,7 @@ class Config(TestCase):
self.assertEqual(c_items.mock_calls[-2:], gen_call)
c_set_cloud_num += 4
self.assertEqual(c_set_cloud.mock_calls[-4:], [
call(r, 'k1', 'v1'), call(r, 'k2', 'v2'),
call(r, 'k1', 'v1'), call(r, 'k2', 'v2')])
call(r, 'k1', 'v1'), call(r, 'k2', 'v2')] * 2)
c_remove_section_num += 2
self.assertEqual(
c_remove_section.mock_calls[-2:], gen_call)
......@@ -102,6 +103,116 @@ class Config(TestCase):
self.assertEqual(
len(c_remove_section.mock_calls), c_remove_section_num)
def test__cloud_name(self):
from kamaki.cli.config import (
Config, CLOUD_PREFIX, InvalidCloudNameError)
cn = Config._cloud_name
self.assertEqual(cn('non%s name' % CLOUD_PREFIX), None)
for invalid in ('"!@#$%^&())_"', '"a b c"', u'"\xce\xcd"', 'naked'):
self.assertRaises(
InvalidCloudNameError, cn, '%s %s' % (CLOUD_PREFIX, invalid))
for valid in ('word', '~okeanos', 'd0t.ted', 'ha$h#ed'):
self.assertEqual(cn('%s "%s"' % (CLOUD_PREFIX, valid)), valid)
def test_rescue_old_file(self):
from kamaki.cli.config import Config
content0 = [
'#kamaki config file version 0.9\n',
'[global]\n',
'max_threads = 5\n',
'default_cloud = ~mycloud\n',
'file_cli = pithos\n',
'history_file = /home/user/.kamaki.history\n',
'colors = off\n',
'config_cli = config\n',
'history_cli = history\n',
'log_token = off\n',
'server_cli = cyclades\n',
'user_cli = astakos\n',
'log_data = off\n',
'flavor_cli = cyclades\n',
'image_cli = image\n',
'log_file = /home/user/.kamaki.log\n',
'network_cli = cyclades\n',
'log_pid = off\n',
'\n',
'[cloud "demo"]\n',
'url = https://demo.example.com\n',
'token = t0k3n-0f-d3m0-3x4mp13\n',
'\n',
'[cloud "~mycloud"]\n',
'url = https://example.com\n',
'pithos_container = images\n']
def make_file(lines):
f = NamedTemporaryFile()
f.writelines(lines)
f.flush()
return f
with make_file(content0) as f:
_cnf = Config(path=f.name)
self.assertEqual([], _cnf.rescue_old_file())
del _cnf
content1, sample = list(content0), 'xyz_cli = XYZ_specs'
content1.insert(2, '%s\n' % sample)
with make_file(content1) as f:
_cnf = Config(path=f.name)
self.assertEqual(['global.%s' % sample], _cnf.rescue_old_file())
del _cnf
content2, sample = list(content0), 'http://www.example2.org'
content2.insert(2, 'url = %s\n' % sample)
err = StringIO()
with make_file(content2) as f:
_cnf = Config(path=f.name)
self.assertEqual([], _cnf.rescue_old_file(err=err))
self.assertEqual(
'... rescue global.url => cloud.default.url\n', err.getvalue())
self.assertEqual(sample, _cnf.get_cloud('default', 'url'))
del _cnf
content3 = list(content0)
content3.insert(
2, 'url = http://example1.com\nurl = http://example2.com\n')
with make_file(content3) as f:
_cnf = Config(path=f.name)
self.assertEqual([], _cnf.rescue_old_file(err=err))
self.assertEqual(
2 * '... rescue global.url => cloud.default.url\n',
err.getvalue())
self.assertEqual(
'http://example2.com', _cnf.get_cloud('default', 'url'))
del _cnf
content4 = list(content0)
content4.insert(2, 'url = http://example1.com\n')
content4.append('\n[cloud "default"]\nurl=http://example2.com\n')
with make_file(content4) as f:
_cnf = Config(path=f.name)
from kamaki.cli.errors import CLISyntaxError
self.assertRaises(CLISyntaxError, _cnf.rescue_old_file)
del _cnf
content5 = list(content0)
extras = [
('pithos_cli', 'pithos'), ('store_cli', 'pithos'),
('storage_cli', 'pithos'), ('compute_cli', 'cyclades'),
('cyclades_cli', 'cyclades')]
for sample in extras:
content5.insert(2, '%s = %s\n' % sample)
with make_file(content5) as f:
_cnf = Config(path=f.name)
self.assertEqual(
sorted(['global.%s = %s' % sample for sample in extras]),
sorted(_cnf.rescue_old_file()))
if __name__ == '__main__':
from sys import argv
......
......@@ -101,7 +101,7 @@ def run(cloud, parser, _help):
cls = cmd.cmd_class
auth_base = init_cached_authenticator(
_cnf.get_cloud(cloud, 'url'), _cnf.get_cloud(cloud, 'token').split(),
_cnf, kloger)
_cnf, kloger) if cloud else None
executable = cls(parser.arguments, auth_base, cloud)
parser.update_arguments(executable.arguments)
for term in _best_match:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment