Commit b796612e authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

Check if sync thread is alive instead of heartbeat

parent 2469cd72
......@@ -488,13 +488,6 @@ class AgkyraTest(unittest.TestCase):
common.DatabaseError()]
self.s.decide_file_sync(fil)
self.assert_message(messaging.HeartbeatReplayDecideMessage)
self.s.decide_file_sync(fil)
self.assert_message(messaging.HeartbeatNoDecideMessage)
print "SLEEPING 11"
time.sleep(11)
self.s.decide_file_sync(fil)
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.AckSyncMessage)
def test_007_multiprobe(self):
fil = "φ007"
......
......@@ -16,7 +16,6 @@
from functools import wraps
import time
import os
import threading
import logging
import re
......@@ -28,48 +27,6 @@ from agkyra.syncer.database import transaction
logger = logging.getLogger(__name__)
def heartbeat_event(settings, heartbeat, objname):
event = threading.Event()
max_interval = settings.action_max_wait / 4.0
def set_log():
with heartbeat.lock() as hb:
registered_name = utils.reg_name(settings, objname)
beat = hb.get(registered_name)
assert beat is not None
new_beat = {"ident": beat["ident"],
"tstamp": utils.time_stamp()}
hb[registered_name] = new_beat
logger.debug("HEARTBEAT '%s' %s" % (registered_name, new_beat))
def go():
interval = 0.2
while True:
if event.is_set():
break
set_log()
time.sleep(interval)
interval = min(1.2 * interval, max_interval)
thread = threading.Thread(target=go)
thread.start()
return event
def give_heartbeat(f):
@wraps(f)
def inner(*args, **kwargs):
obj = args[0]
objname = obj.objname
heartbeat = obj.heartbeat
settings = obj.settings
event = heartbeat_event(settings, heartbeat, objname)
try:
return f(*args, **kwargs)
finally:
event.set()
return inner
def handle_client_errors(f):
@wraps(f)
def inner(*args, **kwargs):
......@@ -107,7 +64,6 @@ class PithosSourceHandle(object):
return utils.join_path(self.cache_path, fetch_name)
@handle_client_errors
@give_heartbeat
def send_file(self, sync_state):
fetched_fspath = self.register_fetch_name(self.objname)
headers = dict()
......@@ -223,7 +179,6 @@ class PithosTargetHandle(object):
return r
@handle_client_errors
@give_heartbeat
def pull(self, source_handle, sync_state):
# assert isinstance(source_handle, LocalfsSourceHandle)
info = sync_state.info
......
......@@ -128,8 +128,8 @@ class FileSyncer(object):
with self.heartbeat.lock() as hb:
beat = hb.get(self.reg_name(objname))
if beat is not None:
if utils.younger_than(
beat["tstamp"], self.settings.action_max_wait):
beat_thread = beat["thread"]
if beat_thread is None or beat_thread.is_alive():
msg = messaging.HeartbeatNoProbeMessage(
archive=archive, objname=objname, heartbeat=beat,
logger=logger)
......@@ -200,6 +200,12 @@ class FileSyncer(object):
try:
states = self._decide_file_sync(objname, master, slave, ident)
except common.DatabaseError:
logger.debug("DatabaseError in _decide_file_sync "
"for '%s'; cleaning up heartbeat" % objname)
with self.heartbeat.lock() as hb:
beat = hb.get(self.reg_name(objname))
if beat and beat["ident"] == ident:
hb.pop(self.reg_name(objname))
return
if states is None:
return
......@@ -214,7 +220,7 @@ class FileSyncer(object):
states = self._do_decide_file_sync(objname, master, slave, ident)
if states is not None:
with self.heartbeat.lock() as hb:
beat = {"ident": ident, "tstamp": utils.time_stamp()}
beat = {"ident": ident, "thread": None}
hb[self.reg_name(objname)] = beat
return states
......@@ -241,8 +247,8 @@ class FileSyncer(object):
objname=objname, heartbeat=beat, logger=logger)
self.messager.put(msg)
else:
if utils.younger_than(
beat["tstamp"], self.settings.action_max_wait):
beat_thread = beat["thread"]
if beat_thread is None or beat_thread.is_alive():
if not dry_run:
msg = messaging.HeartbeatNoDecideMessage(
objname=objname, heartbeat=beat, logger=logger)
......@@ -313,6 +319,13 @@ class FileSyncer(object):
thread = threading.Thread(
target=self._sync_file,
args=(source_state, target_state, sync_state))
with self.heartbeat.lock() as hb:
beat = hb.get(self.reg_name(source_state.objname))
if beat is None:
raise AssertionError("heartbeat for %s is None" %
source_state.objname)
assert beat["thread"] is None
beat["thread"] = thread
thread.start()
self.sync_threads.append(thread)
......@@ -333,14 +346,14 @@ class FileSyncer(object):
def mark_as_failed(self, state, hard=False):
serial = state.serial
objname = state.objname
with self.heartbeat.lock() as hb:
hb.pop(self.reg_name(objname))
if hard:
logger.warning(
"Marking failed serial %s for archive: %s, object: '%s'" %
(serial, state.archive, objname))
with self.failed_serials.lock() as d:
d[(serial, objname)] = state
with self.heartbeat.lock() as hb:
hb.pop(self.reg_name(objname))
def update_state(self, old_state, new_state):
db = self.get_db()
......@@ -352,6 +365,7 @@ class FileSyncer(object):
try:
self._ack_file_sync(synced_source_state, synced_target_state)
except common.DatabaseError:
# maybe clear heartbeat here too
return
serial = synced_source_state.serial
objname = synced_source_state.objname
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment