From 85606c6a7ddc05e9b31e3ad4df01454b92d4a56c Mon Sep 17 00:00:00 2001
From: Giorgos Korfiatis <gkorf@grnet.gr>
Date: Mon, 20 Apr 2015 18:21:29 +0300
Subject: [PATCH] Handle failed uploads
When an upload fails because a newer upstream version exists, mark the
serial as failed so that it is skipped later.
---
agkyra/agkyra/syncer/common.py | 26 ++++++++++
agkyra/agkyra/syncer/file_client.py | 5 +-
agkyra/agkyra/syncer/pithos_client.py | 15 ++++++
agkyra/agkyra/syncer/syncer.py | 75 +++++++++++++++++----------
4 files changed, 92 insertions(+), 29 deletions(-)
diff --git a/agkyra/agkyra/syncer/common.py b/agkyra/agkyra/syncer/common.py
index ea71c4c..0248079 100644
--- a/agkyra/agkyra/syncer/common.py
+++ b/agkyra/agkyra/syncer/common.py
@@ -1,4 +1,5 @@
from collections import namedtuple
+import threading
FileStateTuple = namedtuple('FileStateTuple',
['archive', 'path', 'serial', 'info'])
@@ -35,3 +36,28 @@ class InvalidInput(SyncError):
class HandledError(SyncError):
pass
+
+
+class HardSyncError(SyncError):
+ pass
+
+
+class CollisionError(HardSyncError):
+ pass
+
+
+class LockedDict(object):
+ def __init__(self, *args, **kwargs):
+ self._Dict = {}
+ self._Lock = threading.Lock()
+
+ def put(self, key, value):
+ self._Lock.acquire()
+ self._Dict[key] = value
+ self._Lock.release()
+
+ def get(self, key):
+ self._Lock.acquire()
+ value = self._Dict.get(key)
+ self._Lock.release()
+ return value
diff --git a/agkyra/agkyra/syncer/file_client.py b/agkyra/agkyra/syncer/file_client.py
index 5645ab8..60156da 100644
--- a/agkyra/agkyra/syncer/file_client.py
+++ b/agkyra/agkyra/syncer/file_client.py
@@ -19,7 +19,7 @@ class FileClient(object):
raise NotImplementedError
def start_pulling_file(self, source_handle, target_state, sync_state,
- callback=None):
+ callback=None, failure_callback=None):
try:
synced_source_state, synced_target_state = \
self._start(source_handle, target_state, sync_state)
@@ -27,6 +27,9 @@ class FileClient(object):
callback(synced_source_state, synced_target_state)
except common.SyncError as e:
logger.warning(e)
+ if isinstance(e, common.HardSyncError):
+ if failure_callback is not None:
+ failure_callback(source_handle.source_state)
def _start(self, source_handle, target_state, sync_state):
try:
diff --git a/agkyra/agkyra/syncer/pithos_client.py b/agkyra/agkyra/syncer/pithos_client.py
index 8b572d5..f3a264a 100644
--- a/agkyra/agkyra/syncer/pithos_client.py
+++ b/agkyra/agkyra/syncer/pithos_client.py
@@ -54,6 +54,19 @@ def give_heartbeat(f):
return inner
+def handle_client_errors(f):
+ @wraps(f)
+ def inner(*args, **kwargs):
+ try:
+ return f(*args, **kwargs)
+ except ClientError as e:
+ if e.status == 412: # Precondition failed
+ raise common.CollisionError(e)
+ # TODO handle other cases, too
+ raise common.SyncError(e)
+ return inner
+
+
class PithosSourceHandle(object):
def __init__(self, settings, source_state):
self.NAME = "PithosSourceHandle"
@@ -91,6 +104,7 @@ class PithosSourceHandle(object):
db.insert_cachepath(fetch_name, self.NAME, filename)
return utils.join_path(self.cache_path, fetch_name)
+ @handle_client_errors
@give_heartbeat
def send_file(self, sync_state):
fetched_file = self.register_fetch_name(self.path)
@@ -176,6 +190,7 @@ class PithosTargetHandle(object):
if_etag_match=etag)
return r
+ @handle_client_errors
@give_heartbeat
def pull(self, source_handle, sync_state):
# assert isinstance(source_handle, LocalfsSourceHandle)
diff --git a/agkyra/agkyra/syncer/syncer.py b/agkyra/agkyra/syncer/syncer.py
index 89611ab..9e1dd97 100644
--- a/agkyra/agkyra/syncer/syncer.py
+++ b/agkyra/agkyra/syncer/syncer.py
@@ -43,6 +43,7 @@ class FileSyncer(object):
self.get_db = settings.get_db
self.clients = {self.MASTER: master, self.SLAVE: slave}
self.decide_event = None
+ self.failed_serials = common.LockedDict()
@property
def paused(self):
@@ -86,6 +87,10 @@ class FileSyncer(object):
client = self.clients[archive]
db_state = db.get_state(archive, path)
ref_state = db.get_state(self.SYNC, path)
+ if db_state.serial != ref_state.serial:
+ logger.warning("Serial mismatch in probing archive: %s, path: '%s'"
+ % (archive, path))
+ return
client.start_probing_path(path, db_state, ref_state,
callback=self.update_path)
@@ -139,16 +144,35 @@ class FileSyncer(object):
master_serial = master_state.serial
slave_serial = slave_state.serial
sync_serial = sync_state.serial
- if master_serial > sync_serial:
- return self._mark_sync_start_path(
- master_state, slave_state,
- decision_state, sync_state)
+ decision_serial = decision_state.serial
+
+ if decision_serial != sync_serial:
+ failed_sync = self.failed_serials.get((decision_serial, path))
+ if failed_sync is None:
+ logger.warning(
+ "Already decided: '%s', decision: %s, sync: %s" %
+ (path, decision_serial, sync_serial))
+ if decision_serial == master_serial:
+ return master_state, slave_state, sync_state
+ elif decision_serial == slave_serial:
+ return slave_state, master_state, sync_state
+ else:
+ raise AssertionError(
+ "Decision serial %s for path '%s' "
+ "does not match any archive." %
+ (decision_serial, path))
+ else:
+ logger.warning(
+ "Ignoring failed decision for: '%s', decision: %s" %
+ (path, decision_serial))
+ if master_serial > sync_serial:
+ self._make_decision_state(decision_state, master_state)
+ return master_state, slave_state, sync_state
elif master_serial == sync_serial:
if slave_serial > sync_serial:
- return self._mark_sync_start_path(
- slave_state, master_state,
- decision_state, sync_state)
+ self._make_decision_state(decision_state, slave_state)
+ return slave_state, master_state, sync_state
elif slave_serial == sync_serial:
return None
else:
@@ -159,31 +183,17 @@ class FileSyncer(object):
raise AssertionError("Master serial %s, sync serial %s"
% (master_serial, sync_serial))
- def _mark_sync_start_path(self,
- source_state,
- target_state,
- decision_state,
- sync_state):
+ def _make_decision_state(self, decision_state, source_state):
db = self.get_db()
+ new_decision_state = decision_state.set(
+ serial=source_state.serial, info=source_state.info)
+ db.put_state(new_decision_state)
+
+ def sync_path(self, source_state, target_state, sync_state):
logger.info("Syncing archive: %s, path: '%s', serial: %s" %
(source_state.archive,
source_state.path,
source_state.serial))
-
- path = source_state.path
- decision_serial = decision_state.serial
- sync_serial = sync_state.serial
- decided = decision_serial != sync_serial
- if decided:
- logger.warning("Already decided: '%s', decision: %s, sync: %s" %
- (path, decision_serial, sync_serial))
- else:
- new_decision_state = decision_state.set(
- serial=source_state.serial, info=source_state.info)
- db.put_state(new_decision_state)
- return source_state, target_state, sync_state
-
- def sync_path(self, source_state, target_state, sync_state):
thread = threading.Thread(
target=self._sync_path,
args=(source_state, target_state, sync_state))
@@ -200,7 +210,16 @@ class FileSyncer(object):
target_client = clients[target_state.archive]
target_client.start_pulling_file(
source_handle, target_state, sync_state,
- callback=self.acknowledge_path)
+ callback=self.acknowledge_path,
+ failure_callback=self.mark_as_failed)
+
+ def mark_as_failed(self, state):
+ serial = state.serial
+ path = state.path
+ logger.warning(
+ "Marking failed serial %s for archive: %s, path: '%s'" %
+ (serial, state.archive, path))
+ self.failed_serials.put((serial, path), state)
def update_state(self, old_state, new_state):
db = self.get_db()
--
GitLab