From d33194891a19af1a0be3a6370ee70468d40001a9 Mon Sep 17 00:00:00 2001
From: Giorgos Korfiatis <gkorf@grnet.gr>
Date: Tue, 12 May 2015 18:08:39 +0300
Subject: [PATCH] raise exception on transaction failure and handle

---
 agkyra/agkyra/syncer/common.py   |  4 ++++
 agkyra/agkyra/syncer/database.py |  2 +-
 agkyra/agkyra/syncer/syncer.py   | 41 ++++++++++++++++++++++----------
 3 files changed, 33 insertions(+), 14 deletions(-)

diff --git a/agkyra/agkyra/syncer/common.py b/agkyra/agkyra/syncer/common.py
index ebe60f0..3003c4d 100644
--- a/agkyra/agkyra/syncer/common.py
+++ b/agkyra/agkyra/syncer/common.py
@@ -55,6 +55,10 @@ class HandledError(SyncError):
     pass
 
 
+class DatabaseError(SyncError):
+    pass
+
+
 class HardSyncError(SyncError):
     pass
 
diff --git a/agkyra/agkyra/syncer/database.py b/agkyra/agkyra/syncer/database.py
index 2fc9d5f..d8843c1 100644
--- a/agkyra/agkyra/syncer/database.py
+++ b/agkyra/agkyra/syncer/database.py
@@ -232,7 +232,7 @@ def transaction(max_wait=60, init_wait=0.4, exp_backoff=1.1):
                                 "Got DB error '%s' while running '%s' "
                                 "with args '%s' and kwargs '%s'. Aborting." %
                                 (e, func.__name__, args, kwargs))
-                            return
+                            raise common.DatabaseError(e)
                     else:
                         raise e
         return inner
diff --git a/agkyra/agkyra/syncer/syncer.py b/agkyra/agkyra/syncer/syncer.py
index 5c1f7c6..8bf4025 100644
--- a/agkyra/agkyra/syncer/syncer.py
+++ b/agkyra/agkyra/syncer/syncer.py
@@ -97,9 +97,12 @@ class FileSyncer(object):
 
     def probe_file(self, archive, objname):
         ident = utils.time_stamp()
-        self._probe_files(archive, [objname], ident)
-        client = self.clients[archive]
-        client.remove_candidates([objname], ident)
+        try:
+            self._probe_files(archive, [objname], ident)
+            client = self.clients[archive]
+            client.remove_candidates([objname], ident)
+        except common.DatabaseError:
+            pass
 
     @transaction()
     def _probe_files(self, archive, objnames, ident):
@@ -159,7 +162,10 @@ class FileSyncer(object):
         if slave is None:
             slave = self.SLAVE
         ident = utils.time_stamp()
-        states = self._decide_file_sync(objname, master, slave, ident)
+        try:
+            states = self._decide_file_sync(objname, master, slave, ident)
+        except common.DatabaseError:
+            return
         if states is None:
             return
         self.sync_file(*states)
@@ -292,7 +298,10 @@ class FileSyncer(object):
         # perhaps triggering a probe
 
     def ack_file_sync(self, synced_source_state, synced_target_state):
-        self._ack_file_sync(synced_source_state, synced_target_state)
+        try:
+            self._ack_file_sync(synced_source_state, synced_target_state)
+        except common.DatabaseError:
+            return
         serial = synced_source_state.serial
         objname = synced_source_state.objname
         target = synced_target_state.archive
@@ -358,19 +367,25 @@ class FileSyncer(object):
     def probe_archive(self, archive, forced=False):
         ident = utils.time_stamp()
         client = self.clients[archive]
-        candidates = client.list_candidate_files(forced=forced)
-        self._probe_files(archive, candidates, ident)
-        client.remove_candidates(candidates, ident)
+        try:
+            candidates = client.list_candidate_files(forced=forced)
+            self._probe_files(archive, candidates, ident)
+            client.remove_candidates(candidates, ident)
+        except common.DatabaseError:
+            pass
 
-    def decide_archive(self, archive):
-        for objname in self.list_deciding([archive]):
-            self.decide_file_sync(objname)
+    def decide_archive(self, archive=None):
+        try:
+            archives = [archive] if archive is not None else None
+            for objname in self.list_deciding(archives):
+                self.decide_file_sync(objname)
+        except common.DatabaseError:
+            pass
 
     def decide_all_archives(self):
         logger.info("Checking candidates to sync")
         self.probe_all()
-        for objname in self.list_deciding():
-            self.decide_file_sync(objname)
+        self.decide_archive()
 
     def probe_all(self, forced=False):
         self.probe_archive(self.MASTER, forced=forced)
-- 
GitLab