diff --git a/agkyra/agkyra/syncer/common.py b/agkyra/agkyra/syncer/common.py index ebe60f05e44f65b3d72297df512022c74ff723dd..3003c4d303421bc1dacee6b1c3273508b9292bd5 100644 --- a/agkyra/agkyra/syncer/common.py +++ b/agkyra/agkyra/syncer/common.py @@ -55,6 +55,10 @@ class HandledError(SyncError): pass +class DatabaseError(SyncError): + pass + + class HardSyncError(SyncError): pass diff --git a/agkyra/agkyra/syncer/database.py b/agkyra/agkyra/syncer/database.py index 2fc9d5ff1972319a6dfe9900d50d6b0d42a991b2..d8843c1c4d2c3c6bcad468371a35da83d6e3863f 100644 --- a/agkyra/agkyra/syncer/database.py +++ b/agkyra/agkyra/syncer/database.py @@ -232,7 +232,7 @@ def transaction(max_wait=60, init_wait=0.4, exp_backoff=1.1): "Got DB error '%s' while running '%s' " "with args '%s' and kwargs '%s'. Aborting." % (e, func.__name__, args, kwargs)) - return + raise common.DatabaseError(e) else: raise e return inner diff --git a/agkyra/agkyra/syncer/syncer.py b/agkyra/agkyra/syncer/syncer.py index 5c1f7c666ac26e9d8139e403654bc50659c274ca..8bf40259b9bd29feae000d6b247d50f6d28287fc 100644 --- a/agkyra/agkyra/syncer/syncer.py +++ b/agkyra/agkyra/syncer/syncer.py @@ -97,9 +97,12 @@ class FileSyncer(object): def probe_file(self, archive, objname): ident = utils.time_stamp() - self._probe_files(archive, [objname], ident) - client = self.clients[archive] - client.remove_candidates([objname], ident) + try: + self._probe_files(archive, [objname], ident) + client = self.clients[archive] + client.remove_candidates([objname], ident) + except common.DatabaseError: + pass @transaction() def _probe_files(self, archive, objnames, ident): @@ -159,7 +162,10 @@ class FileSyncer(object): if slave is None: slave = self.SLAVE ident = utils.time_stamp() - states = self._decide_file_sync(objname, master, slave, ident) + try: + states = self._decide_file_sync(objname, master, slave, ident) + except common.DatabaseError: + return if states is None: return self.sync_file(*states) @@ -292,7 +298,10 @@ class FileSyncer(object): # perhaps triggering a probe def ack_file_sync(self, synced_source_state, synced_target_state): - self._ack_file_sync(synced_source_state, synced_target_state) + try: + self._ack_file_sync(synced_source_state, synced_target_state) + except common.DatabaseError: + return serial = synced_source_state.serial objname = synced_source_state.objname target = synced_target_state.archive @@ -358,19 +367,25 @@ class FileSyncer(object): def probe_archive(self, archive, forced=False): ident = utils.time_stamp() client = self.clients[archive] - candidates = client.list_candidate_files(forced=forced) - self._probe_files(archive, candidates, ident) - client.remove_candidates(candidates, ident) + try: + candidates = client.list_candidate_files(forced=forced) + self._probe_files(archive, candidates, ident) + client.remove_candidates(candidates, ident) + except common.DatabaseError: + pass - def decide_archive(self, archive): - for objname in self.list_deciding([archive]): - self.decide_file_sync(objname) + def decide_archive(self, archive=None): + try: + archives = [archive] if archive is not None else None + for objname in self.list_deciding(archives): + self.decide_file_sync(objname) + except common.DatabaseError: + pass def decide_all_archives(self): logger.info("Checking candidates to sync") self.probe_all() - for objname in self.list_deciding(): - self.decide_file_sync(objname) + self.decide_archive() def probe_all(self, forced=False): self.probe_archive(self.MASTER, forced=forced)