Commit df2a067c authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

Pass messages for important actions

parent d4df2770
......@@ -9,7 +9,7 @@ from watchdog.events import FileSystemEventHandler
import logging
from agkyra.syncer.file_client import FileClient
from agkyra.syncer import utils, common
from agkyra.syncer import utils, common, messaging
from agkyra.syncer.database import transaction
logger = logging.getLogger(__name__)
......@@ -192,6 +192,7 @@ def is_info_eq(info1, info2):
class LocalfsTargetHandle(object):
def __init__(self, settings, target_state):
self.settings = settings
self.SIGNATURE = "LocalfsTargetHandle"
self.rootpath = settings.local_root_path
self.cache_hide_name = settings.cache_hide_name
......@@ -289,8 +290,9 @@ class LocalfsTargetHandle(object):
def stash_file(self):
stash_name = mk_stash_name(self.objname)
stash_path = utils.join_path(self.rootpath, stash_name)
logger.warning("Stashing file '%s' to '%s'" %
(self.objname, stash_name))
msg = messaging.ConflictStashMessage(
objname=self.objname, stash_name=stash_name, logger=logger)
self.settings.messager.put(msg)
os.rename(self.hidden_path, stash_path)
def finalize(self, filename, live_info):
......@@ -382,6 +384,7 @@ class LocalfsSourceHandle(object):
logger.info("Staging file '%s' to '%s'" % (self.objname, stage_path))
def __init__(self, settings, source_state):
self.settings = settings
self.SIGNATURE = "LocalfsSourceHandle"
self.rootpath = settings.local_root_path
self.cache_stage_name = settings.cache_stage_name
......
import Queue
from agkyra.syncer import utils
class Messager(object):
def __init__(self, *args, **kwargs):
self.queue = Queue.Queue()
def put(self, obj):
return self.queue.put(obj)
def get(self, **kwargs):
try:
return self.queue.get(**kwargs)
except Queue.Empty:
return None
class Message(object):
def __init__(self, *args, **kwargs):
self.tstamp = utils.time_stamp()
self.logger = kwargs["logger"]
self.name = self.__class__.__name__
class UpdateMessage(Message):
def __init__(self, *args, **kwargs):
Message.__init__(self, *args, **kwargs)
self.archive = kwargs["archive"]
self.objname = kwargs["objname"]
self.serial = kwargs["serial"]
self.logger.info("Updating archive: %s, object: '%s', serial: %s" %
(self.archive, self.objname, self.serial))
class SyncMessage(Message):
def __init__(self, *args, **kwargs):
Message.__init__(self, *args, **kwargs)
self.objname = kwargs["objname"]
self.archive = kwargs["archive"]
self.serial = kwargs["serial"]
self.logger.info("Syncing archive: %s, object: '%s', serial: %s" %
(self.archive, self.objname, self.serial))
class AckSyncMessage(Message):
def __init__(self, *args, **kwargs):
Message.__init__(self, *args, **kwargs)
self.objname = kwargs["objname"]
self.archive = kwargs["archive"]
self.serial = kwargs["serial"]
self.logger.info("Acking archive: %s, object: '%s', serial: %s" %
(self.archive, self.objname, self.serial))
class CollisionMessage(Message):
def __init__(self, *args, **kwargs):
Message.__init__(self, *args, **kwargs)
self.objname = kwargs["objname"]
self.etag = kwargs["etag"]
self.logger.warning(
"Failed to upload; object: '%s' with etag: %s "
"collided with upstream" % (self.objname, self.etag))
class ConflictStashMessage(Message):
def __init__(self, *args, **kwargs):
Message.__init__(self, *args, **kwargs)
self.objname = kwargs["objname"]
self.stash_name = kwargs["stash_name"]
self.logger.warning("Stashing file '%s' to '%s'" %
(self.objname, self.stash_name))
......@@ -6,7 +6,7 @@ import threading
import logging
import re
from agkyra.syncer import utils, common
from agkyra.syncer import utils, common, messaging
from agkyra.syncer.file_client import FileClient
from agkyra.syncer.setup import ClientError
from agkyra.syncer.database import transaction
......@@ -23,7 +23,7 @@ def heartbeat_event(settings, heartbeat, objname):
client, prev_tstamp = hb.get(objname)
tpl = (client, utils.time_stamp())
hb.set(objname, tpl)
logger.info("HEARTBEAT '%s' %s %s" % ((objname,) + tpl))
logger.debug("HEARTBEAT '%s' %s %s" % ((objname,) + tpl))
def go():
interval = 0.2
......@@ -200,27 +200,36 @@ class PithosTargetHandle(object):
# assert isinstance(source_handle, LocalfsSourceHandle)
info = sync_state.info
etag = info.get("pithos_etag")
if source_handle.info_is_deleted_or_unhandled():
if etag is not None:
logger.info("Deleting object '%s'" % self.target_objname)
self.safe_object_del(self.target_objname, etag)
live_info = {}
elif source_handle.info_is_dir():
logger.info("Creating dir '%s'" % self.target_objname)
r = self.directory_put(self.target_objname, etag)
synced_etag = r.headers["etag"]
live_info = {"pithos_etag": synced_etag,
"pithos_type": common.T_DIR}
else:
with open(source_handle.staged_path, mode="rb") as fil:
r = self.endpoint.upload_object(
self.target_objname,
fil,
if_etag_match=info.get("pithos_etag"))
synced_etag = r["etag"]
try:
if source_handle.info_is_deleted_or_unhandled():
if etag is not None:
logger.info("Deleting object '%s'" % self.target_objname)
self.safe_object_del(self.target_objname, etag)
live_info = {}
elif source_handle.info_is_dir():
logger.info("Creating dir '%s'" % self.target_objname)
r = self.directory_put(self.target_objname, etag)
synced_etag = r.headers["etag"]
live_info = {"pithos_etag": synced_etag,
"pithos_type": common.T_DIR}
else:
with open(source_handle.staged_path, mode="rb") as fil:
r = self.endpoint.upload_object(
self.target_objname,
fil,
if_etag_match=info.get("pithos_etag"))
synced_etag = r["etag"]
live_info = {"pithos_etag": synced_etag,
"pithos_type": common.T_FILE}
return self.target_state.set(info=live_info)
"pithos_type": common.T_DIR}
return self.target_state.set(info=live_info)
except ClientError as e:
if e.status == 412: # Precondition failed
msg = messaging.CollisionMessage(
objname=self.target_objname, etag=etag, logger=logger)
self.settings.messager.put(msg)
raise common.CollisionError(e)
else:
raise
def object_isdir(obj):
......
......@@ -5,6 +5,7 @@ import logging
from agkyra.syncer.utils import join_path
from agkyra.syncer.database import SqliteFileStateDB
from agkyra.syncer.heartbeat import HeartBeat
from agkyra.syncer.messaging import Messager
from kamaki.clients import ClientError
......@@ -77,6 +78,7 @@ class SyncerSettings():
self.heartbeat = HeartBeat()
self.action_max_wait = kwargs.get("action_max_wait",
DEFAULT_ACTION_MAX_WAIT)
self.messager = Messager()
def get_db(self, initialize=False):
dbs = getattr(thread_local_data, "dbs", None)
......
......@@ -8,6 +8,7 @@ from agkyra.syncer.setup import SyncerSettings
from agkyra.syncer.database import transaction
from agkyra.syncer.localfs_client import LocalfsFileClient
from agkyra.syncer.pithos_client import PithosFileClient
from agkyra.syncer import messaging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
......@@ -43,6 +44,7 @@ class FileSyncer(object):
self.clients = {self.MASTER: master, self.SLAVE: slave}
self.decide_event = None
self.failed_serials = common.LockedDict()
self.messager = settings.messager
@property
def paused(self):
......@@ -67,6 +69,9 @@ class FileSyncer(object):
if self.decide_event is not None:
self.decide_event.clear()
def get_next_message(self):
return self.messager.get(block=False)
def exclude_file(self, objname):
parts = objname.split(common.OBJECT_DIRSEP)
init_part = parts[0]
......@@ -104,8 +109,9 @@ class FileSyncer(object):
logger.warning("Ignoring update archive: %s, object: %s" %
(archive, objname))
return
logger.info("Updating archive: %s, object: '%s', serial: %s" %
(archive, objname, serial))
msg = messaging.UpdateMessage(
archive=archive, objname=objname, serial=serial, logger=logger)
self.messager.put(msg)
db_state = db.get_state(archive, objname)
if db_state and db_state.serial != serial:
logger.warning(
......@@ -190,10 +196,12 @@ class FileSyncer(object):
db.put_state(new_decision_state)
def sync_file(self, source_state, target_state, sync_state):
logger.info("Syncing archive: %s, object: '%s', serial: %s" %
(source_state.archive,
source_state.objname,
source_state.serial))
msg = messaging.SyncMessage(
objname=source_state.objname,
archive=source_state.archive,
serial=source_state.serial,
logger=logger)
self.messager.put(msg)
thread = threading.Thread(
target=self._sync_file,
args=(source_state, target_state, sync_state))
......@@ -234,9 +242,10 @@ class FileSyncer(object):
objname = synced_source_state.objname
source = synced_source_state.archive
target = synced_target_state.archive
logger.info("Acking archive: %s, object: '%s', serial: %s" %
(target, objname, serial))
msg = messaging.AckSyncMessage(
archive=target, objname=objname, serial=serial,
logger=logger)
self.messager.put(msg)
decision_state = db.get_state(self.DECISION, objname)
sync_state = db.get_state(self.SYNC, objname)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment