Commit f0f7db82 authored by Stavros Sachtouris's avatar Stavros Sachtouris Committed by Giorgos Korfiatis
Browse files

Implement clean shutdown with multiple UIs

To shutdown all UIs, one has to "stop" the heart of the UI session.
The heart of a UI session is a database relation in the
"session.db" database. It is emptied when a shutdown message is
received by a running WebSocketProtocol instance.

All other connections periodically check the heart and update its
timestamp. When the tupple containing the session id is empty, all
connections of the same sesion die.

The WSGI server, which is running as a member of a SessionHelper
instance, is shut down by a simpling threaded method launched by
the SessionHelper before running the server. This method is called
_shutdown_daemon and is polling the database every 4 seconds. When
the database is empty, it sends a shutdown signal to the WSGI
server.

These changes affect the way an Agkyra daemon is launched, as well
as the "launch_daemon" and "stop_daemon" CLI commands.
parent 34303d51
......@@ -213,7 +213,7 @@ class AgkyraCLI(cmd.Cmd):
sys.stdout.write('Not running\n')
sys.stdout.flush()
def do_launch(self, line):
def do_start_daemon(self, line):
"""Start the Agkyra daemon if it is not running"""
if self.client:
sys.stderr.write('An Agkyra daemon is already running\n')
......@@ -225,7 +225,7 @@ class AgkyraCLI(cmd.Cmd):
self.do_status('')
sys.stderr.flush()
def do_stop(self, line):
def do_stop_daemon(self, line):
"""Stop the Agkyra daemon, if it is running"""
client = self.client
if client:
......
......@@ -13,12 +13,13 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
var DEBUG = false;
var gui = require('nw.gui');
var path = require('path');
var fs = require('fs');
// Read config file
var DEBUG = false;
var fs = require('fs');
var cnf = JSON.parse(fs.readFileSync(gui.App.argv[0], encoding='utf-8'));
function log_debug(msg) { if (DEBUG) console.log(msg); }
......
......@@ -69,11 +69,15 @@ class SessionHelper(object):
r = self.db.execute('SELECT * FROM %s' % self.session_relation)
sessions = r.fetchall()
if sessions:
last = sessions[-1]
last, expected_id = sessions[-1], getattr(self, 'ui_id', None)
if expected_id and last[0] != '%s' % expected_id:
LOG.debug('Session ID is old')
return None
now, last_beat = time.time(), float(last[2])
if now - last_beat < self.session_timeout:
# Found an active session
return dict(ui_id=last[0], address=last[1])
LOG.debug('No active sessions found')
return None
def create_session(self):
......@@ -89,7 +93,6 @@ class SessionHelper(object):
server_class=WSGIServer,
handler_class=WebSocketWSGIRequestHandler,
app=WebSocketWSGIApplication(handler_cls=WebSocketProtocol))
WebSocketProtocol.server = server
server.initialize_websockets_manager()
address = 'ws://%s:%s' % (LOCAL_ADDR, server.server_port)
self.server = server
......@@ -100,6 +103,7 @@ class SessionHelper(object):
self.session_relation, ui_id, address, time.time()))
self.db.commit()
self.ui_id = ui_id
return dict(ui_id=ui_id, address=address)
def wait_session_to_load(self, timeout=20, step=2):
......@@ -128,10 +132,23 @@ class SessionHelper(object):
def start(self):
"""Start the helper server in a thread"""
if getattr(self, 'server', None):
Thread(target=self.server.serve_forever).start()
Thread(target=self._shutdown_daemon).start()
self.server.serve_forever()
def shutdown(self):
def _shutdown_daemon(self):
"""Shutdown the server (needs another thread) and join threads"""
LOG.debug('The Shutdown Daemon is up')
self.db = sqlite3.connect(self.session_db)
# Do not hurry to kill the server, make sure it does not actually work
retry = 3
while retry:
while self.load_active_session():
time.sleep(4)
retry = 3
time.sleep(3)
retry -= 1
LOG.debug('Daemon server is down, removing WSGI server')
self.db.close()
if getattr(self, 'server', None):
t = Thread(target=self.server.shutdown)
t.start()
......@@ -206,6 +223,7 @@ class WebSocketProtocol(WebSocket):
progress=0, synced=0, unsynced=0, paused=True, can_sync=False)
cnf = AgkyraConfig()
essentials = ('url', 'token', 'container', 'directory')
_alive = True
@property
def syncer(self):
......@@ -215,13 +233,21 @@ class WebSocketProtocol(WebSocket):
return None
def heartbeat(self):
if not self._alive:
return
db = sqlite3.connect(self.session_db)
while True:
while self._alive:
time.sleep(2)
db.execute('BEGIN')
db.execute('UPDATE %s SET beat="%s" WHERE ui_id="%s"' % (
self.session_relation, time.time(), self.ui_id))
r = db.execute('SELECT ui_id FROM %s WHERE ui_id="%s"' % (
self.session_relation, self.ui_id))
if r.fetchall():
db.execute('UPDATE %s SET beat="%s" WHERE ui_id="%s"' % (
self.session_relation, time.time(), self.ui_id))
else:
self._alive = False
db.commit()
time.sleep(2)
self._shutdown()
def _get_default_sync(self):
"""Get global.default_sync or pick the first sync as default
......@@ -436,12 +462,15 @@ class WebSocketProtocol(WebSocket):
LOG.debug('Stop protocol heart for this session')
self.heart.stop()
def shutdown(self):
def _shutdown(self):
"""Shutdown the service"""
LOG.debug('Shutdown daemon')
self.close()
LOG.debug('Clean database')
self.clean_db()
Thread(target=self.server.shutdown).start()
if self.can_sync():
self.syncer.stop_all_daemons()
LOG.debug('Wait open syncs to complete')
self.syncer.wait_sync_threads()
LOG.debug('Daemon is now shut down')
def clean_db(self):
"""Clean DB from session traces"""
......@@ -462,11 +491,7 @@ class WebSocketProtocol(WebSocket):
if self.accepted:
action = r['path']
if action == 'shutdown':
if self.can_sync():
self.syncer.stop_all_daemons()
LOG.debug('Wait open syncs to complete')
self.syncer.wait_sync_threads()
self.shutdown()
self.clean_db()
return
{
'start': self.start_sync,
......
......@@ -33,7 +33,7 @@ HANDLER = logging.FileHandler(LOGFILE)
FORMATTER = logging.Formatter("%(name)s %(levelname)s:%(asctime)s:%(message)s")
HANDLER.setFormatter(FORMATTER)
LOGGER.addHandler(HANDLER)
LOGGER.setLevel(logging.DEBUG)
LOGGER.setLevel(logging.INFO)
def main():
......
......@@ -34,10 +34,11 @@ def main():
helper = SessionHelper()
if not helper.load_active_session():
helper.create_session()
helper.server.serve_forever()
helper.start()
else:
LOGGER.info('Another session is running, aborting')
exit(1)
LOGGER.debug('Session Helper is now down')
if __name__ == "__main__":
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment