# Copyright (C) 2015 GRNET S.A. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . from wsgiref.simple_server import make_server from ws4py.websocket import WebSocket from ws4py.server.wsgiutils import WebSocketWSGIApplication from ws4py.server.wsgirefserver import WSGIServer, WebSocketWSGIRequestHandler from hashlib import sha1 from threading import Thread import sqlite3 import time import os import json import logging from agkyra.syncer import ( syncer, setup, pithos_client, localfs_client, messaging) from agkyra.config import AgkyraConfig, AGKYRA_DIR LOG = logging.getLogger(__name__) class SessionHelper(object): """Agkyra Helper Server sets a WebSocket server with the Helper protocol It also provided methods for running and killing the Helper server """ session_timeout = 20 def __init__(self, **kwargs): """Setup the helper server""" self.session_db = kwargs.get( 'session_db', os.path.join(AGKYRA_DIR, 'session.db')) self.session_relation = kwargs.get('session_relation', 'heart') LOG.debug('Connect to db') self.db = sqlite3.connect(self.session_db) self._init_db_relation() self.session = self._load_active_session() or self._create_session() def _init_db_relation(self): self.db.execute('BEGIN') self.db.execute( 'CREATE TABLE IF NOT EXISTS %s (' 'ui_id VARCHAR(256), address text, beat VARCHAR(32)' ')' % self.session_relation) self.db.commit() def _load_active_session(self): """Load a session from db""" r = self.db.execute('SELECT * FROM %s' % self.session_relation) sessions = r.fetchall() if sessions: last = sessions[-1] now, last_beat = time.time(), float(last[2]) if now - last_beat < self.session_timeout: # Found an active session return dict(ui_id=last[0], address=last[1]) return None def _create_session(self): """Create session credentials""" ui_id = sha1(os.urandom(128)).hexdigest() WebSocketProtocol.ui_id = ui_id server = make_server( '', 0, server_class=WSGIServer, handler_class=WebSocketWSGIRequestHandler, app=WebSocketWSGIApplication(handler_cls=WebSocketProtocol)) server.initialize_websockets_manager() address = 'ws://%s:%s' % (server.server_name, server.server_port) self.server = server self.db.execute('BEGIN') self.db.execute('DELETE FROM %s' % self.session_relation) self.db.execute('INSERT INTO %s VALUES ("%s", "%s", "%s")' % ( self.session_relation, ui_id, address, time.time())) self.db.commit() return dict(ui_id=ui_id, address=address) def start(self): """Start the helper server in a thread""" if getattr(self, 'server', None): Thread(target=self.server.serve_forever).start() def shutdown(self): """Shutdown the server (needs another thread) and join threads""" if getattr(self, 'server', None): t = Thread(target=self.server.shutdown) t.start() t.join() class WebSocketProtocol(WebSocket): """Helper-side WebSocket protocol for communication with GUI: -- INTERRNAL HANDSAKE -- GUI: {"method": "post", "ui_id": } HELPER: {"ACCEPTED": 202, "method": "post"}" or "{"REJECTED": 401, "action": "post ui_id"} -- SHUT DOWN -- GUI: {"method": "post", "path": "shutdown"} -- PAUSE -- GUI: {"method": "post", "path": "pause"} HELPER: {"OK": 200, "action": "post pause"} or error -- start -- GUI: {"method": "post", "path": "start"} HELPER: {"OK": 200, "action": "post start"} or error -- GET SETTINGS -- GUI: {"method": "get", "path": "settings"} HELPER: { "action": "get settings", "token": , "url": , "container": , "directory": , "exclude": } or {: } -- PUT SETTINGS -- GUI: { "method": "put", "path": "settings", "token": , "url": , "container": , "directory": , "exclude": } HELPER: {"CREATED": 201, "action": "put settings",} or {: , "action": "get settings",} -- GET STATUS -- GUI: {"method": "get", "path": "status"} HELPER: { "can_sync": , "progress": , "paused": , "action": "get status"} or {: , "action": "get status"} """ ui_id = None accepted = False settings = dict( token=None, url=None, container=None, directory=None, exclude=None) status = dict( progress=0, synced=0, unsynced=0, paused=True, can_sync=False) file_syncer = None cnf = AgkyraConfig() essentials = ('url', 'token', 'container', 'directory') def _get_default_sync(self): """Get global.default_sync or pick the first sync as default If there are no syncs, create a 'default' sync. """ sync = self.cnf.get('global', 'default_sync') if not sync: for sync in self.cnf.keys('sync'): break self.cnf.set('global', 'default_sync', sync or 'default') return sync or 'default' def _get_sync_cloud(self, sync): """Get the .cloud or pick the first cloud and use it In case of cloud picking, set the cloud as the .cloud for future sessions. If no clouds are found, create a 'default' cloud, with an empty url. """ try: cloud = self.cnf.get_sync(sync, 'cloud') except KeyError: cloud = None if not cloud: for cloud in self.cnf.keys('cloud'): break self.cnf.set_sync(sync, 'cloud', cloud or 'default') return cloud or 'default' def _load_settings(self): LOG.debug('Start loading settings') sync = self._get_default_sync() cloud = self._get_sync_cloud(sync) try: self.settings['url'] = self.cnf.get_cloud(cloud, 'url') except Exception: self.settings['url'] = None try: self.settings['token'] = self.cnf.get_cloud(cloud, 'token') except Exception: self.settings['url'] = None for option in ('container', 'directory', 'exclude'): try: self.settings[option] = self.cnf.get_sync(sync, option) except KeyError: LOG.debug('No %s is set' % option) LOG.debug('Finished loading settings') def _dump_settings(self): LOG.debug('Saving settings') if not self.settings.get('url', None): LOG.debug('No settings to save') return sync = self._get_default_sync() cloud = self._get_sync_cloud(sync) try: old_url = self.cnf.get_cloud(cloud, 'url') or '' except KeyError: old_url = self.settings['url'] while old_url != self.settings['url']: cloud = '%s_%s' % (cloud, sync) try: self.cnf.get_cloud(cloud, 'url') except KeyError: break self.cnf.set_cloud(cloud, 'url', self.settings['url']) self.cnf.set_cloud(cloud, 'token', self.settings['token'] or '') self.cnf.set_sync(sync, 'cloud', cloud) for option in ('directory', 'container', 'exclude'): self.cnf.set_sync(sync, option, self.settings[option] or '') self.cnf.write() LOG.debug('Settings saved') def _essentials_changed(self, new_settings): """Check if essential settings have changed in new_settings""" return all([ self.settings[e] == self.settings[e] for e in self.essentials]) def _update_statistics(self): """Update statistics by consuming and understanding syncer messages""" if self.can_sync(): msg = self.syncer.get_next_message() if not msg: if self.status['unsynced'] == self.status['synced']: self.status['unsynced'] = 0 self.status['synced'] = 0 while (msg): if isinstance(msg, messaging.SyncMessage): LOG.info('Start syncing "%s"' % msg.objname) self.status['unsynced'] += 1 elif isinstance(msg, messaging.AckSyncMessage): LOG.info('Finished syncing "%s"' % msg.objname) self.status['synced'] += 1 elif isinstance(msg, messaging.CollisionMessage): LOG.info('Collision for "%s"' % msg.objname) elif isinstance(msg, messaging.ConflictStashMessage): LOG.info('Conflict for "%s"' % msg.objname) else: LOG.debug('Consumed msg %s' % msg) msg = self.syncer.get_next_message() def can_sync(self): """Check if settings are enough to setup a syncing proccess""" return all([self.settings[e] for e in self.essentials]) def init_sync(self): """Initialize syncer""" sync = self._get_default_sync() syncer_settings = setup.SyncerSettings( sync, self.settings['url'], self.settings['token'], self.settings['container'], self.settings['directory'], agkyra_path=AGKYRA_DIR, ignore_ssl=True) master = pithos_client.PithosFileClient(syncer_settings) slave = localfs_client.LocalfsFileClient(syncer_settings) self.syncer = syncer.FileSyncer(syncer_settings, master, slave) self.syncer_settings = syncer_settings self.syncer.initiate_probe() # Syncer-related methods def get_status(self): if self.can_sync(): self._update_statistics() self.status['paused'] = self.syncer.paused self.status['can_sync'] = self.can_sync() else: self.status = dict( progress=0, synced=0, unsynced=0, paused=True, can_sync=False) return self.status def get_settings(self): return self.settings def set_settings(self, new_settings): # Prepare setting save could_sync = self.can_sync() was_active = False if could_sync and not self.syncer.paused: was_active = True self.pause_sync() must_reset_syncing = self._essentials_changed(new_settings) # save settings self.settings = new_settings self._dump_settings() # Restart if self.can_sync(): if must_reset_syncing or not could_sync: self.init_sync() elif was_active: self.start_sync() def pause_sync(self): self.syncer.stop_decide() LOG.debug('Wait open syncs to complete') self.syncer.wait_sync_threads() def start_sync(self): self.syncer.start_decide() # WebSocket connection methods def opened(self): LOG.debug('Helper: connection established') def closed(self, *args): LOG.debug('Helper: connection closed') def send_json(self, msg): LOG.debug('send: %s' % msg) self.send(json.dumps(msg)) # Protocol handling methods def _post(self, r): """Handle POST requests""" if self.accepted: action = r['path'] if action == 'shutdown': if self.can_sync(): self.syncer.stop_all_daemons() LOG.debug('Wait open syncs to complete') self.syncer.wait_sync_threads() self.close() return { 'start': self.start_sync, 'pause': self.pause_sync }[action]() self.send_json({'OK': 200, 'action': 'post %s' % action}) elif r['ui_id'] == self.ui_id: self._load_settings() self.accepted = True self.send_json({'ACCEPTED': 202, 'action': 'post ui_id'}) if self.can_sync(): self.init_sync() self.pause_sync() else: action = r.get('path', 'ui_id') self.send_json({'REJECTED': 401, 'action': 'post %s' % action}) self.terminate() def _put(self, r): """Handle PUT requests""" if self.accepted: LOG.debug('put %s' % r) action = r.pop('path') self.set_settings(r) r.update({'CREATED': 201, 'action': 'put %s' % action}) self.send_json(r) else: action = r['path'] self.send_json({'UNAUTHORIZED': 401, 'action': 'put %s' % action}) self.terminate() def _get(self, r): """Handle GET requests""" action = r.pop('path') if not self.accepted: self.send_json({'UNAUTHORIZED': 401, 'action': 'get %s' % action}) self.terminate() else: data = { 'settings': self.get_settings, 'status': self.get_status, }[action]() data['action'] = 'get %s' % action self.send_json(data) def received_message(self, message): """Route requests to corresponding handling methods""" LOG.debug('recv: %s' % message) try: r = json.loads('%s' % message) except ValueError as ve: self.send_json({'BAD REQUEST': 400}) LOG.error('JSON ERROR: %s' % ve) return try: method = r.pop('method') { 'post': self._post, 'put': self._put, 'get': self._get }[method](r) except KeyError as ke: self.send_json({'BAD REQUEST': 400}) LOG.error('KEY ERROR: %s' % ke) except Exception as e: self.send_json({'INTERNAL ERROR': 500}) LOG.error('EXCEPTION: %s' % e) self.terminate()