diff --git a/agkyra/agkyra/syncer/localfs_client.py b/agkyra/agkyra/syncer/localfs_client.py index e6bfe1ed4851dcf99e887710f7b74a0336da305f..5d585486bfe2dc72d0f91904ca69e9fdaff134d8 100644 --- a/agkyra/agkyra/syncer/localfs_client.py +++ b/agkyra/agkyra/syncer/localfs_client.py @@ -9,7 +9,7 @@ from watchdog.events import FileSystemEventHandler import logging from agkyra.syncer.file_client import FileClient -from agkyra.syncer import utils, common +from agkyra.syncer import utils, common, messaging from agkyra.syncer.database import transaction logger = logging.getLogger(__name__) @@ -192,6 +192,7 @@ def is_info_eq(info1, info2): class LocalfsTargetHandle(object): def __init__(self, settings, target_state): + self.settings = settings self.SIGNATURE = "LocalfsTargetHandle" self.rootpath = settings.local_root_path self.cache_hide_name = settings.cache_hide_name @@ -289,8 +290,9 @@ class LocalfsTargetHandle(object): def stash_file(self): stash_name = mk_stash_name(self.objname) stash_path = utils.join_path(self.rootpath, stash_name) - logger.warning("Stashing file '%s' to '%s'" % - (self.objname, stash_name)) + msg = messaging.ConflictStashMessage( + objname=self.objname, stash_name=stash_name, logger=logger) + self.settings.messager.put(msg) os.rename(self.hidden_path, stash_path) def finalize(self, filename, live_info): @@ -382,6 +384,7 @@ class LocalfsSourceHandle(object): logger.info("Staging file '%s' to '%s'" % (self.objname, stage_path)) def __init__(self, settings, source_state): + self.settings = settings self.SIGNATURE = "LocalfsSourceHandle" self.rootpath = settings.local_root_path self.cache_stage_name = settings.cache_stage_name diff --git a/agkyra/agkyra/syncer/messaging.py b/agkyra/agkyra/syncer/messaging.py new file mode 100644 index 0000000000000000000000000000000000000000..541a0cba7941e5b18d8ba965738454771e774514 --- /dev/null +++ b/agkyra/agkyra/syncer/messaging.py @@ -0,0 +1,73 @@ +import Queue + +from agkyra.syncer import utils + + +class Messager(object): + def __init__(self, *args, **kwargs): + self.queue = Queue.Queue() + + def put(self, obj): + return self.queue.put(obj) + + def get(self, **kwargs): + try: + return self.queue.get(**kwargs) + except Queue.Empty: + return None + + +class Message(object): + def __init__(self, *args, **kwargs): + self.tstamp = utils.time_stamp() + self.logger = kwargs["logger"] + self.name = self.__class__.__name__ + + +class UpdateMessage(Message): + def __init__(self, *args, **kwargs): + Message.__init__(self, *args, **kwargs) + self.archive = kwargs["archive"] + self.objname = kwargs["objname"] + self.serial = kwargs["serial"] + self.logger.info("Updating archive: %s, object: '%s', serial: %s" % + (self.archive, self.objname, self.serial)) + + +class SyncMessage(Message): + def __init__(self, *args, **kwargs): + Message.__init__(self, *args, **kwargs) + self.objname = kwargs["objname"] + self.archive = kwargs["archive"] + self.serial = kwargs["serial"] + self.logger.info("Syncing archive: %s, object: '%s', serial: %s" % + (self.archive, self.objname, self.serial)) + + +class AckSyncMessage(Message): + def __init__(self, *args, **kwargs): + Message.__init__(self, *args, **kwargs) + self.objname = kwargs["objname"] + self.archive = kwargs["archive"] + self.serial = kwargs["serial"] + self.logger.info("Acking archive: %s, object: '%s', serial: %s" % + (self.archive, self.objname, self.serial)) + + +class CollisionMessage(Message): + def __init__(self, *args, **kwargs): + Message.__init__(self, *args, **kwargs) + self.objname = kwargs["objname"] + self.etag = kwargs["etag"] + self.logger.warning( + "Failed to upload; object: '%s' with etag: %s " + "collided with upstream" % (self.objname, self.etag)) + + +class ConflictStashMessage(Message): + def __init__(self, *args, **kwargs): + Message.__init__(self, *args, **kwargs) + self.objname = kwargs["objname"] + self.stash_name = kwargs["stash_name"] + self.logger.warning("Stashing file '%s' to '%s'" % + (self.objname, self.stash_name)) diff --git a/agkyra/agkyra/syncer/pithos_client.py b/agkyra/agkyra/syncer/pithos_client.py index f6a834c12c160d1d6909a81fb3c5ba9d57a0fb8c..b6d7856ef260dab62b828f96f77444291cdb929a 100644 --- a/agkyra/agkyra/syncer/pithos_client.py +++ b/agkyra/agkyra/syncer/pithos_client.py @@ -6,7 +6,7 @@ import threading import logging import re -from agkyra.syncer import utils, common +from agkyra.syncer import utils, common, messaging from agkyra.syncer.file_client import FileClient from agkyra.syncer.setup import ClientError from agkyra.syncer.database import transaction @@ -23,7 +23,7 @@ def heartbeat_event(settings, heartbeat, objname): client, prev_tstamp = hb.get(objname) tpl = (client, utils.time_stamp()) hb.set(objname, tpl) - logger.info("HEARTBEAT '%s' %s %s" % ((objname,) + tpl)) + logger.debug("HEARTBEAT '%s' %s %s" % ((objname,) + tpl)) def go(): interval = 0.2 @@ -200,27 +200,36 @@ class PithosTargetHandle(object): # assert isinstance(source_handle, LocalfsSourceHandle) info = sync_state.info etag = info.get("pithos_etag") - if source_handle.info_is_deleted_or_unhandled(): - if etag is not None: - logger.info("Deleting object '%s'" % self.target_objname) - self.safe_object_del(self.target_objname, etag) - live_info = {} - elif source_handle.info_is_dir(): - logger.info("Creating dir '%s'" % self.target_objname) - r = self.directory_put(self.target_objname, etag) - synced_etag = r.headers["etag"] - live_info = {"pithos_etag": synced_etag, - "pithos_type": common.T_DIR} - else: - with open(source_handle.staged_path, mode="rb") as fil: - r = self.endpoint.upload_object( - self.target_objname, - fil, - if_etag_match=info.get("pithos_etag")) - synced_etag = r["etag"] + try: + if source_handle.info_is_deleted_or_unhandled(): + if etag is not None: + logger.info("Deleting object '%s'" % self.target_objname) + self.safe_object_del(self.target_objname, etag) + live_info = {} + elif source_handle.info_is_dir(): + logger.info("Creating dir '%s'" % self.target_objname) + r = self.directory_put(self.target_objname, etag) + synced_etag = r.headers["etag"] + live_info = {"pithos_etag": synced_etag, + "pithos_type": common.T_DIR} + else: + with open(source_handle.staged_path, mode="rb") as fil: + r = self.endpoint.upload_object( + self.target_objname, + fil, + if_etag_match=info.get("pithos_etag")) + synced_etag = r["etag"] live_info = {"pithos_etag": synced_etag, - "pithos_type": common.T_FILE} - return self.target_state.set(info=live_info) + "pithos_type": common.T_DIR} + return self.target_state.set(info=live_info) + except ClientError as e: + if e.status == 412: # Precondition failed + msg = messaging.CollisionMessage( + objname=self.target_objname, etag=etag, logger=logger) + self.settings.messager.put(msg) + raise common.CollisionError(e) + else: + raise def object_isdir(obj): diff --git a/agkyra/agkyra/syncer/setup.py b/agkyra/agkyra/syncer/setup.py index 5a3aaf04d8f84d7a7ef7e07e73a7be0fcd573665..b2b6c76fe90fea3807b2c7a885fb4f8847c6d429 100644 --- a/agkyra/agkyra/syncer/setup.py +++ b/agkyra/agkyra/syncer/setup.py @@ -5,6 +5,7 @@ import logging from agkyra.syncer.utils import join_path from agkyra.syncer.database import SqliteFileStateDB from agkyra.syncer.heartbeat import HeartBeat +from agkyra.syncer.messaging import Messager from kamaki.clients import ClientError @@ -77,6 +78,7 @@ class SyncerSettings(): self.heartbeat = HeartBeat() self.action_max_wait = kwargs.get("action_max_wait", DEFAULT_ACTION_MAX_WAIT) + self.messager = Messager() def get_db(self, initialize=False): dbs = getattr(thread_local_data, "dbs", None) diff --git a/agkyra/agkyra/syncer/syncer.py b/agkyra/agkyra/syncer/syncer.py index 51e366b2b8bf7895198434cc3e45175b9d2206af..d032c54b5c75c83a7708a0e6800cf8262c860e34 100644 --- a/agkyra/agkyra/syncer/syncer.py +++ b/agkyra/agkyra/syncer/syncer.py @@ -8,6 +8,7 @@ from agkyra.syncer.setup import SyncerSettings from agkyra.syncer.database import transaction from agkyra.syncer.localfs_client import LocalfsFileClient from agkyra.syncer.pithos_client import PithosFileClient +from agkyra.syncer import messaging logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) @@ -43,6 +44,7 @@ class FileSyncer(object): self.clients = {self.MASTER: master, self.SLAVE: slave} self.decide_event = None self.failed_serials = common.LockedDict() + self.messager = settings.messager @property def paused(self): @@ -67,6 +69,9 @@ class FileSyncer(object): if self.decide_event is not None: self.decide_event.clear() + def get_next_message(self): + return self.messager.get(block=False) + def exclude_file(self, objname): parts = objname.split(common.OBJECT_DIRSEP) init_part = parts[0] @@ -104,8 +109,9 @@ class FileSyncer(object): logger.warning("Ignoring update archive: %s, object: %s" % (archive, objname)) return - logger.info("Updating archive: %s, object: '%s', serial: %s" % - (archive, objname, serial)) + msg = messaging.UpdateMessage( + archive=archive, objname=objname, serial=serial, logger=logger) + self.messager.put(msg) db_state = db.get_state(archive, objname) if db_state and db_state.serial != serial: logger.warning( @@ -190,10 +196,12 @@ class FileSyncer(object): db.put_state(new_decision_state) def sync_file(self, source_state, target_state, sync_state): - logger.info("Syncing archive: %s, object: '%s', serial: %s" % - (source_state.archive, - source_state.objname, - source_state.serial)) + msg = messaging.SyncMessage( + objname=source_state.objname, + archive=source_state.archive, + serial=source_state.serial, + logger=logger) + self.messager.put(msg) thread = threading.Thread( target=self._sync_file, args=(source_state, target_state, sync_state)) @@ -234,9 +242,10 @@ class FileSyncer(object): objname = synced_source_state.objname source = synced_source_state.archive target = synced_target_state.archive - logger.info("Acking archive: %s, object: '%s', serial: %s" % - (target, objname, serial)) - + msg = messaging.AckSyncMessage( + archive=target, objname=objname, serial=serial, + logger=logger) + self.messager.put(msg) decision_state = db.get_state(self.DECISION, objname) sync_state = db.get_state(self.SYNC, objname)