class LockedDict(object):
    """A minimal dictionary wrapper whose accesses are serialized by a lock.

    Only ``put`` and ``get`` are exposed; each one holds the internal
    ``threading.Lock`` for the duration of the underlying dict operation.
    """

    def __init__(self, *args, **kwargs):
        # Positional and keyword arguments are accepted but not used:
        # the mapping always starts out empty.
        self._Dict = {}
        self._Lock = threading.Lock()

    def put(self, key, value):
        """Store ``value`` under ``key`` while holding the lock.

        Raises TypeError if ``key`` is unhashable; the lock is released
        even in that case.
        """
        # The ``with`` form guarantees the lock is released when the
        # assignment raises, so a bad key cannot deadlock later callers.
        with self._Lock:
            self._Dict[key] = value

    def get(self, key):
        """Return the value stored under ``key``, or None if it is absent.

        Raises TypeError if ``key`` is unhashable; the lock is released
        even in that case.
        """
        with self._Lock:
            return self._Dict.get(key)
def handle_client_errors(f):
    """Decorate ``f`` so that a ClientError it raises becomes a sync error.

    A ClientError with status 412 is re-raised as common.CollisionError;
    any other ClientError is re-raised as common.SyncError. In both cases
    the original exception is passed as the new exception's argument.
    Exceptions of other types propagate unchanged, and the wrapped
    function's return value is passed through.
    """
    @wraps(f)
    def translating_wrapper(*args, **kwargs):
        try:
            outcome = f(*args, **kwargs)
        except ClientError as client_error:
            precondition_failed = client_error.status == 412
            if precondition_failed:
                raise common.CollisionError(client_error)
            # TODO handle other cases, too
            raise common.SyncError(client_error)
        return outcome
    return translating_wrapper
def _make_decision_state(self, decision_state, source_state):
    """Persist a decision state that adopts the source's serial and info.

    A copy of ``decision_state`` is derived via its ``set`` method with
    ``serial`` and ``info`` taken from ``source_state``, and the result is
    written through the database handle returned by ``self.get_db()``.
    """
    database = self.get_db()
    decided = decision_state.set(
        serial=source_state.serial,
        info=source_state.info)
    database.put_state(decided)
def mark_as_failed(self, state):
    """Record ``state`` as a failed sync, keyed by its (serial, path) pair.

    A warning is logged and the state is stored in ``self.failed_serials``.
    """
    failed_key = (state.serial, state.path)
    message = ("Marking failed serial %s for archive: %s, path: '%s'" %
               (failed_key[0], state.archive, failed_key[1]))
    logger.warning(message)
    self.failed_serials.put(failed_key, state)