Commit d3319489 authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

raise exception on transaction failure and handle

parent ee2fff22
......@@ -55,6 +55,10 @@ class HandledError(SyncError):
pass
class DatabaseError(SyncError):
pass
class HardSyncError(SyncError):
pass
......
......@@ -232,7 +232,7 @@ def transaction(max_wait=60, init_wait=0.4, exp_backoff=1.1):
"Got DB error '%s' while running '%s' "
"with args '%s' and kwargs '%s'. Aborting." %
(e, func.__name__, args, kwargs))
return
raise common.DatabaseError(e)
else:
raise e
return inner
......
......@@ -97,9 +97,12 @@ class FileSyncer(object):
def probe_file(self, archive, objname):
ident = utils.time_stamp()
self._probe_files(archive, [objname], ident)
client = self.clients[archive]
client.remove_candidates([objname], ident)
try:
self._probe_files(archive, [objname], ident)
client = self.clients[archive]
client.remove_candidates([objname], ident)
except common.DatabaseError:
pass
@transaction()
def _probe_files(self, archive, objnames, ident):
......@@ -159,7 +162,10 @@ class FileSyncer(object):
if slave is None:
slave = self.SLAVE
ident = utils.time_stamp()
states = self._decide_file_sync(objname, master, slave, ident)
try:
states = self._decide_file_sync(objname, master, slave, ident)
except common.DatabaseError:
return
if states is None:
return
self.sync_file(*states)
......@@ -292,7 +298,10 @@ class FileSyncer(object):
# perhaps triggering a probe
def ack_file_sync(self, synced_source_state, synced_target_state):
self._ack_file_sync(synced_source_state, synced_target_state)
try:
self._ack_file_sync(synced_source_state, synced_target_state)
except common.DatabaseError:
return
serial = synced_source_state.serial
objname = synced_source_state.objname
target = synced_target_state.archive
......@@ -358,19 +367,25 @@ class FileSyncer(object):
def probe_archive(self, archive, forced=False):
ident = utils.time_stamp()
client = self.clients[archive]
candidates = client.list_candidate_files(forced=forced)
self._probe_files(archive, candidates, ident)
client.remove_candidates(candidates, ident)
try:
candidates = client.list_candidate_files(forced=forced)
self._probe_files(archive, candidates, ident)
client.remove_candidates(candidates, ident)
except common.DatabaseError:
pass
def decide_archive(self, archive):
for objname in self.list_deciding([archive]):
self.decide_file_sync(objname)
def decide_archive(self, archive=None):
try:
archives = [archive] if archive is not None else None
for objname in self.list_deciding(archives):
self.decide_file_sync(objname)
except common.DatabaseError:
pass
def decide_all_archives(self):
logger.info("Checking candidates to sync")
self.probe_all()
for objname in self.list_deciding():
self.decide_file_sync(objname)
self.decide_archive()
def probe_all(self, forced=False):
self.probe_archive(self.MASTER, forced=forced)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment