Commit 07cab2e5 authored by Stavros Sachtouris's avatar Stavros Sachtouris Committed by Giorgos Korfiatis
Browse files

Implement a config system and a CLI

The config system is an extension of the kamaki config system,
tailored to support configuration files with multiple clouds and
multiple syncs. A sync contains all the attributes needed to sync
a local directory with a remote container, included a reference to
a cloud.

The CLI is interactive and can be used to start, pause and control
the syncing process.
parent 0efad943
from wsgiref.simple_server import make_server
# from ws4py.websocket import EchoWebSocket
from protocol import WebSocketProtocol
from ws4py.server.wsgirefserver import WSGIServer, WebSocketWSGIRequestHandler
from ws4py.server.wsgiutils import WebSocketWSGIApplication
from ws4py.client import WebSocketBaseClient
from tempfile import NamedTemporaryFile
import subprocess
import json
from os.path import abspath
from threading import Thread
from hashlib import sha1
import os
import logging
LOG = logging.getLogger(__name__)
class GUI(WebSocketBaseClient):
"""Launch the GUI when the helper server is ready"""
def __init__(self, addr, gui_exec_path, gui_id):
"""Initialize the GUI Launcher"""
super(GUI, self).__init__(addr)
self.addr = addr
self.gui_exec_path = gui_exec_path
self.gui_id = gui_id
self.start = self.connect
def run_gui(self):
"""Launch the GUI and keep it running, clean up afterwards.
If the GUI is terminated for some reason, the WebSocket is closed and
the temporary file with GUI settings is deleted.
In windows, the file must be closed before the GUI is launched.
"""
# NamedTemporaryFile creates a file accessible only to current user
LOG.debug('Create temporary file')
with NamedTemporaryFile(delete=False) as fp:
json.dump(dict(gui_id=self.gui_id, address=self.addr), fp)
# subprocess.call blocks the execution
LOG.debug('RUN: %s %s' % (self.gui_exec_path, fp.name))
subprocess.call([
'/home/saxtouri/node-webkit-v0.11.6-linux-x64/nw',
# self.gui_exec_path,
abspath('gui/gui.nw'),
fp.name])
LOG.debug('GUI process closed, remove temp file')
os.remove(fp.name)
def handshake_ok(self):
"""If handshake is OK, the helper is UP, so the GUI can be launched"""
self.run_gui()
LOG.debug('Close GUI wrapper connection')
self.close()
class HelperServer(object):
"""Agkyra Helper Server sets a WebSocket server with the Helper protocol
It also provided methods for running and killing the Helper server
:param gui_id: Only the GUI with this ID is allowed to chat with the Helper
"""
def __init__(self, port=0):
"""Setup the helper server"""
self.gui_id = sha1(os.urandom(128)).hexdigest()
WebSocketProtocol.gui_id = self.gui_id
server = make_server(
'', port,
server_class=WSGIServer,
handler_class=WebSocketWSGIRequestHandler,
app=WebSocketWSGIApplication(handler_cls=WebSocketProtocol))
server.initialize_websockets_manager()
self.server, self.port = server, server.server_port
def start(self):
"""Start the helper server in a thread"""
Thread(target=self.server.serve_forever).start()
def shutdown(self):
"""Shutdown the server (needs another thread) and join threads"""
t = Thread(target=self.server.shutdown)
t.start()
t.join()
def run(gui_exec_path):
"""Prepare helper and GUI and run them in the proper order"""
server = HelperServer()
addr = 'ws://localhost:%s' % server.port
gui = GUI(addr, gui_exec_path, server.gui_id)
LOG.info('Start helper server')
server.start()
try:
LOG.info('Start GUI')
gui.start()
except KeyboardInterrupt:
LOG.info('Shutdown GUI')
gui.close()
LOG.info('Shutdown helper server')
server.shutdown()
if __name__ == '__main__':
logging.basicConfig(filename='agkyra.log', level=logging.DEBUG)
run(abspath('gui/app'))
__version__ = '0.1'
import cmd
import sys
import logging
import json
from titanic import setup, syncer
from titanic.pithos_client import PithosFileClient
from titanic.localfs_client import FilesystemFileClient
import config
# from config import AgkyraConfig
logging.basicConfig(filename='agkyra.log', level=logging.DEBUG)
LOG = logging.getLogger(__name__)
setup.GLOBAL_SETTINGS_NAME = config.AGKYRA_DIR
class AgkyraCLI(cmd.Cmd):
"""The CLI for """
cnf = config.AgkyraConfig()
is_shell = False
def init(self):
"""initialize syncer"""
# Read settings
sync = self.cnf.get('global', 'default_sync')
LOG.info('Using sync: %s' % sync)
cloud = self.cnf.get_sync(sync, 'cloud')
url = self.cnf.get_cloud(cloud, 'url')
token = self.cnf.get_cloud(cloud, 'token')
container = self.cnf.get_sync(sync, 'container')
directory = self.cnf.get_sync(sync, 'directory')
# Prepare syncer settings
self.settings = setup.SyncerSettings(
sync, url, token, container, directory)
LOG.info('Local: %s' % directory)
LOG.info('Remote: %s of %s' % (container, url))
# self.exclude = self.cnf.get_sync(sync, 'exclude')
# Init syncer
master = PithosFileClient(self.settings)
slave = FilesystemFileClient(self.settings)
self.syncer = syncer.FileSyncer(self.settings, master, slave)
def preloop(self):
"""This runs when the shell loads"""
if not self.is_shell:
self.is_shell = True
self.prompt = '\xe2\x9a\x93 '
self.init()
self.default('')
def print_option(self, section, name, option):
"""Print a configuration option"""
section = '%s.%s' % (section, name) if name else section
value = self.cnf.get(section, option)
print ' %s: %s' % (option, value)
def list_section(self, section, name):
"""list contents of a section"""
content = dict(self.cnf.items(section))
if section in 'global' and name:
self.print_option(section, '', name)
else:
if name:
content = content[name]
for option in content.keys():
self.print_option(section, name, option)
def list_section_type(self, section):
"""print the contents of a configuration section"""
names = ['', ] if section in ('global', ) else self.cnf.keys(section)
assert names, 'Section %s not found' % section
for name in names:
print section, name
self.list_section(section, name)
def list_sections(self):
"""List all configuration sections"""
for section in self.cnf.sections():
self.list_section_type(section)
def do_list(self, line):
"""List current settings (\"help list\" for details)
list global List all settings
list global <option> Get the value of this global option
list cloud List all clouds
list cloud <name> List all options of a cloud
list cloud <name> <option> Get the value of this cloud option
list sync List all syncs
list sync <name> List all options of a sync
list sync <name> <option> Get the value of this sync option
"""
args = line.split()
try:
{
0: self.list_sections,
1: self.list_section_type,
2: self.list_section,
3: self.print_option
}[len(args)](*args)
except Exception as e:
sys.stderr.write('%s\n' % e)
cmd.Cmd.do_help(self, 'list')
def set_global_setting(self, section, option, value):
assert section in ('global'), 'Syntax error'
self.cnf.set(section, option, value)
def set_setting(self, section, name, option, value):
assert section in self.sections(), 'Syntax error'
self.cnf.set('%s.%s' % (section, name), option, value)
def do_set(self, line):
"""Set a setting"""
args = line.split()
try:
{
3: self.set_global_setting,
4: self.set_setting
}[len(args)](*args)
self.cnf.write()
except Exception as e:
sys.stderr.write('%s\n' % e)
cmd.Cmd.do_help(self, 'set')
def do_start(self, line):
"""Start syncing"""
self.syncer.run()
def do_pause(self, line):
"""Pause syncing"""
def do_status(self, line):
"""Get current status (running/paused, progress)"""
print 'I have no idea'
# def do_shell(self, line):
# """Run system, shell commands"""
# if getattr(self, 'is_shell'):
# os.system(line)
# else:
# try:
# self.prompt = '\xe2\x9a\x93 '
# self.is_shell = True
# finally:
# self.init()
# self.cmdloop()
def do_help(self, line):
"""List commands with \"help\" or detailed help with \"help cmd\""""
if not line:
self.default(line)
cmd.Cmd.do_help(self, line)
def do_quit(self, line):
"""Quit Agkyra shell"""
return True
def default(self, line):
"""print help"""
sys.stderr.write('Usage:\t%s <command> [args]\n\n' % self.prompt)
for arg in [c for c in self.get_names() if c.startswith('do_')]:
sys.stderr.write('%s\t' % arg[3:])
method = getattr(self, arg)
sys.stderr.write(method.__doc__.split('\n')[0] + '\n')
sys.stderr.write('\n')
def emptyline(self):
if not self.is_shell:
return self.default('')
def run_onecmd(self, argv):
self.prompt = argv[0]
self.init()
self.onecmd(' '.join(argv[1:]))
# AgkyraCLI().run_onecmd(sys.argv)
# or run a shell with
AgkyraCLI().cmdloop()
"""
A Sync is a triplete consisting of at least the following:
* a cloud (a reference to a cloud set in the same config file)
* a container
* a local directory
Other parameters may also be set in the context of a sync.
The sync is identified by the "sync_id", which is a string
The operations of a sync are similar to the operations of a cloud, as they are
implemented in kamaki.cli.config
"""
import os
from re import match
from ConfigParser import Error
from kamaki.cli import config
# import Config, CLOUD_PREFIX
from kamaki.cli.config import Config
from kamaki.cli.utils import escape_ctrl_chars
CLOUD_PREFIX = config.CLOUD_PREFIX
config.HEADER = '# Agkyra configuration file version XXX\n'
AGKYRA_DIR = os.environ.get('AGKYRA_DIR', os.path.expanduser('~/.agkyra'))
config.CONFIG_PATH = '%s%sconfig.rc' % (AGKYRA_DIR, os.path.sep)
# neutralize kamaki CONFIG_ENV for this session
config.CONFIG_ENV = ''
SYNC_PREFIX = 'sync'
config.DEFAULTS = {
'global': {
'agkyra_dir': AGKYRA_DIR,
},
CLOUD_PREFIX: {
# <cloud>: {
# 'url': '',
# 'token': '',
# whatever else may be useful in this context
# },
# ... more clouds
},
SYNC_PREFIX: {
# <sync>: {
# 'cloud': '',
# 'container': '',
# 'directory': ''
# },
# ... more syncs
},
}
class InvalidSyncNameError(Error):
"""A valid sync name must pass through this regex: ([~@#$:.-\w]+)"""
class AgkyraConfig(Config):
"""
Handle the config file for Agkyra, adding the notion of a sync
A Sync is a triplete consisting of at least the following:
* a cloud (a reference to a cloud set in the same config file)
* a container
* a local directory
Other parameters may also be set in the context of a sync.
The sync is identified by the sync id, which is a string
The operations of a sync are similar to the operations of a cloud, as they
are implemented in kamaki.cli.config
"""
def __init__(self, *args, **kwargs):
"""Enhance Config to read SYNC sections"""
Config.__init__(self, *args, **kwargs)
for section in self.sections():
r = self.sync_name(section)
if r:
for k, v in self.items(section):
self.set_sync(r, k, v)
self.remove_section(section)
@staticmethod
def sync_name(full_section_name):
"""Get the sync name if the section is a sync, None otherwise"""
if not full_section_name.startswith(SYNC_PREFIX + ' '):
return None
matcher = match(SYNC_PREFIX + ' "([~@#$.:\-\w]+)"', full_section_name)
if matcher:
return matcher.groups()[0]
else:
isn = full_section_name[len(SYNC_PREFIX) + 1:]
raise InvalidSyncNameError('Invalid Cloud Name %s' % isn)
def get(self, section, option):
"""Enhance Config.get to handle sync options"""
value = self._overrides.get(section, {}).get(option)
if value is not None:
return value
prefix = SYNC_PREFIX + '.'
if section.startswith(prefix):
return self.get_sync(section[len(prefix):], option)
return config.Config.get(self, section, option)
def set(self, section, option, value):
"""Enhance Config.set to handle sync options"""
self.assert_option(option)
prefix = SYNC_PREFIX + '.'
if section.startswith(prefix):
sync = self.sync_name(
'%s "%s"' % (SYNC_PREFIX, section[len(prefix):]))
return self.set_sync(sync, option, value)
return config.Config.set(self, section, option, value)
def get_sync(self, sync, option):
"""Get the option value from the given sync option
:raises KeyError: if the sync or the option do not exist
"""
r = self.get(SYNC_PREFIX, sync) if sync else None
if r:
return r[option]
raise KeyError('Sync "%s" does not exist' % sync)
def set_sync(self, sync, option, value):
"""Set the value of this option in the named sync.
If the sync or the option do not exist, create them.
"""
try:
d = self.get(SYNC_PREFIX, sync) or dict()
except KeyError:
d = dict()
self.assert_option(option)
d[option] = value
self.set(SYNC_PREFIX, sync, d)
def remove_from_sync(self, sync, option):
"""Remove a sync option"""
d = self.get(SYNC_PREFIX, sync)
if isinstance(d, dict):
d.pop(option)
def safe_to_print(self):
"""Enhance Config.safe_to_print to handle syncs"""
dump = Config.safe_to_print(self)
for r, d in self.items(SYNC_PREFIX, include_defaults=False):
dump += u'\n[%s "%s"]\n' % (SYNC_PREFIX, escape_ctrl_chars(r))
for k, v in d.items():
dump += u'%s = %s\n' % (
escape_ctrl_chars(k), escape_ctrl_chars(v))
return dump
# if __name__ == '__main__':
# cnf = AgkyraConfig()
# config.Config.pretty_print(cnf)
# cnf.set_sync('1', 'cloud', '~okeanos')
# print cnf.get_sync('1', 'container')
# cnf.set_sync('1', 'lala', 123)
# cnf.remove_from_sync('1', 'lala')
# cnf.write()
from wsgiref.simple_server import make_server
# from ws4py.websocket import EchoWebSocket
from protocol import WebSocketProtocol
from ws4py.server.wsgirefserver import WSGIServer, WebSocketWSGIRequestHandler
from ws4py.server.wsgiutils import WebSocketWSGIApplication
from ws4py.client import WebSocketBaseClient
from tempfile import NamedTemporaryFile
import subprocess
import json
from os.path import abspath
from threading import Thread
from hashlib import sha1
import os
import logging
LOG = logging.getLogger(__name__)
class GUI(WebSocketBaseClient):
"""Launch the GUI when the helper server is ready"""
def __init__(self, addr, gui_exec_path, gui_id):
"""Initialize the GUI Launcher"""
super(GUI, self).__init__(addr)
self.addr = addr
self.gui_exec_path = gui_exec_path
self.gui_id = gui_id
self.start = self.connect
def run_gui(self):
"""Launch the GUI and keep it running, clean up afterwards.
If the GUI is terminated for some reason, the WebSocket is closed and
the temporary file with GUI settings is deleted.
In windows, the file must be closed before the GUI is launched.
"""
# NamedTemporaryFile creates a file accessible only to current user
LOG.debug('Create temporary file')
with NamedTemporaryFile(delete=False) as fp:
json.dump(dict(gui_id=self.gui_id, address=self.addr), fp)
# subprocess.call blocks the execution
LOG.debug('RUN: %s %s' % (self.gui_exec_path, fp.name))
subprocess.call([
'/home/saxtouri/node-webkit-v0.11.6-linux-x64/nw',
# self.gui_exec_path,
abspath('gui/gui.nw'),
fp.name])
LOG.debug('GUI process closed, remove temp file')
os.remove(fp.name)
def handshake_ok(self):
"""If handshake is OK, the helper is UP, so the GUI can be launched"""
self.run_gui()
LOG.debug('Close GUI wrapper connection')
self.close()
class HelperServer(object):
"""Agkyra Helper Server sets a WebSocket server with the Helper protocol
It also provided methods for running and killing the Helper server
:param gui_id: Only the GUI with this ID is allowed to chat with the Helper
"""
def __init__(self, port=0):
"""Setup the helper server"""
self.gui_id = sha1(os.urandom(128)).hexdigest()
WebSocketProtocol.gui_id = self.gui_id
server = make_server(
'', port,
server_class=WSGIServer,
handler_class=WebSocketWSGIRequestHandler,
app=WebSocketWSGIApplication(handler_cls=WebSocketProtocol))
server.initialize_websockets_manager()
self.server, self.port = server, server.server_port
def start(self):
"""Start the helper server in a thread"""
Thread(target=self.server.serve_forever).start()
def shutdown(self):
"""Shutdown the server (needs another thread) and join threads"""
t = Thread(target=self.server.shutdown)
t.start()
t.join()
def run(gui_exec_path):
"""Prepare helper and GUI and run them in the proper order"""
server = HelperServer()
addr = 'ws://localhost:%s' % server.port
gui = GUI(addr, gui_exec_path, server.gui_id)
LOG.info('Start helper server')
server.start()
try:
LOG.info('Start GUI')
gui.start()
except KeyboardInterrupt:
LOG.info('Shutdown GUI')
gui.close()
LOG.info('Shutdown helper server')
server.shutdown()
if __name__ == '__main__':
logging.basicConfig(filename='agkyra.log', level=logging.DEBUG)
run(abspath('gui/app'))
......@@ -2,6 +2,7 @@ from ws4py.websocket import WebSocket
import json
import logging
from os.path import abspath
from titanic import syncer
LOG = logging.getLogger(__name__)
......@@ -67,7 +68,8 @@ class WebSocketProtocol(WebSocket):
exclude=abspath('exclude.cnf'),
pithos_url='https://pithos.okeanos.grnet.gr/ui/',
weblogin='https://accounts.okeanos.grnet.gr/ui')
status = dict(progress=0, paused=False)
status = dict(progress=0, paused=True)
file_syncer = None
# Syncer-related methods
def get_status(self):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment