Commit 0ed942f0 authored by Stavros Sachtouris's avatar Stavros Sachtouris Committed by Giorgos Korfiatis
Browse files

Move heartbeat update operations to main session

Instead of updating the "heart" database from the context of each
connection, update it from the context of the helper session.
The connections will still poll the database to decide whether
they are going to stay open and still have the authority to empty
the database when there is a shutdown request.
parent 3a031ec2
......@@ -16,7 +16,6 @@
import cmd
import sys
import logging
import inspect
from agkyra import config, protocol, protocol_client
......
......@@ -152,30 +152,53 @@ class SessionHelper(object):
time_passed += step
return not bool(self.load_active_session())
def heartbeat(self):
"""General session heartbeat - when heart stops, WSGI server dies"""
db, alive = sqlite3.connect(self.session_db), True
while alive:
time.sleep(2)
try:
db.execute('BEGIN')
r = db.execute('SELECT ui_id FROM %s WHERE ui_id="%s"' % (
self.session_relation, self.ui_id))
if r.fetchall():
db.execute('UPDATE %s SET beat="%s" WHERE ui_id="%s"' % (
self.session_relation, time.time(), self.ui_id))
else:
alive = False
db.commit()
except sqlite3.OperationalError as oe:
if 'locked' not in '%s' % oe:
raise
db.close()
def start(self):
"""Start the helper server in a thread"""
if getattr(self, 'server', None):
Thread(target=self._shutdown_daemon).start()
t = Thread(target=self._shutdown_daemon)
t.start()
Thread(target=self.heartbeat).start()
self.server.serve_forever()
t.join()
LOG.debug('WSGI server is down')
def _shutdown_daemon(self):
"""Shutdown the server (needs another thread) and join threads"""
LOG.debug('The Shutdown Daemon is up')
self.db = sqlite3.connect(self.session_db)
# Do not hurry to kill the server, make sure it does not actually work
retry = 3
while retry:
while self.load_active_session():
time.sleep(4)
retry = 3
time.sleep(3)
retry -= 1
LOG.debug('Daemon server is down, removing WSGI server')
self.db.close()
if getattr(self, 'server', None):
t = Thread(target=self.server.shutdown)
t.start()
t.join()
"""Shutdown WSGI server when the heart stops"""
db = sqlite3.connect(self.session_db)
while True:
time.sleep(4)
try:
r = db.execute('SELECT ui_id FROM %s WHERE ui_id="%s"' % (
self.session_relation, self.ui_id))
if not r.fetchall():
db.close()
time.sleep(5)
t = Thread(target=self.server.shutdown)
t.start()
t.join()
break
except sqlite3.OperationalError:
pass
class WebSocketProtocol(WebSocket):
......@@ -246,7 +269,6 @@ class WebSocketProtocol(WebSocket):
progress=0, synced=0, unsynced=0, paused=True, can_sync=False)
cnf = AgkyraConfig()
essentials = ('url', 'token', 'container', 'directory')
_alive = True
@property
def syncer(self):
......@@ -255,21 +277,42 @@ class WebSocketProtocol(WebSocket):
return sync_obj
return None
def heartbeat(self):
if not self._alive:
return
def _shutdown(self):
"""Shutdown the service"""
LOG.debug('Shutdown syncer')
self.close()
if self.can_sync():
self.syncer.stop_all_daemons()
LOG.debug('Wait open syncs to complete')
self.syncer.wait_sync_threads()
def clean_db(self):
"""Clean DB from session traces"""
LOG.debug('Remove session traces')
db = sqlite3.connect(self.session_db)
while self._alive:
time.sleep(2)
db.execute('BEGIN')
r = db.execute('SELECT ui_id FROM %s WHERE ui_id="%s"' % (
self.session_relation, self.ui_id))
if r.fetchall():
db.execute('UPDATE %s SET beat="%s" WHERE ui_id="%s"' % (
self.session_relation, time.time(), self.ui_id))
else:
self._alive = False
db.commit()
db.execute('BEGIN')
db.execute('DELETE FROM %s' % self.session_relation)
db.commit()
db.close()
def heartbeat(self):
"""Check if socket should be alive"""
db, alive = sqlite3.connect(self.session_db), True
while alive:
time.sleep(1)
try:
db.execute('BEGIN')
r = db.execute('SELECT ui_id FROM %s WHERE ui_id="%s"' % (
self.session_relation, self.ui_id))
if r.fetchall():
db.execute('UPDATE %s SET beat="%s" WHERE ui_id="%s"' % (
self.session_relation, time.time(), self.ui_id))
else:
alive = False
db.commit()
except sqlite3.OperationalError:
alive = True
db.close()
self._shutdown()
def _get_default_sync(self):
......@@ -473,37 +516,6 @@ class WebSocketProtocol(WebSocket):
def start_sync(self):
self.syncer.start_decide()
# WebSocket connection methods
def opened(self):
LOG.debug('Helper: connection established')
self.heart = utils.StoppableThread()
self.heart.run_body = self.heartbeat
self.heart.start()
def closed(self, *args):
"""When a connection closes"""
LOG.debug('Stop protocol heart for this session')
self.heart.stop()
def _shutdown(self):
"""Shutdown the service"""
LOG.debug('Shutdown daemon')
self.close()
if self.can_sync():
self.syncer.stop_all_daemons()
LOG.debug('Wait open syncs to complete')
self.syncer.wait_sync_threads()
LOG.debug('Daemon is now shut down')
def clean_db(self):
"""Clean DB from session traces"""
LOG.debug('Remove session traces')
db = sqlite3.connect(self.session_db)
db.execute('BEGIN')
db.execute('DELETE FROM %s' % self.session_relation)
db.commit()
db.close()
def send_json(self, msg):
LOG.debug('send: %s' % msg)
self.send(json.dumps(msg))
......@@ -515,7 +527,8 @@ class WebSocketProtocol(WebSocket):
action = r['path']
if action == 'shutdown':
retry_on_locked_db(self.clean_db)
self.terminate()
# self._shutdown()
# self.terminate()
return
{
'start': self.start_sync,
......@@ -524,6 +537,7 @@ class WebSocketProtocol(WebSocket):
self.send_json({'OK': 200, 'action': 'post %s' % action})
elif r['ui_id'] == self.ui_id:
self.accepted = True
Thread(target=self.heartbeat).start()
self.send_json({'ACCEPTED': 202, 'action': 'post ui_id'})
self._load_settings()
if (not self.syncer) and self.can_sync():
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment