Commit 3fc52244 authored by Stavros Sachtouris's avatar Stavros Sachtouris

Handle non-ascii characters in config files

Fixes grnet/kamaki#76

* Support non-ascii characters for all values
* Disallow non-ascii characters for all types of options
* Escape control characters in option names and values
parent 342eb1b2
......@@ -13,6 +13,8 @@ Bug fixes
* Warn or raise errors when the configuration file is inaccessible
[grnet/kamaki#71]
* Handle non-ascii characters when managing configuration options
[grnet/kamaki#76]
v0.13rc5
========
......
......@@ -103,10 +103,11 @@ class config_get(CommandInit):
return
section = 'global'
prefix = 'cloud.'
get, section = (
self.config.get_cloud, section[len(prefix):]) if (
section.startswith(prefix)) else (self.config.get, section)
get, section = (self.config.get_cloud, section[len(prefix):]) if (
section.startswith(prefix)) else (self.config.get, section)
value = get(section, key)
if value is None:
raise CLIError('%s not found' % option)
if isinstance(value, dict):
for k, v in value.items():
self.writeln('%s.%s.%s = %s' % (section, key, k, v))
......
......@@ -40,6 +40,7 @@ from ConfigParser import RawConfigParser, NoOptionError, NoSectionError, Error
from re import match
from kamaki.cli.errors import CLISyntaxError, CLIError
from kamaki.cli.utils import pref_enc, escape_ctrl_chars
from kamaki import __version__
try:
......@@ -163,6 +164,17 @@ class Config(RawConfigParser):
self.set_cloud(r, k, v)
self.remove_section(section)
@staticmethod
def assert_option(option):
if isinstance(option, unicode):
try:
option = str(option)
except UnicodeError, ue:
raise CLIError('Invalid config option %s' % option, details=[
'Illegal character(s) in config option name',
'Non-ascii characters are only allowed as values',
ue])
@staticmethod
def cloud_name(full_section_name):
if not full_section_name.startswith(CLOUD_PREFIX + ' '):
......@@ -238,7 +250,6 @@ class Config(RawConfigParser):
err.flush()
self.set_cloud('default', term, gval)
self.remove_option(s, term)
print 'CHECK'
for term, wrong, right in (
('ip', 'cyclades', 'network'),
('network', 'cyclades', 'network'),):
......@@ -348,6 +359,7 @@ class Config(RawConfigParser):
d = self.get(CLOUD_PREFIX, cloud) or dict()
except KeyError:
d = dict()
self.assert_option(option)
d[option] = value
self.set(CLOUD_PREFIX, cloud, d)
......@@ -362,7 +374,14 @@ class Config(RawConfigParser):
except KeyError:
d = {}
try:
d.update(RawConfigParser.items(self, section))
for k, v in RawConfigParser.items(self, section):
new_k, new_v = k, v
if isinstance(k, basestring) and not isinstance(k, unicode):
new_k = k.decode(pref_enc)
if isinstance(v, basestring) and not isinstance(v, unicode):
new_v = v.decode(pref_enc)
d[new_k] = new_v
# d.update(RawConfigParser.items(self, section))
except NoSectionError:
pass
return d
......@@ -385,7 +404,10 @@ class Config(RawConfigParser):
if section.startswith(prefix):
return self.get_cloud(section[len(prefix):], option)
try:
return RawConfigParser.get(self, section, option)
r = RawConfigParser.get(self, section, option)
if isinstance(r, str):
return r.decode(pref_enc, 'replace')
return r
except (NoSectionError, NoOptionError):
return DEFAULTS.get(section, {}).get(option)
......@@ -397,6 +419,7 @@ class Config(RawConfigParser):
:param value: str
"""
self.assert_option(option)
prefix = CLOUD_PREFIX + '.'
if section.startswith(prefix):
cloud = self.cloud_name(
......@@ -430,25 +453,19 @@ class Config(RawConfigParser):
def override(self, section, option, value):
self._overrides[section][option] = value
def safe_to_print(self):
dump = u'[global]\n'
for k, v in self.items('global'):
dump += u'%s = %s\n' % (escape_ctrl_chars(k), escape_ctrl_chars(v))
for r, d in self.items(CLOUD_PREFIX):
dump += u'\n[%s "%s"]\n' % (CLOUD_PREFIX, escape_ctrl_chars(r))
for k, v in d.items():
dump += u'%s = %s\n' % (
escape_ctrl_chars(k), escape_ctrl_chars(v))
return dump
def write(self):
cld_bu = self._get_dict(CLOUD_PREFIX)
try:
for r, d in self.items(CLOUD_PREFIX):
for k, v in d.items():
self.set(CLOUD_PREFIX + ' "%s"' % r, k, v)
self.remove_section(CLOUD_PREFIX)
with open(self.path, 'w') as f:
os.chmod(self.path, 0600)
f.write(HEADER.lstrip())
f.flush()
RawConfigParser.write(self, f)
except IOError as ioe:
raise CLIError(
'Cannot write to config file %s' % os.path.abspath(self.path),
importance=3, details=[type(ioe), ioe, ])
finally:
if CLOUD_PREFIX not in self.sections():
self.add_section(CLOUD_PREFIX)
for cloud, d in cld_bu.items():
self.set(CLOUD_PREFIX, cloud, d)
with open(self.path, mode='w') as f:
os.chmod(self.path, 0600)
f.write(HEADER.lstrip())
f.write(self.safe_to_print().encode(pref_enc, 'replace'))
......@@ -466,39 +466,55 @@ class Config(TestCase):
_cnf.override('sec', 'opt', 'val')
self.assertEqual(_cnf._overrides['sec']['opt'], 'val')
def test_write(self):
from kamaki.cli.config import Config, DEFAULTS
_cnf = Config(path=self.f.name)
def test_safe_to_print(self):
itemsd = {
'global': {
'opt1': 'v1',
'opt2': 2,
'opt3': u'\u03c4\u03b9\u03bc\u03ae',
'opt4': 'un\b\bdeleted'
}, 'cloud': {
'cld1': {'url': 'url1', 'token': 'token1'},
'cld2': {'url': u'\u03bf\u03c5\u03b1\u03c1\u03ad\u03bb'}
}
}
exp = '%s[global]\n' % HEADER
exp += ''.join([
'%s = %s\n' % (k, v) for k, v in DEFAULTS['global'].items()])
exp += '\n'
from kamaki.cli.config import Config
_cnf = Config(path=self.f.name)
bu_func = Config.items
try:
Config.items = lambda cls, opt: itemsd[opt].items()
saved = _cnf.safe_to_print().split('\n')
glb, cld = saved[:5], saved[6:]
self.assertEqual(u'[global]', glb[0])
self.assertTrue(u'opt1 = v1' in glb)
self.assertTrue(u'opt2 = 2' in glb)
self.assertTrue(u'opt3 = \u03c4\u03b9\u03bc\u03ae' in glb)
self.assertTrue(u'opt4 = un\\x08\\x08deleted' in glb)
self.assertTrue('[cloud "cld1"]' in cld)
cld1_i = cld.index('[cloud "cld1"]')
cld1 = cld[cld1_i: cld1_i + 3]
self.assertTrue('url = url1' in cld1)
self.assertTrue('token = token1' in cld1)
self.assertTrue('[cloud "cld2"]' in cld)
cld2_i = cld.index('[cloud "cld2"]')
self.assertEqual(
u'url = \u03bf\u03c5\u03b1\u03c1\u03ad\u03bb', cld[cld2_i + 1])
finally:
Config.items = bu_func
@patch('kamaki.cli.config.Config.safe_to_print', return_value='rv')
def test_write(self, stp):
from kamaki.cli.config import Config
_cnf = Config(path=self.f.name)
exp = '%s%s' % (HEADER, 'rv')
_cnf.write()
self.f.seek(0)
self.assertEqual(self.f.read(), exp)
stp.assert_called_once_with()
del _cnf
with NamedTemporaryFile() as f:
f.write('\n'.join(self.config_file_content))
f.flush()
_cnf = Config(path=f.name)
f.seek(0)
self.assertEqual(f.read(), '\n'.join(self.config_file_content))
_cnf.write()
f.seek(0)
file_contents = f.read()
for line in self.config_file_content:
self.assertTrue(line in file_contents)
_cnf.set('sec', 'opt', 'val')
_cnf.set('global', 'opt', 'val')
_cnf.set('global', 'file_cli', 'val')
_cnf.write()
f.seek(0)
file_contents = f.read()
for line in ('file_cli = val\n', '[sec]\n', 'opt = val\n'):
self.assertTrue(line in file_contents)
if __name__ == '__main__':
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment