Commit cef8c2ca authored by Stavros Sachtouris's avatar Stavros Sachtouris Committed by Giorgos Korfiatis
Browse files

Change daemon status according to pithos errors

While initializing the UI daemon, syncer errors would be forwarded
to UI clients as part of the client response. This confuses the
clients. Instead of forwarding the error, we now understand it and
change the UI daemon status accordingly.
parent 7e2c64fe
......@@ -19,7 +19,6 @@ from ws4py.server.wsgiutils import WebSocketWSGIApplication
from ws4py.server.wsgirefserver import WSGIServer, WebSocketWSGIRequestHandler
from hashlib import sha1
from threading import Thread
import sqlite3
import time
import os
import sys
......@@ -42,7 +41,7 @@ else:
RESOURCES = os.path.join(BASEDIR, 'resources')
LOG = logging.getLogger(__name__)
LOGGER = logging.getLogger(__name__)
SYNCERS = utils.ThreadSafeDict()
with open(os.path.join(RESOURCES, 'ui_data/common_en.json')) as f:
......@@ -105,13 +104,13 @@ class SessionHelper(object):
if sessions:
last, expected_id = sessions[-1], getattr(self, 'ui_id', None)
if expected_id and last[0] != '%s' % expected_id:
LOG.debug('Session ID is old')
LOGGER.debug('Session ID is old')
return None
now, last_beat = time.time(), float(last[2])
if now - last_beat < self.session_timeout:
# Found an active session
return dict(ui_id=last[0], address=last[1])
LOG.debug('No active sessions found')
LOGGER.debug('No active sessions found')
return None
def create_session_daemon(self):
......@@ -193,7 +192,7 @@ class SessionDaemon(object):
t.start()
self.server.serve_forever()
t.join()
LOG.debug('WSGI server is down')
LOGGER.debug('WSGI server is down')
class WebSocketProtocol(WebSocket):
......@@ -205,9 +204,6 @@ class WebSocketProtocol(WebSocket):
"{"REJECTED": 401, "action": "post ui_id"}
-- ERRORS WITH SIGNIFICANCE --
If the token doesn't work:
HELPER: {"action": <action that caused the error>, "UNAUTHORIZED": 401}
-- SHUT DOWN --
GUI: {"method": "post", "path": "shutdown"}
......@@ -277,7 +273,7 @@ class WebSocketProtocol(WebSocket):
if self.syncer and self.can_sync():
self._consume_messages()
with self.status.lock() as d:
LOG.debug('Status was %s' % d['code'])
LOGGER.debug('Status was %s' % d['code'])
if d['code'] in (
STATUS['UNINITIALIZED'], STATUS['INITIALIZING']):
if self.syncer.paused:
......@@ -286,12 +282,12 @@ class WebSocketProtocol(WebSocket):
d['unsynced'] == d['synced'] + d['failed']):
d['code'] = STATUS['SYNCING']
with self.status.lock() as d:
LOG.debug('Status is now %s' % d['code'])
LOGGER.debug('Status is now %s' % d['code'])
return d.get(key, None) if key else dict(d)
def set_status(self, **kwargs):
with self.status.lock() as d:
LOG.debug('CHANGING STATUS TO %s' % kwargs)
LOGGER.debug('CHANGING STATUS TO %s' % kwargs)
d.update(kwargs)
@property
......@@ -304,18 +300,18 @@ class WebSocketProtocol(WebSocket):
def clean_db(self):
"""Clean DB from current session trace"""
LOG.debug('Remove current session trace')
LOGGER.debug('Remove current session trace')
with database.TransactedConnection(self.session_db) as db:
db.unregister_heartbeat(self.ui_id)
def shutdown_syncer(self, syncer_key=0):
"""Shutdown the syncer backend object"""
LOG.debug('Shutdown syncer')
LOGGER.debug('Shutdown syncer')
with SYNCERS.lock() as d:
syncer = d.pop(syncer_key, None)
if syncer and self.can_sync():
syncer.stop_all_daemons()
LOG.debug('Wait open syncs to complete')
LOGGER.debug('Wait open syncs to complete')
syncer.wait_sync_threads()
def _get_default_sync(self):
......@@ -346,7 +342,7 @@ class WebSocketProtocol(WebSocket):
return cloud or 'default'
def _load_settings(self):
LOG.debug('Start loading settings')
LOGGER.debug('Start loading settings')
sync = self._get_default_sync()
cloud = self._get_sync_cloud(sync)
......@@ -370,20 +366,20 @@ class WebSocketProtocol(WebSocket):
try:
self.settings[option] = self.cnf.get_sync(sync, option)
except KeyError:
LOG.debug('No %s is set' % option)
LOGGER.debug('No %s is set' % option)
self.set_status(code=STATUS['SETTINGS MISSING'])
LOG.debug('Finished loading settings')
LOGGER.debug('Finished loading settings')
def _dump_settings(self):
LOG.debug('Saving settings')
LOGGER.debug('Saving settings')
sync = self._get_default_sync()
changes = False
if not self.settings.get('url', None):
LOG.debug('No cloud settings to save')
LOGGER.debug('No cloud settings to save')
else:
LOG.debug('Save cloud settings')
LOGGER.debug('Save cloud settings')
cloud = self._get_sync_cloud(sync)
try:
......@@ -398,13 +394,13 @@ class WebSocketProtocol(WebSocket):
except KeyError:
break
LOG.debug('Cloud name is %s' % cloud)
LOGGER.debug('Cloud name is %s' % cloud)
self.cnf.set_cloud(cloud, 'url', self.settings['url'])
self.cnf.set_cloud(cloud, 'token', self.settings['token'] or '')
self.cnf.set_sync(sync, 'cloud', cloud)
changes = True
LOG.debug('Save sync settings, name is %s' % sync)
LOGGER.debug('Save sync settings, name is %s' % sync)
# for option in ('directory', 'container', 'exclude'):
for option in ('directory', 'container'):
self.cnf.set_sync(sync, option, self.settings[option] or '')
......@@ -417,9 +413,9 @@ class WebSocketProtocol(WebSocket):
if changes:
self.cnf.write()
LOG.debug('Settings saved')
LOGGER.debug('Settings saved')
else:
LOG.debug('No setting changes spotted')
LOGGER.debug('No setting changes spotted')
def _essentials_changed(self, new_settings):
"""Check if essential settings have changed in new_settings"""
......@@ -436,24 +432,28 @@ class WebSocketProtocol(WebSocket):
# d.update(unsynced=0, synced=0, failed=0)
while msg:
if isinstance(msg, messaging.SyncMessage):
LOG.debug('UNSYNCED +1 %s' % getattr(msg, 'objname', ''))
LOGGER.debug(
'UNSYNCED +1 %s' % getattr(msg, 'objname', ''))
self.set_status(unsynced=self.get_status('unsynced') + 1)
elif isinstance(msg, messaging.AckSyncMessage):
LOG.debug('SYNCED +1 %s' % getattr(msg, 'objname', ''))
LOGGER.debug('SYNCED +1 %s' % getattr(msg, 'objname', ''))
self.set_status(synced=self.get_status('synced') + 1)
elif isinstance(msg, messaging.SyncErrorMessage):
LOG.debug('FAILED +1 %s' % getattr(msg, 'objname', ''))
LOGGER.debug('FAILED +1 %s' % getattr(msg, 'objname', ''))
self.set_status(failed=self.get_status('failed') + 1)
elif isinstance(msg, messaging.LocalfsSyncDisabled):
LOG.debug('STOP BACKEND, %s'% getattr(msg, 'objname', ''))
LOG.debug('CHANGE STATUS TO: %s' % STATUS['DIRECTORY ERROR'])
LOGGER.debug(
'STOP BACKEND, %s'% getattr(msg, 'objname', ''))
LOGGER.debug(
'CHANGE STATUS TO: %s' % STATUS['DIRECTORY ERROR'])
self.set_status(code=STATUS['DIRECTORY ERROR'])
self.syncer.stop_all_daemons()
elif isinstance(msg, messaging.PithosSyncDisabled):
LOG.debug('STOP BACKEND, %s'% getattr(msg, 'objname', ''))
LOGGER.debug(
'STOP BACKEND, %s'% getattr(msg, 'objname', ''))
self.set_status(code=STATUS['CONTAINER ERROR'])
self.syncer.stop_all_daemons()
LOG.debug('Backend message: %s %s' % (msg.name, type(msg)))
LOGGER.debug('Backend message: %s %s' % (msg.name, type(msg)))
# Limit the amount of messages consumed each time
max_consumption -= 1
if max_consumption:
......@@ -498,9 +498,9 @@ class WebSocketProtocol(WebSocket):
local_ok, remote_ok = False, False
for i in range(2):
LOG.debug('Get message %s' % (i + 1))
LOGGER.debug('Get message %s' % (i + 1))
msg = syncer_.get_next_message(block=True)
LOG.debug('Got message: %s' % msg)
LOGGER.debug('Got message: %s' % msg)
if isinstance(msg, messaging.LocalfsSyncDisabled):
self.set_status(code=STATUS['DIRECTORY ERROR'])
......@@ -515,7 +515,7 @@ class WebSocketProtocol(WebSocket):
elif isinstance(msg, messaging.PithosSyncEnabled):
remote_ok = True
else:
LOG.error('Unexpected message %s' % msg)
LOGGER.error('Unexpected message %s' % msg)
self.set_status(code=STATUS['CRITICAL ERROR'])
break
if local_ok and remote_ok:
......@@ -523,6 +523,16 @@ class WebSocketProtocol(WebSocket):
self.set_status(code=STATUS['SYNCING'])
else:
syncer_ = None
except pithos_client.ClientError as ce:
LOGGER.debug('backend init failed: %s %s' % (ce, ce.status))
try:
code = {
400: STATUS['AUTH URL ERROR'],
401: STATUS['TOKEN ERROR'],
}[ce.status]
except KeyError:
code = STATUS['UNINITIALIZED']
self.set_status(code=code)
finally:
self.set_status(synced=0, unsynced=0)
with SYNCERS.lock() as d:
......@@ -556,7 +566,7 @@ class WebSocketProtocol(WebSocket):
def _pause_syncer(self):
syncer_ = self.syncer
syncer_.stop_decide()
LOG.debug('Wait open syncs to complete')
LOGGER.debug('Wait open syncs to complete')
syncer_.wait_sync_threads()
def pause_sync(self):
......@@ -583,7 +593,7 @@ class WebSocketProtocol(WebSocket):
self.set_status(code=STATUS['CRITICAL ERROR'])
def send_json(self, msg):
LOG.debug('send: %s' % msg)
LOGGER.debug('send: %s' % msg)
self.send(json.dumps(msg))
# Protocol handling methods
......@@ -596,8 +606,6 @@ class WebSocketProtocol(WebSocket):
self.set_status(code=STATUS['SHUTTING DOWN'])
self.shutdown_syncer()
self.clean_db()
# self._shutdown()
# self.terminate()
return
{
'start': self.start_sync,
......@@ -621,7 +629,7 @@ class WebSocketProtocol(WebSocket):
def _put(self, r):
"""Handle PUT requests"""
if self.accepted:
LOG.debug('put %s' % r)
LOGGER.debug('put %s' % r)
action = r.pop('path')
self.set_settings(r)
r.update({'CREATED': 201, 'action': 'put %s' % action})
......@@ -649,12 +657,11 @@ class WebSocketProtocol(WebSocket):
def received_message(self, message):
"""Route requests to corresponding handling methods"""
LOG.debug('recv: %s' % message)
try:
r = json.loads('%s' % message)
except ValueError as ve:
self.send_json({'BAD REQUEST': 400})
LOG.error('JSON ERROR: %s' % ve)
LOGGER.error('JSON ERROR: %s' % ve)
return
try:
method = r.pop('method')
......@@ -666,7 +673,7 @@ class WebSocketProtocol(WebSocket):
except KeyError as ke:
action = method + ' ' + r.get('path', '')
self.send_json({'BAD REQUEST': 400, 'action': action})
LOG.error('KEY ERROR: %s' % ke)
LOGGER.error('KEY ERROR: %s' % ke)
except setup.ClientError as ce:
action = '%s %s' % (
method, r.get('path', 'ui_id' if 'ui_id' in r else ''))
......@@ -675,13 +682,13 @@ class WebSocketProtocol(WebSocket):
except Exception as e:
self.send_json({'INTERNAL ERROR': 500})
reason = '%s %s' % (method or '', r)
LOG.error('EXCEPTION (%s): %s' % (reason, e))
LOGGER.error('EXCEPTION (%s): %s' % (reason, e))
self.terminate()
def launch_server(callback, debug):
"""Launch the server in a separate process"""
LOG.info('Start SessionHelper session')
LOGGER.info('Start SessionHelper session')
opts = ["start", "daemon"]
if debug:
opts.append('-d')
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment