Commit 85606c6a authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

Handle failed uploads

When an upload fails, because a new upstream version exists, mark the
serial as failed, so as to skip it later.
parent 650271be
from collections import namedtuple
import threading
FileStateTuple = namedtuple('FileStateTuple',
['archive', 'path', 'serial', 'info'])
......@@ -35,3 +36,28 @@ class InvalidInput(SyncError):
class HandledError(SyncError):
pass
class HardSyncError(SyncError):
pass
class CollisionError(HardSyncError):
pass
class LockedDict(object):
def __init__(self, *args, **kwargs):
self._Dict = {}
self._Lock = threading.Lock()
def put(self, key, value):
self._Lock.acquire()
self._Dict[key] = value
self._Lock.release()
def get(self, key):
self._Lock.acquire()
value = self._Dict.get(key)
self._Lock.release()
return value
......@@ -19,7 +19,7 @@ class FileClient(object):
raise NotImplementedError
def start_pulling_file(self, source_handle, target_state, sync_state,
callback=None):
callback=None, failure_callback=None):
try:
synced_source_state, synced_target_state = \
self._start(source_handle, target_state, sync_state)
......@@ -27,6 +27,9 @@ class FileClient(object):
callback(synced_source_state, synced_target_state)
except common.SyncError as e:
logger.warning(e)
if isinstance(e, common.HardSyncError):
if failure_callback is not None:
failure_callback(source_handle.source_state)
def _start(self, source_handle, target_state, sync_state):
try:
......
......@@ -54,6 +54,19 @@ def give_heartbeat(f):
return inner
def handle_client_errors(f):
@wraps(f)
def inner(*args, **kwargs):
try:
return f(*args, **kwargs)
except ClientError as e:
if e.status == 412: # Precondition failed
raise common.CollisionError(e)
# TODO handle other cases, too
raise common.SyncError(e)
return inner
class PithosSourceHandle(object):
def __init__(self, settings, source_state):
self.NAME = "PithosSourceHandle"
......@@ -91,6 +104,7 @@ class PithosSourceHandle(object):
db.insert_cachepath(fetch_name, self.NAME, filename)
return utils.join_path(self.cache_path, fetch_name)
@handle_client_errors
@give_heartbeat
def send_file(self, sync_state):
fetched_file = self.register_fetch_name(self.path)
......@@ -176,6 +190,7 @@ class PithosTargetHandle(object):
if_etag_match=etag)
return r
@handle_client_errors
@give_heartbeat
def pull(self, source_handle, sync_state):
# assert isinstance(source_handle, LocalfsSourceHandle)
......
......@@ -43,6 +43,7 @@ class FileSyncer(object):
self.get_db = settings.get_db
self.clients = {self.MASTER: master, self.SLAVE: slave}
self.decide_event = None
self.failed_serials = common.LockedDict()
@property
def paused(self):
......@@ -86,6 +87,10 @@ class FileSyncer(object):
client = self.clients[archive]
db_state = db.get_state(archive, path)
ref_state = db.get_state(self.SYNC, path)
if db_state.serial != ref_state.serial:
logger.warning("Serial mismatch in probing archive: %s, path: '%s'"
% (archive, path))
return
client.start_probing_path(path, db_state, ref_state,
callback=self.update_path)
......@@ -139,16 +144,35 @@ class FileSyncer(object):
master_serial = master_state.serial
slave_serial = slave_state.serial
sync_serial = sync_state.serial
if master_serial > sync_serial:
return self._mark_sync_start_path(
master_state, slave_state,
decision_state, sync_state)
decision_serial = decision_state.serial
if decision_serial != sync_serial:
failed_sync = self.failed_serials.get((decision_serial, path))
if failed_sync is None:
logger.warning(
"Already decided: '%s', decision: %s, sync: %s" %
(path, decision_serial, sync_serial))
if decision_serial == master_serial:
return master_state, slave_state, sync_state
elif decision_serial == slave_serial:
return slave_state, master_state, sync_state
else:
raise AssertionError(
"Decision serial %s for path '%s' "
"does not match any archive." %
(decision_serial, path))
else:
logger.warning(
"Ignoring failed decision for: '%s', decision: %s" %
(path, decision_serial))
if master_serial > sync_serial:
self._make_decision_state(decision_state, master_state)
return master_state, slave_state, sync_state
elif master_serial == sync_serial:
if slave_serial > sync_serial:
return self._mark_sync_start_path(
slave_state, master_state,
decision_state, sync_state)
self._make_decision_state(decision_state, slave_state)
return slave_state, master_state, sync_state
elif slave_serial == sync_serial:
return None
else:
......@@ -159,31 +183,17 @@ class FileSyncer(object):
raise AssertionError("Master serial %s, sync serial %s"
% (master_serial, sync_serial))
def _mark_sync_start_path(self,
source_state,
target_state,
decision_state,
sync_state):
def _make_decision_state(self, decision_state, source_state):
db = self.get_db()
new_decision_state = decision_state.set(
serial=source_state.serial, info=source_state.info)
db.put_state(new_decision_state)
def sync_path(self, source_state, target_state, sync_state):
logger.info("Syncing archive: %s, path: '%s', serial: %s" %
(source_state.archive,
source_state.path,
source_state.serial))
path = source_state.path
decision_serial = decision_state.serial
sync_serial = sync_state.serial
decided = decision_serial != sync_serial
if decided:
logger.warning("Already decided: '%s', decision: %s, sync: %s" %
(path, decision_serial, sync_serial))
else:
new_decision_state = decision_state.set(
serial=source_state.serial, info=source_state.info)
db.put_state(new_decision_state)
return source_state, target_state, sync_state
def sync_path(self, source_state, target_state, sync_state):
thread = threading.Thread(
target=self._sync_path,
args=(source_state, target_state, sync_state))
......@@ -200,7 +210,16 @@ class FileSyncer(object):
target_client = clients[target_state.archive]
target_client.start_pulling_file(
source_handle, target_state, sync_state,
callback=self.acknowledge_path)
callback=self.acknowledge_path,
failure_callback=self.mark_as_failed)
def mark_as_failed(self, state):
serial = state.serial
path = state.path
logger.warning(
"Marking failed serial %s for archive: %s, path: '%s'" %
(serial, state.archive, path))
self.failed_serials.put((serial, path), state)
def update_state(self, old_state, new_state):
db = self.get_db()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment