Commit 48b85188 authored by Giorgos Korfiatis's avatar Giorgos Korfiatis

Don't decide object if it failed recently

Send message when sync fails, either at staging or pulling, and keep the
heartbeat record, so as to prohibit consecutive unsuccessful syncings.
parent 5c6b038b
......@@ -98,6 +98,7 @@ class AgkyraTest(unittest.TestCase):
auth_token=TOKEN,
container=cls.ID,
local_root_path=cls.LOCAL_ROOT_PATH,
action_max_wait=5,
ignore_ssl=True)
cls.master = PithosFileClient(cls.settings)
......@@ -329,6 +330,11 @@ class AgkyraTest(unittest.TestCase):
self.assert_message(messaging.CollisionMessage)
self.assert_message(messaging.SyncErrorMessage)
# this will fail because of recent failed sync
self.s.decide_file_sync(fil)
self.assert_message(messaging.HeartbeatSkipDecideMessage)
time.sleep(self.settings.action_max_wait)
# this will fail because serial is marked as failed
self.s.decide_file_sync(fil)
self.assert_message(messaging.FailedSyncIgnoreDecisionMessage)
......@@ -463,6 +469,7 @@ class AgkyraTest(unittest.TestCase):
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.SyncErrorMessage)
time.sleep(self.settings.action_max_wait)
# but if we fist sync the dir, it's ok
self.s.decide_file_sync(fil)
......@@ -595,6 +602,7 @@ class AgkyraTest(unittest.TestCase):
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.SyncErrorMessage)
time.sleep(self.settings.action_max_wait)
# but this is ok
self.s.decide_file_sync(innerd)
......
......@@ -34,21 +34,11 @@ class FileClient(object):
raise NotImplementedError
def start_pulling_file(self, source_handle, target_state, sync_state,
callback=None, failure_callback=None):
try:
synced_source_state, synced_target_state = \
self._start(source_handle, target_state, sync_state)
if callback is not None:
callback(synced_source_state, synced_target_state)
except common.SyncError as e:
hard = isinstance(e, common.HardSyncError)
if failure_callback is not None:
failure_callback(source_handle.source_state, hard=hard)
msg = messaging.SyncErrorMessage(
objname=target_state.objname,
serial=source_handle.source_state.serial,
exception=e, logger=logger)
self.settings.messager.put(msg)
callback=None):
synced_source_state, synced_target_state = \
self._start(source_handle, target_state, sync_state)
if callback is not None:
callback(synced_source_state, synced_target_state)
def _start(self, source_handle, target_state, sync_state):
try:
......
......@@ -98,6 +98,15 @@ class HeartbeatReplayDecideMessage(Message):
% self.heartbeat["ident"])
class HeartbeatSkipDecideMessage(Message):
def __init__(self, *args, **kwargs):
Message.__init__(self, *args, **kwargs)
self.objname = kwargs["objname"]
self.heartbeat = kwargs["heartbeat"]
self.logger.debug("Skipping decide due to recent failure: %s" %
self.objname)
class FailedSyncIgnoreDecisionMessage(Message):
def __init__(self, *args, **kwargs):
Message.__init__(self, *args, **kwargs)
......
......@@ -42,7 +42,7 @@ DEFAULT_CACHE_STAGE_NAME = 'staged'
DEFAULT_CACHE_FETCH_NAME = 'fetched'
GLOBAL_SETTINGS_NAME = '.agkyra'
DEFAULT_DBNAME = "syncer.db"
DEFAULT_ACTION_MAX_WAIT = 10
DEFAULT_ACTION_MAX_WAIT = 30
DEFAULT_PITHOS_LIST_INTERVAL = 5
DEFAULT_CONNECTION_RETRY_LIMIT = 3
INSTANCES_NAME = 'instances'
......
......@@ -28,6 +28,31 @@ from agkyra.syncer import messaging, utils
logger = logging.getLogger(__name__)
class HandleSyncErrors(object):
def __init__(self, state, messager, callback=None):
self.state = state
self.callback = callback
self.messager = messager
def __enter__(self):
pass
def __exit__(self, exctype, value, traceback):
if value is None:
return
if not isinstance(value, common.SyncError):
return False # re-raise
hard = isinstance(value, common.HardSyncError)
if self.callback is not None:
self.callback(self.state, hard=hard)
msg = messaging.SyncErrorMessage(
objname=self.state.objname,
serial=self.state.serial,
exception=value, logger=logger)
self.messager.put(msg)
return True
class FileSyncer(object):
dbname = None
......@@ -274,6 +299,13 @@ class FileSyncer(object):
objname=objname, heartbeat=beat, logger=logger)
self.messager.put(msg)
return None
if utils.younger_than(beat["ident"],
self.settings.action_max_wait):
if not dry_run:
msg = messaging.HeartbeatSkipDecideMessage(
objname=objname, heartbeat=beat, logger=logger)
self.messager.put(msg)
return None
logger.warning("Ignoring previous run: %s %s" %
(objname, beat))
......@@ -372,16 +404,13 @@ class FileSyncer(object):
def _sync_file(self, source_state, target_state, sync_state):
clients = self.clients
source_client = clients[source_state.archive]
try:
source_handle = source_client.stage_file(source_state)
except common.SyncError as e:
logger.warning(e)
return
target_client = clients[target_state.archive]
target_client.start_pulling_file(
source_handle, target_state, sync_state,
callback=self.ack_file_sync,
failure_callback=self.mark_as_failed)
with HandleSyncErrors(
source_state, self.messager, self.mark_as_failed):
source_handle = source_client.stage_file(source_state)
target_client.start_pulling_file(
source_handle, target_state, sync_state,
callback=self.ack_file_sync)
def mark_as_failed(self, state, hard=False):
serial = state.serial
......@@ -392,7 +421,6 @@ class FileSyncer(object):
(serial, state.archive, objname))
with self.failed_serials.lock() as d:
d[(serial, objname)] = state
self.clean_heartbeat([objname])
def update_state(self, db, old_state, new_state):
db.put_state(new_state)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment