Commit f121ea35 authored by Stavros Sachtouris's avatar Stavros Sachtouris Committed by Giorgos Korfiatis
Browse files

Use common status codes across UI components

All status codes are defined in an external JSON file named
"ui_common.json". This file is loaded by the UI components if they
must exchange statuses.

Also, the protocol for propagating the status of the server has
been modified. Instead of using separate flags to denote if the
server is in some status or not, we now use a status code which
must be defined in the common JSON file.

In the present commit, the WebSocket server is managing status
codes in that fashion, and the GUI is able to understand the
responses and adjust its behavior accordingly.
parent cf8507a4
......@@ -66,7 +66,9 @@ pause_item = new gui.MenuItem({
label: 'NOT READY',
type: 'normal',
click: function() {
if (paused) {post_start(socket);} else {post_pause(socket);}
if (globals.status.code == STATUS['PAUSED']) post_start(socket);
else if (globals.status.code == STATUS['SYNCING']) post_pause(socket);
else log_debug('Illegal click - status code is ' + globals.status.code);
}
});
pause_item.enabled = false;
......@@ -159,12 +161,28 @@ menu.append(new gui.MenuItem({
}));
function activate_menu() {
if (!pause_item.enabled) pause_item.enabled = true;
if (!settings_menu.enabled) {
if (globals.settings.url) refresh_endpoints(globals.settings.url);
settings_menu.enabled = true;
tray.menu = menu;
}
if ((!pithos_page_menu.enabled) && get_pithos_ui() != null){
pithos_page_menu.enabled = true;
tray.menu = menu;
}
if ((!local_folder_menu.enabled) && globals.settings.directory) {
local_folder_menu.enabled = true;
tray.menu = menu;
}
}
function deactivate_menu() {
if (
pause_item.enabled ||
local_folder_menu.enabled ||
pithos_page_menu.enabled) {
progress_item.label = 'Settings window is open';
pause_item.enabled = false;
local_folder_menu.enabled = false;
pithos_page_menu.enabled = false;
......@@ -172,109 +190,57 @@ function deactivate_menu() {
}
}
// Update progress
var client_ready = false;
window.setInterval(function() {
if (globals.settings_are_open) {
var new_progress = notification[globals.status.code];
var new_pause = '';
switch(globals.status.code) {
case STATUS['UNINITIALIZED']:
case STATUS['INITIALIZING']:
case STATUS['SHUTING DOWN']:
deactivate_menu();
return;
new_pause = 'inactive';
break;
case STATUS['SYNCING']:
activate_menu();
new_progress += ', ' + remaining(globals.status) + ' remaining';
new_pause = 'Pause'
break;
case STATUS['PAUSING']:
new_progress += ', ' + remaining(globals.status) + ' remaining';
new_pause = 'waiting...'
pause_item.enabled = false;
break;
case STATUS['PAUSED']:
activate_menu();
new_pause = 'Start syncing';
if (remaining(globals.status) > 0)
new_progress += ', ' + remaining(globals.status) + ' remaining';
break;
case STATUS['SETTINGS MISSING']:
case STATUS['AUTH URL ERROR']:
case STATUS['TOKEN ERROR']:
case STATUS['DIRECTORY ERROR']:
case STATUS['CONTAINER ERROR']:
deactivate_menu();
new_pause = 'inactive';
settings_menu.enabled = true;
break;
}
if (globals.open_settings) {
new_progress = 'Settings window is open';
globals.open_settings = false;
settings_menu.click();
deactivate_menu();
return;
}
} else if (globals.settings_are_open) deactivate_menu();
var menu_modified = false;
if (!client_ready) {
if (!globals.authenticated) return;
client_ready = true;
}
if (client_ready) {
pause_item.enabled = (pause_item.label !== 'inactive');
if (!settings_menu.enabled) {
if (globals.settings.url) refresh_endpoints(globals.settings.url);
settings_menu.enabled = true;
tray.menu = menu;
}
if (globals.settings.url && !pithos_page_menu.enabled) {
if (get_pithos_ui() != null) {
pithos_page_menu.enabled = true;
tray.menu = menu;
} else { refresh_endpoints(globals.settings.url); }
}
if (!local_folder_menu.enabled) {
if (globals.settings.directory) {
local_folder_menu.enabled = true;
tray.menu = menu;
}
}
}
var status = globals['status'];
var new_progress = progress_item.label;
var new_pause = pause_item.label;
if (status.notification !== 0) {
new_progress = notifications[status.notification];
new_pause = 'inactive';
if (progress_item.label !== new_progress) {
notify_user(new_progress, 'critical');
}
}
else if (!status.can_sync) {
if (globals.just_opened) new_progress = 'Connecting...'
else new_progress = 'Not able to sync'
new_pause = 'inactive'
pause_item.enabled = false;
} else {
if (status.paused !== null) {
switch(pause_item.label) {
case pause_syncing: if (status.paused) {
// Update to "Paused - start syncing"
paused = true;
new_pause = start_syncing;
menu_modified = true;
} // else continue syncing
break;
case start_syncing: if (!status.paused) {
//update to "Syncing - pause syncing"
paused = false;
new_pause = pause_syncing;
menu_modified = true;
}
break;
default:
if (status.paused) {new_pause = start_syncing; paused=true;}
else {new_pause = pause_syncing; paused=false;}
pause_item.enabled = true;
menu_modified = true;
}
}
var remaining = status.unsynced - status.synced;
if (status.paused){
if (remaining)
new_progress = 'Pausing, ' + remaining + ' remain';
else new_progress = 'Paused';
} else {
if (remaining)
new_progress = 'Syncing, ' + remaining + ' remain';
else new_progress = 'Running, all synced';
}
}
if (new_pause !== pause_item.label.slice(0, new_pause.length)) {
pause_item.label = new_pause;
menu_modified = true;
}
if (new_progress !== progress_item.label) {
if (new_progress !== progress_item.label
|| new_pause !== pause_item.label) {
progress_item.label = new_progress;
menu_modified = true;
pause_item.label = new_pause;
tray.menu = menu;
}
if (menu_modified) tray.menu = menu;
get_status(socket);
}, 1500);
......
var gui = require('nw.gui');
var notification = {
0: 'Not initialized',
1: 'Initializing ...',
2: 'Shutting down',
100: 'Syncing',
101: 'Pausing',
102: 'Paused',
200: 'Settings are incomplete',
201: 'Cloud URL error',
202: 'Authentication error',
203: 'Local directory error',
204: 'Remote container error',
1000: 'Critical error'
}
function is_up(code) { return (code / 100 >> 0) === 1; }
function has_settings_error(code) { return (code / 200 >> 0) === 2; }
function remaining(status) { return status.unsynced - status.synced; }
var ntf_title = {
'info': 'Notification',
'warning': 'Warning',
......@@ -16,7 +35,7 @@ var notify_menu = new gui.MenuItem({
icon: 'static/images/play_pause.png',
iconIsTemplate: false,
click: function() {
console.log('Notification is clecked');
console.log('Notification is clicked');
}
});
......
......@@ -13,7 +13,7 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
var DEBUG = false;
var DEBUG = true;
var gui = require('nw.gui');
var path = require('path');
......@@ -21,6 +21,7 @@ var fs = require('fs');
// Read config file
var cnf = JSON.parse(fs.readFileSync(gui.App.argv[0], encoding='utf-8'));
var UI_COMMON = JSON.parse(fs.readFileSync(path.join('..', 'ui_common.json')));
function log_debug(msg) { if (DEBUG) console.log(msg); }
......@@ -28,12 +29,7 @@ function send_json(socket, msg) {
socket.send(JSON.stringify(msg));
}
var notifications = {
0: 'Syncer is consistent',
1: 'Local directory is not accessible',
2: 'Remote container is not accessible',
100: 'Unknown error'
}
var STATUS = UI_COMMON['STATUS'];
var globals = {
settings: {
......@@ -43,10 +39,8 @@ var globals = {
directory: null,
exclude: null
},
status: {
synced: 0, unsynced: 0, paused: null, can_sync: false, notification: 0},
status: {synced: 0, unsynced: 0, code: STATUS['UNINITIALIZED']},
authenticated: false,
just_opened: false,
open_settings: false,
settings_are_open: false
}
......@@ -87,7 +81,7 @@ function put_settings(socket, new_settings) {
function get_status(socket) {
send_json(socket, {'method': 'get', 'path': 'status'});
} // expected response {"synced":.., "unsynced":.., "paused":.., "can_sync":..}
} // expected response {"synced":.., "unsynced":.., "code":..}
// Connect to helper
......@@ -111,7 +105,6 @@ socket.onmessage = function(e) {
get_settings(this);
get_status(this);
globals.authenticated = true;
globals.just_opened = true;
} else {
log_debug('Helper: ' + JSON.stringify(r));
closeWindows();
......@@ -139,10 +132,8 @@ socket.onmessage = function(e) {
break;
case 'get status':
globals['status'] = r;
if (globals.just_opened) {
globals.just_opened = false;
globals.open_settings = !r.can_sync;
}
if (!globals.open_settings)
globals.open_settings = has_settings_error(r.code);
break;
default:
console.log('Incomprehensible response ' + JSON.stringify(r));
......
......@@ -34,6 +34,10 @@ CURPATH = os.path.dirname(os.path.abspath(__file__))
LOG = logging.getLogger(__name__)
SYNCERS = utils.ThreadSafeDict()
with open(os.path.join(CURPATH, 'ui_common.json')) as f:
UI_COMMON = json.load(f)
STATUS = UI_COMMON['STATUS']
def retry_on_locked_db(method, *args, **kwargs):
"""If DB is locked, wait and try again"""
......@@ -66,8 +70,6 @@ class SessionHelper(object):
self.db = sqlite3.connect(self.session_db)
self._init_db_relation()
# self.session = self._load_active_session() or self._create_session()
# self.db.close()
def _init_db_relation(self):
"""Create the session relation"""
......@@ -152,7 +154,7 @@ class SessionHelper(object):
return not bool(self.load_active_session())
def heartbeat(self):
"""General session heartbeat - when heart stops, WSGI server dies"""
"""Periodically update the session database timestamp"""
db, alive = sqlite3.connect(self.session_db), True
while alive:
time.sleep(2)
......@@ -249,19 +251,16 @@ class WebSocketProtocol(WebSocket):
-- GET STATUS --
GUI: {"method": "get", "path": "status"}
HELPER: {
"can_sync": <boolean>,
"notification": <int>,
"paused": <boolean>,
"action": "get status"} or
{<ERROR>: <ERROR CODE>, "action": "get status"}
HELPER: {"code": <int>,
"synced": <int>,
"unsynced": <int>,
"action": "get status"
} or {<ERROR>: <ERROR CODE>, "action": "get status"}
"""
notification = {
0: 'Syncer is consistent',
1: 'Local directory is not accessible',
2: 'Remote container is not accessible',
100: 'unknown error'
}
status = utils.ThreadSafeDict()
with status.lock() as d:
d.update(code=STATUS['UNINITIALIZED'], synced=0, unsynced=0)
ui_id = None
session_db, session_relation = None, None
accepted = False
......@@ -269,21 +268,37 @@ class WebSocketProtocol(WebSocket):
token=None, url=None,
container=None, directory=None,
exclude=None)
status = dict(
notification=0, synced=0, unsynced=0, paused=True, can_sync=False)
cnf = AgkyraConfig()
essentials = ('url', 'token', 'container', 'directory')
def get_status(self, key=None):
""":return: updated status dict or value of specified key"""
if self.syncer and self.can_sync():
self._consume_messages()
with self.status.lock() as d:
if self.syncer.paused:
d['code'] = STATUS['PAUSED']
elif d['code'] != STATUS['PAUSING'] or (
d['unsynced'] == d['synced']):
d['code'] = STATUS['SYNCING']
with self.status.lock() as d:
return d.get(key, None) if key else dict(d)
def set_status(self, **kwargs):
with self.status.lock() as d:
d.update(kwargs)
@property
def syncer(self):
""":returns: the first syncer object or None"""
with SYNCERS.lock() as d:
for sync_key, sync_obj in d.items():
return sync_obj
return None
def clean_db(self):
"""Clean DB from session traces"""
LOG.debug('Remove session traces')
"""Clean DB from current session trace"""
LOG.debug('Remove current session trace')
db = sqlite3.connect(self.session_db)
db.execute('BEGIN')
db.execute('DELETE FROM %s WHERE ui_id="%s"' % (
......@@ -292,7 +307,7 @@ class WebSocketProtocol(WebSocket):
db.close()
def shutdown_syncer(self, syncer_key=0):
"""Shutdown the service"""
"""Shutdown the syncer backend object"""
LOG.debug('Shutdown syncer')
with SYNCERS.lock() as d:
syncer = d.pop(syncer_key, None)
......@@ -302,7 +317,7 @@ class WebSocketProtocol(WebSocket):
syncer.wait_sync_threads()
def heartbeat(self):
"""Check if socket should be alive"""
"""Update session DB timestamp as long as session is alive"""
db, alive = sqlite3.connect(self.session_db), True
while alive:
time.sleep(1)
......@@ -320,6 +335,7 @@ class WebSocketProtocol(WebSocket):
alive = True
db.close()
self.shutdown_syncer()
self.set_status(code=STATUS['UNINITIALIZED'])
self.close()
def _get_default_sync(self):
......@@ -358,10 +374,12 @@ class WebSocketProtocol(WebSocket):
self.settings['url'] = self.cnf.get_cloud(cloud, 'url')
except Exception:
self.settings['url'] = None
self.set_status(code=STATUS['SETTINGS MISSING'])
try:
self.settings['token'] = self.cnf.get_cloud(cloud, 'token')
except Exception:
self.settings['url'] = None
self.set_status(code=STATUS['SETTINGS MISSING'])
# for option in ('container', 'directory', 'exclude'):
for option in ('container', 'directory'):
......@@ -369,6 +387,7 @@ class WebSocketProtocol(WebSocket):
self.settings[option] = self.cnf.get_sync(sync, option)
except KeyError:
LOG.debug('No %s is set' % option)
self.set_status(code=STATUS['SETTINGS MISSING'])
LOG.debug('Finished loading settings')
......@@ -418,35 +437,31 @@ class WebSocketProtocol(WebSocket):
return all([
self.settings[e] == self.settings[e] for e in self.essentials])
def _consume_messages(self):
def _consume_messages(self, max_consumption=10):
"""Update status by consuming and understanding syncer messages"""
if self.can_sync():
msg = self.syncer.get_next_message()
if not msg:
if self.status['unsynced'] == self.status['synced']:
self.status['unsynced'] = 0
self.status['synced'] = 0
while (msg):
with self.status.lock() as d:
if d['unsynced'] == d['synced']:
d.update(unsynced=0, synced=0)
while msg:
if isinstance(msg, messaging.SyncMessage):
# LOG.info('Start syncing "%s"' % msg.objname)
self.status['unsynced'] += 1
self.set_status(unsynced=self.get_status('unsynced') + 1)
elif isinstance(msg, messaging.AckSyncMessage):
# LOG.info('Finished syncing "%s"' % msg.objname)
self.status['synced'] += 1
# elif isinstance(msg, messaging.CollisionMessage):
# LOG.info('Collision for "%s"' % msg.objname)
# elif isinstance(msg, messaging.ConflictStashMessage):
# LOG.info('Conflict for "%s"' % msg.objname)
self.set_status(synced=self.get_status('synced') + 1)
elif isinstance(msg, messaging.LocalfsSyncDisabled):
# LOG.debug('Local FS is dissabled, noooo!')
self.status['notification'] = 1
self.set_status(code=STATUS['DIRECTORY ERROR'])
self.syncer.stop_all_daemons()
elif isinstance(msg, messaging.PithosSyncDisabled):
# LOG.debug('Pithos sync is disabled, noooooo!')
self.status['notification'] = 2
self.set_status(code=STATUS['CONTAINER ERROR'])
self.syncer.stop_all_daemons()
LOG.debug('Backend message: %s' % msg.name)
msg = self.syncer.get_next_message()
# Limit the amount of messages consumed each time
max_consumption -= 1
if not max_consumption:
break
def can_sync(self):
"""Check if settings are enough to setup a syncing proccess"""
......@@ -454,6 +469,7 @@ class WebSocketProtocol(WebSocket):
def init_sync(self):
"""Initialize syncer"""
self.set_status(code=STATUS['INITIALIZING'])
sync = self._get_default_sync()
kwargs = dict(agkyra_path=AGKYRA_DIR)
......@@ -480,46 +496,41 @@ class WebSocketProtocol(WebSocket):
slave = localfs_client.LocalfsFileClient(syncer_settings)
syncer_ = syncer.FileSyncer(syncer_settings, master, slave)
self.syncer_settings = syncer_settings
# Check if syncer is ready, by consumming messages
msg = syncer_.get_next_message()
# while not msg:
# time.sleep(0.2)
# msg = syncer_.get_next_message()
# This should be activated only on accepting a positive message
self.status['notification'] = 0
self.status['unsynced'] = 0
self.status['synced'] = 0
if msg:
# Check if syncer is ready, by consuming messages
local_ok, remote_ok = False, False
for i in range(2):
LOG.debug('Get message %s' % (i + 1))
msg = syncer_.get_next_message(block=True)
LOG.debug('Got message: %s' % msg)
if isinstance(msg, messaging.LocalfsSyncDisabled):
LOG.debug('Local FS is disabled')
self.status['notification'] = 1
self.set_status(code=STATUS['DIRECTORY ERROR'])
local_ok = False
break
elif isinstance(msg, messaging.PithosSyncDisabled):
LOG.debug('Pithos sync is disabled')
self.status['notification'] = 2
else:
LOG.debug("Unexpected message: %s" % msg)
msg = None
if msg:
syncer_ = None
self.set_status(code=STATUS['CONTAINER ERRIR'])
remote_ok = False
break
elif isinstance(msg, messaging.LocalfsSyncEnabled):
local_ok = True
elif isinstance(msg, messaging.PithosSyncEnabled):
remote_ok = True
else:
LOG.error('Unexpected message %s' % msg)
self.set_status(code=STATUS['CRITICAL ERROR'])
break
if local_ok and remote_ok:
syncer_.initiate_probe()
self.set_status(code=STATUS['SYNCING'])
else:
syncer_ = None
finally:
self.set_status(synced=0, unsynced=0)
with SYNCERS.lock() as d:
d[0] = syncer_
# Syncer-related methods
def get_status(self):
if self.syncer and self.can_sync():
self._consume_messages()
self.status['paused'] = self.syncer.paused
self.status['can_sync'] = self.can_sync()
else:
self.status.update(dict(
synced=0, unsynced=0, paused=True, can_sync=False))
return self.status
def get_settings(self):
return self.settings
......@@ -544,10 +555,17 @@ class WebSocketProtocol(WebSocket):
if was_active:
self.start_sync()
def pause_sync(self):
self.syncer.stop_decide()
def _pause_syncer(self):
syncer_ = self.syncer
syncer_.stop_decide()
LOG.debug('Wait open syncs to complete')
self.syncer.wait_sync_threads()
syncer_.wait_sync_threads()
def pause_sync(self):
syncer_ = self.syncer
if syncer_ and not syncer_.paused:
Thread(target=self._pause_syncer).start()
self.set_status(code=STATUS['PAUSING'])
def start_sync(self):
self.syncer.start_decide()
......@@ -562,6 +580,8 @@ class WebSocketProtocol(WebSocket):
if self.accepted:
action = r['path']
if action == 'shutdown':
# Clean db to cause syncer backend to shut down
self.set_status(code=STATUS['SHUTTING DOWN'])
retry_on_locked_db(self.clean_db)
# self._shutdown()
# self.terminate()
......
......@@ -31,10 +31,10 @@ LOGFILE = os.path.join(AGKYRA_DIR, 'agkyra.log')
LOGGER = logging.getLogger('agkyra')
HANDLER = logging.FileHandler(LOGFILE)
FORMATTER = logging.Formatter(
"[CLI]%(name)s %(levelname)s:%(asctime)s:%(message)s")
"%(name)s:%(lineno)s %(levelname)s:%(asctime)s:%(message)s")
HANDLER.setFormatter(FORMATTER)
LOGGER.addHandler(HANDLER)
LOGGER.setLevel(logging.INFO)
LOGGER.setLevel(logging.DEBUG)