diff --git a/agkyra/__init__.py b/agkyra/__init__.py index 89ad4d917d53f4ff351e28e361ea1871461a9a5b..a82995de2c7cf195c736939cdf7aff1dd0de4d06 100644 --- a/agkyra/__init__.py +++ b/agkyra/__init__.py @@ -1,107 +1 @@ -from wsgiref.simple_server import make_server -# from ws4py.websocket import EchoWebSocket -from protocol import WebSocketProtocol -from ws4py.server.wsgirefserver import WSGIServer, WebSocketWSGIRequestHandler -from ws4py.server.wsgiutils import WebSocketWSGIApplication -from ws4py.client import WebSocketBaseClient -from tempfile import NamedTemporaryFile -import subprocess -import json -from os.path import abspath -from threading import Thread -from hashlib import sha1 -import os -import logging - - -LOG = logging.getLogger(__name__) - - -class GUI(WebSocketBaseClient): - """Launch the GUI when the helper server is ready""" - - def __init__(self, addr, gui_exec_path, gui_id): - """Initialize the GUI Launcher""" - super(GUI, self).__init__(addr) - self.addr = addr - self.gui_exec_path = gui_exec_path - self.gui_id = gui_id - self.start = self.connect - - def run_gui(self): - """Launch the GUI and keep it running, clean up afterwards. - If the GUI is terminated for some reason, the WebSocket is closed and - the temporary file with GUI settings is deleted. - In windows, the file must be closed before the GUI is launched. - """ - # NamedTemporaryFile creates a file accessible only to current user - LOG.debug('Create temporary file') - with NamedTemporaryFile(delete=False) as fp: - json.dump(dict(gui_id=self.gui_id, address=self.addr), fp) - # subprocess.call blocks the execution - LOG.debug('RUN: %s %s' % (self.gui_exec_path, fp.name)) - subprocess.call([ - '/home/saxtouri/node-webkit-v0.11.6-linux-x64/nw', - # self.gui_exec_path, - abspath('gui/gui.nw'), - fp.name]) - LOG.debug('GUI process closed, remove temp file') - os.remove(fp.name) - - def handshake_ok(self): - """If handshake is OK, the helper is UP, so the GUI can be launched""" - self.run_gui() - LOG.debug('Close GUI wrapper connection') - self.close() - - -class HelperServer(object): - """Agkyra Helper Server sets a WebSocket server with the Helper protocol - It also provided methods for running and killing the Helper server - :param gui_id: Only the GUI with this ID is allowed to chat with the Helper - """ - - def __init__(self, port=0): - """Setup the helper server""" - self.gui_id = sha1(os.urandom(128)).hexdigest() - WebSocketProtocol.gui_id = self.gui_id - server = make_server( - '', port, - server_class=WSGIServer, - handler_class=WebSocketWSGIRequestHandler, - app=WebSocketWSGIApplication(handler_cls=WebSocketProtocol)) - server.initialize_websockets_manager() - self.server, self.port = server, server.server_port - - def start(self): - """Start the helper server in a thread""" - Thread(target=self.server.serve_forever).start() - - def shutdown(self): - """Shutdown the server (needs another thread) and join threads""" - t = Thread(target=self.server.shutdown) - t.start() - t.join() - - -def run(gui_exec_path): - """Prepare helper and GUI and run them in the proper order""" - server = HelperServer() - addr = 'ws://localhost:%s' % server.port - gui = GUI(addr, gui_exec_path, server.gui_id) - - LOG.info('Start helper server') - server.start() - - try: - LOG.info('Start GUI') - gui.start() - except KeyboardInterrupt: - LOG.info('Shutdown GUI') - gui.close() - LOG.info('Shutdown helper server') - server.shutdown() - -if __name__ == '__main__': - logging.basicConfig(filename='agkyra.log', level=logging.DEBUG) - run(abspath('gui/app')) + __version__ = '0.1' diff --git a/agkyra/cli.py b/agkyra/cli.py new file mode 100644 index 0000000000000000000000000000000000000000..6e6e0c7a88c42f61082571d4d8fccc3e5b4170e4 --- /dev/null +++ b/agkyra/cli.py @@ -0,0 +1,184 @@ +import cmd +import sys +import logging +import json +from titanic import setup, syncer +from titanic.pithos_client import PithosFileClient +from titanic.localfs_client import FilesystemFileClient +import config +# from config import AgkyraConfig + + +logging.basicConfig(filename='agkyra.log', level=logging.DEBUG) +LOG = logging.getLogger(__name__) + +setup.GLOBAL_SETTINGS_NAME = config.AGKYRA_DIR + + +class AgkyraCLI(cmd.Cmd): + """The CLI for """ + + cnf = config.AgkyraConfig() + is_shell = False + + def init(self): + """initialize syncer""" + # Read settings + sync = self.cnf.get('global', 'default_sync') + LOG.info('Using sync: %s' % sync) + cloud = self.cnf.get_sync(sync, 'cloud') + url = self.cnf.get_cloud(cloud, 'url') + token = self.cnf.get_cloud(cloud, 'token') + container = self.cnf.get_sync(sync, 'container') + directory = self.cnf.get_sync(sync, 'directory') + + # Prepare syncer settings + self.settings = setup.SyncerSettings( + sync, url, token, container, directory) + LOG.info('Local: %s' % directory) + LOG.info('Remote: %s of %s' % (container, url)) + # self.exclude = self.cnf.get_sync(sync, 'exclude') + + # Init syncer + master = PithosFileClient(self.settings) + slave = FilesystemFileClient(self.settings) + self.syncer = syncer.FileSyncer(self.settings, master, slave) + + def preloop(self): + """This runs when the shell loads""" + if not self.is_shell: + self.is_shell = True + self.prompt = '\xe2\x9a\x93 ' + self.init() + self.default('') + + def print_option(self, section, name, option): + """Print a configuration option""" + section = '%s.%s' % (section, name) if name else section + value = self.cnf.get(section, option) + print ' %s: %s' % (option, value) + + def list_section(self, section, name): + """list contents of a section""" + content = dict(self.cnf.items(section)) + if section in 'global' and name: + self.print_option(section, '', name) + else: + if name: + content = content[name] + for option in content.keys(): + self.print_option(section, name, option) + + def list_section_type(self, section): + """print the contents of a configuration section""" + names = ['', ] if section in ('global', ) else self.cnf.keys(section) + assert names, 'Section %s not found' % section + for name in names: + print section, name + self.list_section(section, name) + + def list_sections(self): + """List all configuration sections""" + for section in self.cnf.sections(): + self.list_section_type(section) + + def do_list(self, line): + """List current settings (\"help list\" for details) + list global List all settings + list global <option> Get the value of this global option + list cloud List all clouds + list cloud <name> List all options of a cloud + list cloud <name> <option> Get the value of this cloud option + list sync List all syncs + list sync <name> List all options of a sync + list sync <name> <option> Get the value of this sync option + """ + args = line.split() + try: + { + 0: self.list_sections, + 1: self.list_section_type, + 2: self.list_section, + 3: self.print_option + }[len(args)](*args) + except Exception as e: + sys.stderr.write('%s\n' % e) + cmd.Cmd.do_help(self, 'list') + + def set_global_setting(self, section, option, value): + assert section in ('global'), 'Syntax error' + self.cnf.set(section, option, value) + + def set_setting(self, section, name, option, value): + assert section in self.sections(), 'Syntax error' + self.cnf.set('%s.%s' % (section, name), option, value) + + def do_set(self, line): + """Set a setting""" + args = line.split() + try: + { + 3: self.set_global_setting, + 4: self.set_setting + }[len(args)](*args) + self.cnf.write() + except Exception as e: + sys.stderr.write('%s\n' % e) + cmd.Cmd.do_help(self, 'set') + + def do_start(self, line): + """Start syncing""" + self.syncer.run() + + def do_pause(self, line): + """Pause syncing""" + + def do_status(self, line): + """Get current status (running/paused, progress)""" + print 'I have no idea' + + # def do_shell(self, line): + # """Run system, shell commands""" + # if getattr(self, 'is_shell'): + # os.system(line) + # else: + # try: + # self.prompt = '\xe2\x9a\x93 ' + # self.is_shell = True + # finally: + # self.init() + # self.cmdloop() + + def do_help(self, line): + """List commands with \"help\" or detailed help with \"help cmd\"""" + if not line: + self.default(line) + cmd.Cmd.do_help(self, line) + + def do_quit(self, line): + """Quit Agkyra shell""" + return True + + def default(self, line): + """print help""" + sys.stderr.write('Usage:\t%s <command> [args]\n\n' % self.prompt) + for arg in [c for c in self.get_names() if c.startswith('do_')]: + sys.stderr.write('%s\t' % arg[3:]) + method = getattr(self, arg) + sys.stderr.write(method.__doc__.split('\n')[0] + '\n') + sys.stderr.write('\n') + + def emptyline(self): + if not self.is_shell: + return self.default('') + + def run_onecmd(self, argv): + self.prompt = argv[0] + self.init() + self.onecmd(' '.join(argv[1:])) + + +# AgkyraCLI().run_onecmd(sys.argv) + +# or run a shell with +AgkyraCLI().cmdloop() diff --git a/agkyra/config.py b/agkyra/config.py new file mode 100644 index 0000000000000000000000000000000000000000..55a406da1464073a0619b37d1921702a4cb95088 --- /dev/null +++ b/agkyra/config.py @@ -0,0 +1,165 @@ +""" +A Sync is a triplete consisting of at least the following: + +* a cloud (a reference to a cloud set in the same config file) +* a container +* a local directory + +Other parameters may also be set in the context of a sync. + +The sync is identified by the "sync_id", which is a string + +The operations of a sync are similar to the operations of a cloud, as they are +implemented in kamaki.cli.config +""" +import os +from re import match +from ConfigParser import Error +from kamaki.cli import config +# import Config, CLOUD_PREFIX +from kamaki.cli.config import Config +from kamaki.cli.utils import escape_ctrl_chars + + +CLOUD_PREFIX = config.CLOUD_PREFIX +config.HEADER = '# Agkyra configuration file version XXX\n' +AGKYRA_DIR = os.environ.get('AGKYRA_DIR', os.path.expanduser('~/.agkyra')) +config.CONFIG_PATH = '%s%sconfig.rc' % (AGKYRA_DIR, os.path.sep) +# neutralize kamaki CONFIG_ENV for this session +config.CONFIG_ENV = '' + +SYNC_PREFIX = 'sync' +config.DEFAULTS = { + 'global': { + 'agkyra_dir': AGKYRA_DIR, + }, + CLOUD_PREFIX: { + # <cloud>: { + # 'url': '', + # 'token': '', + # whatever else may be useful in this context + # }, + # ... more clouds + }, + SYNC_PREFIX: { + # <sync>: { + # 'cloud': '', + # 'container': '', + # 'directory': '' + # }, + # ... more syncs + }, +} + + +class InvalidSyncNameError(Error): + """A valid sync name must pass through this regex: ([~@#$:.-\w]+)""" + + +class AgkyraConfig(Config): + """ + Handle the config file for Agkyra, adding the notion of a sync + + A Sync is a triplete consisting of at least the following: + + * a cloud (a reference to a cloud set in the same config file) + * a container + * a local directory + + Other parameters may also be set in the context of a sync. + + The sync is identified by the sync id, which is a string + + The operations of a sync are similar to the operations of a cloud, as they + are implemented in kamaki.cli.config + """ + + def __init__(self, *args, **kwargs): + """Enhance Config to read SYNC sections""" + Config.__init__(self, *args, **kwargs) + + for section in self.sections(): + r = self.sync_name(section) + if r: + for k, v in self.items(section): + self.set_sync(r, k, v) + self.remove_section(section) + + @staticmethod + def sync_name(full_section_name): + """Get the sync name if the section is a sync, None otherwise""" + if not full_section_name.startswith(SYNC_PREFIX + ' '): + return None + matcher = match(SYNC_PREFIX + ' "([~@#$.:\-\w]+)"', full_section_name) + if matcher: + return matcher.groups()[0] + else: + isn = full_section_name[len(SYNC_PREFIX) + 1:] + raise InvalidSyncNameError('Invalid Cloud Name %s' % isn) + + def get(self, section, option): + """Enhance Config.get to handle sync options""" + value = self._overrides.get(section, {}).get(option) + if value is not None: + return value + prefix = SYNC_PREFIX + '.' + if section.startswith(prefix): + return self.get_sync(section[len(prefix):], option) + return config.Config.get(self, section, option) + + def set(self, section, option, value): + """Enhance Config.set to handle sync options""" + self.assert_option(option) + prefix = SYNC_PREFIX + '.' + if section.startswith(prefix): + sync = self.sync_name( + '%s "%s"' % (SYNC_PREFIX, section[len(prefix):])) + return self.set_sync(sync, option, value) + return config.Config.set(self, section, option, value) + + def get_sync(self, sync, option): + """Get the option value from the given sync option + :raises KeyError: if the sync or the option do not exist + """ + r = self.get(SYNC_PREFIX, sync) if sync else None + if r: + return r[option] + raise KeyError('Sync "%s" does not exist' % sync) + + def set_sync(self, sync, option, value): + """Set the value of this option in the named sync. + If the sync or the option do not exist, create them. + """ + try: + d = self.get(SYNC_PREFIX, sync) or dict() + except KeyError: + d = dict() + self.assert_option(option) + d[option] = value + self.set(SYNC_PREFIX, sync, d) + + def remove_from_sync(self, sync, option): + """Remove a sync option""" + d = self.get(SYNC_PREFIX, sync) + if isinstance(d, dict): + d.pop(option) + + def safe_to_print(self): + """Enhance Config.safe_to_print to handle syncs""" + dump = Config.safe_to_print(self) + for r, d in self.items(SYNC_PREFIX, include_defaults=False): + dump += u'\n[%s "%s"]\n' % (SYNC_PREFIX, escape_ctrl_chars(r)) + for k, v in d.items(): + dump += u'%s = %s\n' % ( + escape_ctrl_chars(k), escape_ctrl_chars(v)) + return dump + + +# if __name__ == '__main__': +# cnf = AgkyraConfig() +# config.Config.pretty_print(cnf) +# cnf.set_sync('1', 'cloud', '~okeanos') +# print cnf.get_sync('1', 'container') +# cnf.set_sync('1', 'lala', 123) +# cnf.remove_from_sync('1', 'lala') +# cnf.write() diff --git a/agkyra/gui.py b/agkyra/gui.py new file mode 100644 index 0000000000000000000000000000000000000000..89ad4d917d53f4ff351e28e361ea1871461a9a5b --- /dev/null +++ b/agkyra/gui.py @@ -0,0 +1,107 @@ +from wsgiref.simple_server import make_server +# from ws4py.websocket import EchoWebSocket +from protocol import WebSocketProtocol +from ws4py.server.wsgirefserver import WSGIServer, WebSocketWSGIRequestHandler +from ws4py.server.wsgiutils import WebSocketWSGIApplication +from ws4py.client import WebSocketBaseClient +from tempfile import NamedTemporaryFile +import subprocess +import json +from os.path import abspath +from threading import Thread +from hashlib import sha1 +import os +import logging + + +LOG = logging.getLogger(__name__) + + +class GUI(WebSocketBaseClient): + """Launch the GUI when the helper server is ready""" + + def __init__(self, addr, gui_exec_path, gui_id): + """Initialize the GUI Launcher""" + super(GUI, self).__init__(addr) + self.addr = addr + self.gui_exec_path = gui_exec_path + self.gui_id = gui_id + self.start = self.connect + + def run_gui(self): + """Launch the GUI and keep it running, clean up afterwards. + If the GUI is terminated for some reason, the WebSocket is closed and + the temporary file with GUI settings is deleted. + In windows, the file must be closed before the GUI is launched. + """ + # NamedTemporaryFile creates a file accessible only to current user + LOG.debug('Create temporary file') + with NamedTemporaryFile(delete=False) as fp: + json.dump(dict(gui_id=self.gui_id, address=self.addr), fp) + # subprocess.call blocks the execution + LOG.debug('RUN: %s %s' % (self.gui_exec_path, fp.name)) + subprocess.call([ + '/home/saxtouri/node-webkit-v0.11.6-linux-x64/nw', + # self.gui_exec_path, + abspath('gui/gui.nw'), + fp.name]) + LOG.debug('GUI process closed, remove temp file') + os.remove(fp.name) + + def handshake_ok(self): + """If handshake is OK, the helper is UP, so the GUI can be launched""" + self.run_gui() + LOG.debug('Close GUI wrapper connection') + self.close() + + +class HelperServer(object): + """Agkyra Helper Server sets a WebSocket server with the Helper protocol + It also provided methods for running and killing the Helper server + :param gui_id: Only the GUI with this ID is allowed to chat with the Helper + """ + + def __init__(self, port=0): + """Setup the helper server""" + self.gui_id = sha1(os.urandom(128)).hexdigest() + WebSocketProtocol.gui_id = self.gui_id + server = make_server( + '', port, + server_class=WSGIServer, + handler_class=WebSocketWSGIRequestHandler, + app=WebSocketWSGIApplication(handler_cls=WebSocketProtocol)) + server.initialize_websockets_manager() + self.server, self.port = server, server.server_port + + def start(self): + """Start the helper server in a thread""" + Thread(target=self.server.serve_forever).start() + + def shutdown(self): + """Shutdown the server (needs another thread) and join threads""" + t = Thread(target=self.server.shutdown) + t.start() + t.join() + + +def run(gui_exec_path): + """Prepare helper and GUI and run them in the proper order""" + server = HelperServer() + addr = 'ws://localhost:%s' % server.port + gui = GUI(addr, gui_exec_path, server.gui_id) + + LOG.info('Start helper server') + server.start() + + try: + LOG.info('Start GUI') + gui.start() + except KeyboardInterrupt: + LOG.info('Shutdown GUI') + gui.close() + LOG.info('Shutdown helper server') + server.shutdown() + +if __name__ == '__main__': + logging.basicConfig(filename='agkyra.log', level=logging.DEBUG) + run(abspath('gui/app')) diff --git a/agkyra/protocol.py b/agkyra/protocol.py index 4da26aba26c0ea212a83fd36959b85cb762dcf4b..60a836463518025b10d220403a833e342f8d1d32 100644 --- a/agkyra/protocol.py +++ b/agkyra/protocol.py @@ -2,6 +2,7 @@ from ws4py.websocket import WebSocket import json import logging from os.path import abspath +from titanic import syncer LOG = logging.getLogger(__name__) @@ -67,7 +68,8 @@ class WebSocketProtocol(WebSocket): exclude=abspath('exclude.cnf'), pithos_url='https://pithos.okeanos.grnet.gr/ui/', weblogin='https://accounts.okeanos.grnet.gr/ui') - status = dict(progress=0, paused=False) + status = dict(progress=0, paused=True) + file_syncer = None # Syncer-related methods def get_status(self):