Commit 0724292d authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

Disable syncer when root path or container disappears

parent add67300
......@@ -66,6 +66,10 @@ class SqliteFileStateDB(FileStateDB):
"primary key (cachename))")
db.execute(Q)
Q = ("create table if not exists "
"config(key text, value text, primary key (key))")
db.execute(Q)
self.commit()
def begin(self):
......@@ -194,6 +198,22 @@ class SqliteFileStateDB(FileStateDB):
archive=archive, objname=objname, serial=-1, info={})
return state
def get_config(self, key):
    """Fetch a config value by key; returns the JSON-decoded value, or
    None when the key is absent."""
    row = self.db.execute(
        "select value from config where key = ?", (key,)).fetchone()
    return json.loads(row[0]) if row else None
def set_config(self, key, value):
    """Insert or update a config entry, serializing value as JSON."""
    statement = "insert or replace into config(key, value) values (?, ?)"
    serialized = json.dumps(value)
    self.db.execute(statement, (key, serialized))
def purge_archives(self):
    """Delete every row from the archives and serials tables."""
    for table in ("archives", "serials"):
        self.db.execute("delete from %s" % table)
def rand(lim):
    """Return a uniform random float in [0, lim)."""
    return lim * random.random()
......
......@@ -548,6 +548,12 @@ class LocalfsFileClient(FileClient):
self.CACHEPATH = settings.cache_path
self.get_db = settings.get_db
self.probe_candidates = utils.ThreadSafeDict()
self.check_enabled()
def check_enabled(self):
    """Emit a LocalfsSyncDisabled message when localfs sync is off."""
    if self.settings.localfs_is_enabled():
        return
    self.settings.messager.put(
        messaging.LocalfsSyncDisabled(logger=logger))
def remove_candidates(self, objnames, ident):
with self.probe_candidates.lock() as d:
......@@ -560,6 +566,14 @@ class LocalfsFileClient(FileClient):
pass
def list_candidate_files(self, forced=False):
if not self.settings.localfs_is_enabled():
return {}
if not os.path.isdir(self.ROOTPATH):
self.settings.set_localfs_enabled(False)
msg = messaging.LocalfsSyncDisabled(logger=logger)
self.settings.messager.put(msg)
return {}
with self.probe_candidates.lock() as d:
if forced:
candidates = self.walk_filesystem()
......@@ -658,6 +672,7 @@ class LocalfsFileClient(FileClient):
with self.probe_candidates.lock() as d:
d[objname] = self.none_info()
root_path = utils.from_unicode(self.ROOTPATH)
class EventHandler(FileSystemEventHandler):
def on_created(this, event):
# if not event.is_directory:
......@@ -669,6 +684,10 @@ class LocalfsFileClient(FileClient):
def on_deleted(this, event):
path = event.src_path
logger.debug("Handling %s" % event)
if path == root_path:
self.settings.set_localfs_enabled(False)
msg = messaging.LocalfsSyncDisabled(logger=logger)
self.settings.messager.put(msg)
handle_path(path)
def on_modified(this, event):
......@@ -685,9 +704,8 @@ class LocalfsFileClient(FileClient):
handle_path(src_path)
handle_path(dest_path)
path = utils.from_unicode(self.ROOTPATH)
event_handler = EventHandler()
observer = Observer()
observer.schedule(event_handler, path, recursive=True)
observer.schedule(event_handler, root_path, recursive=True)
observer.start()
return observer
......@@ -167,3 +167,15 @@ class ConflictStashMessage(Message):
self.stash_name = kwargs["stash_name"]
self.logger.warning("Stashing file '%s' to '%s'" %
(self.objname, self.stash_name))
class LocalfsSyncDisabled(Message):
    """Message announcing that localfs syncing has been disabled."""
    def __init__(self, *args, **kwargs):
        super(LocalfsSyncDisabled, self).__init__(*args, **kwargs)
        self.logger.warning("Localfs sync is disabled")
class PithosSyncDisabled(Message):
    """Message announcing that pithos syncing has been disabled."""
    def __init__(self, *args, **kwargs):
        super(PithosSyncDisabled, self).__init__(*args, **kwargs)
        self.logger.warning("Pithos sync is disabled")
......@@ -283,6 +283,12 @@ class PithosFileClient(FileClient):
self.endpoint = settings.endpoint
self.last_modification = "0000-00-00"
self.probe_candidates = utils.ThreadSafeDict()
self.check_enabled()
def check_enabled(self):
    """Emit a PithosSyncDisabled message when pithos sync is off."""
    if self.settings.pithos_is_enabled():
        return
    self.settings.messager.put(
        messaging.PithosSyncDisabled(logger=logger))
def remove_candidates(self, objnames, ident):
with self.probe_candidates.lock() as d:
......@@ -302,10 +308,17 @@ class PithosFileClient(FileClient):
return d.keys()
def get_pithos_candidates(self, last_modified=None):
if not self.settings.pithos_is_enabled():
return {}
try:
objects = self.endpoint.list_objects()
except ClientError as e:
logger.error(e)
if e.status == 404:
self.settings.set_pithos_enabled(False)
msg = messaging.PithosSyncDisabled(logger=logger)
self.settings.messager.put(msg)
else:
logger.error(e)
return {}
self.objects = objects
upstream_all = {}
......
......@@ -20,7 +20,7 @@ import logging
from functools import wraps
from agkyra.syncer.utils import join_path, ThreadSafeDict
from agkyra.syncer.database import SqliteFileStateDB
from agkyra.syncer.database import SqliteFileStateDB, transaction
from agkyra.syncer.messaging import Messager
from agkyra.syncer import utils
......@@ -99,6 +99,8 @@ class SyncerSettings():
self.endpoint = self._get_pithos_client(
auth_url, auth_token, container)
container_exists = self.check_container_exists(container)
home_dir = utils.to_unicode(os.path.expanduser('~'))
default_settings_path = join_path(home_dir, GLOBAL_SETTINGS_NAME)
self.settings_path = utils.to_unicode(
......@@ -109,40 +111,51 @@ class SyncerSettings():
self.create_dir(self.instances_path)
self.local_root_path = utils.normalize_local_suffix(local_root_path)
self.create_dir(self.local_root_path)
self.user_id = self.endpoint.account
self.instance = get_instance(
[self.auth_url, self.user_id,
self.container, self.local_root_path])
self.instance_path = join_path(self.instances_path, self.instance)
self.create_dir(self.instance_path)
self.dbname = utils.to_unicode(kwargs.get("dbname", DEFAULT_DBNAME))
self.full_dbname = join_path(self.instance_path, self.dbname)
self.get_db(initialize=True)
local_root_path_exists = os.path.isdir(self.local_root_path)
self.cache_name = utils.to_unicode(
kwargs.get("cache_name", DEFAULT_CACHE_NAME))
self.cache_path = join_path(self.local_root_path, self.cache_name)
self.create_dir(self.cache_path)
self.cache_hide_name = utils.to_unicode(
kwargs.get("cache_hide_name", DEFAULT_CACHE_HIDE_NAME))
self.cache_hide_path = join_path(self.cache_path, self.cache_hide_name)
self.create_dir(self.cache_hide_path)
self.cache_stage_name = utils.to_unicode(
kwargs.get("cache_stage_name", DEFAULT_CACHE_STAGE_NAME))
self.cache_stage_path = join_path(self.cache_path,
self.cache_stage_name)
self.create_dir(self.cache_stage_path)
self.cache_fetch_name = utils.to_unicode(
kwargs.get("cache_fetch_name", DEFAULT_CACHE_FETCH_NAME))
self.cache_fetch_path = join_path(self.cache_path,
self.cache_fetch_name)
self.create_dir(self.cache_fetch_path)
self.user_id = self.endpoint.account
self.instance = get_instance(
[self.auth_url, self.user_id,
self.container, self.local_root_path])
self.instance_path = join_path(self.instances_path, self.instance)
self.create_dir(self.instance_path)
self.dbname = utils.to_unicode(kwargs.get("dbname", DEFAULT_DBNAME))
self.full_dbname = join_path(self.instance_path, self.dbname)
db_existed = os.path.isfile(self.full_dbname)
if not db_existed:
self.get_db(initialize=True)
if not db_existed:
self.set_localfs_enabled(True)
self.create_local_dirs()
self.set_pithos_enabled(True)
if not container_exists:
self.mk_container(container)
else:
if not local_root_path_exists:
self.set_localfs_enabled(False)
if not container_exists:
self.set_pithos_enabled(False)
self.heartbeat = ThreadSafeDict()
self.action_max_wait = kwargs.get("action_max_wait",
......@@ -155,8 +168,14 @@ class SyncerSettings():
self.endpoint.CONNECTION_RETRY_LIMIT = self.connection_retry_limit
self.messager = Messager()
self.mtime_lag = 0 #self.determine_mtime_lag()
self.mtime_lag = self.determine_mtime_lag()
def create_local_dirs(self):
    """Ensure the local root and all cache directories exist."""
    for path in (self.local_root_path,
                 self.cache_path,
                 self.cache_hide_path,
                 self.cache_stage_path,
                 self.cache_fetch_path):
        self.create_dir(path)
def determine_mtime_lag(self):
st = os.stat(self.cache_path)
......@@ -204,22 +223,71 @@ class SyncerSettings():
raise
try:
account = astakos.user_info['id']
client = PithosClient(PITHOS_URL, token, account, container)
return PithosClient(PITHOS_URL, token, account, container)
except ClientError:
logger.error("Failed to initialize Pithos client")
raise
def check_container_exists(self, container):
    """Return True if the remote container exists, False on 404.

    Other client errors are re-raised. Container creation is handled
    separately (see mk_container); this method only probes existence.

    NOTE(review): the diff interleaved the old and new versions here,
    leaving references to an undefined `client`, duplicated creation
    logic and a dead `return client`; this is the coherent version
    using the instance's endpoint.
    """
    try:
        self.endpoint.get_container_info(container)
        return True
    except ClientError as e:
        if e.status == 404:
            logger.warning("Container '%s' does not exist" % container)
            return False
        raise
def mk_container(self, container):
    """Create the remote container; log and re-raise on client failure."""
    try:
        self.endpoint.create_container(container)
    except ClientError:
        logger.error("Failed to create container '%s'" % container)
        raise
    else:
        logger.warning("Creating container '%s'" % container)
@transaction()
def set_localfs_enabled(self, enabled):
    """Persist the localfs-enabled flag inside a DB transaction."""
    self._set_localfs_enabled(self.get_db(), enabled)
def _set_localfs_enabled(self, db, enabled):
db.set_config("localfs_enabled", enabled)
@transaction()
def set_pithos_enabled(self, enabled):
    """Persist the pithos-enabled flag inside a DB transaction."""
    self._set_pithos_enabled(self.get_db(), enabled)
def _set_pithos_enabled(self, db, enabled):
db.set_config("pithos_enabled", enabled)
@transaction()
def localfs_is_enabled(self):
    """Read the persisted localfs-enabled flag in its own transaction."""
    return self._localfs_is_enabled(self.get_db())
def _localfs_is_enabled(self, db):
return db.get_config("localfs_enabled")
@transaction()
def pithos_is_enabled(self):
    """Read the persisted pithos-enabled flag in its own transaction."""
    return self._pithos_is_enabled(self.get_db())
def _pithos_is_enabled(self, db):
return db.get_config("pithos_enabled")
def _sync_is_enabled(self, db):
return self._localfs_is_enabled(db) and self._pithos_is_enabled(db)
@transaction()
def purge_db_archives_and_enable(self):
    """Drop all archive/serial state and turn both sync sides back on.

    For any side that was previously disabled, recreate what it needs:
    local directories for localfs, the remote container for pithos.
    """
    db = self.get_db()
    db.purge_archives()
    localfs_on = self._localfs_is_enabled(db)
    if not localfs_on:
        self.create_local_dirs()
        self._set_localfs_enabled(db, True)
    pithos_on = self._pithos_is_enabled(db)
    if not pithos_on:
        self._set_pithos_enabled(db, True)
        self.mk_container(self.container)
......@@ -16,6 +16,7 @@
import time
import threading
import logging
from collections import defaultdict
from agkyra.syncer import common
from agkyra.syncer.setup import SyncerSettings
......@@ -161,6 +162,26 @@ class FileSyncer(object):
info={})
db.put_state(sync_state)
@transaction()
def dry_run_decisions(self, objnames, master=None, slave=None):
    """Compute sync decisions for objnames without side effects.

    Defaults master/slave to the syncer's configured archives.
    Returns a list with one decision per object name.
    """
    master = self.MASTER if master is None else master
    slave = self.SLAVE if slave is None else slave
    return [self._dry_run_decision(objname, master, slave)
            for objname in objnames]
def _dry_run_decision(self, objname, master=None, slave=None):
    """Decide a single object with dry_run=True (no state mutation)."""
    master = self.MASTER if master is None else master
    slave = self.SLAVE if slave is None else slave
    ident = utils.time_stamp()
    return self._do_decide_file_sync(objname, master, slave, ident, True)
def decide_file_sync(self, objname, master=None, slave=None):
if master is None:
master = self.MASTER
......@@ -177,6 +198,10 @@ class FileSyncer(object):
@transaction()
def _decide_file_sync(self, objname, master, slave, ident):
db = self.get_db()
if not self.settings._sync_is_enabled(db):
logger.warning("Cannot decide '%s'; sync disabled." % objname)
return
states = self._do_decide_file_sync(objname, master, slave, ident)
if states is not None:
with self.heartbeat.lock() as hb:
......@@ -184,7 +209,8 @@ class FileSyncer(object):
hb[objname] = beat
return states
def _do_decide_file_sync(self, objname, master, slave, ident):
def _do_decide_file_sync(self, objname, master, slave, ident,
dry_run=False):
db = self.get_db()
logger.info("Deciding object: '%s'" % objname)
master_state = db.get_state(master, objname)
......@@ -201,15 +227,17 @@ class FileSyncer(object):
logger.debug("object: %s heartbeat: %s" % (objname, beat))
if beat is not None:
if beat["ident"] == ident:
msg = messaging.HeartbeatReplayDecideMessage(
objname=objname, heartbeat=beat, logger=logger)
self.messager.put(msg)
if not dry_run:
msg = messaging.HeartbeatReplayDecideMessage(
objname=objname, heartbeat=beat, logger=logger)
self.messager.put(msg)
else:
if utils.younger_than(
beat["tstamp"], self.settings.action_max_wait):
msg = messaging.HeartbeatNoDecideMessage(
objname=objname, heartbeat=beat, logger=logger)
self.messager.put(msg)
if not dry_run:
msg = messaging.HeartbeatNoDecideMessage(
objname=objname, heartbeat=beat, logger=logger)
self.messager.put(msg)
return None
logger.warning("Ignoring previous run: %s %s" %
(objname, beat))
......@@ -231,20 +259,23 @@ class FileSyncer(object):
"does not match any archive." %
(decision_serial, objname))
else:
msg = messaging.FailedSyncIgnoreDecisionMessage(
objname=objname, serial=decision_serial, logger=logger)
self.messager.put(msg)
if not dry_run:
msg = messaging.FailedSyncIgnoreDecisionMessage(
objname=objname, serial=decision_serial, logger=logger)
self.messager.put(msg)
if master_serial > sync_serial:
if master_serial == decision_serial: # this is a failed serial
return None
self._make_decision_state(decision_state, master_state)
if not dry_run:
self._make_decision_state(decision_state, master_state)
return master_state, slave_state, sync_state
elif master_serial == sync_serial:
if slave_serial > sync_serial:
if slave_serial == decision_serial: # this is a failed serial
return None
self._make_decision_state(decision_state, slave_state)
if not dry_run:
self._make_decision_state(decision_state, slave_state)
return slave_state, master_state, sync_state
elif slave_serial == sync_serial:
return None
......@@ -367,8 +398,14 @@ class FileSyncer(object):
new_decision_state = new_sync_state.set(archive=self.DECISION)
db.put_state(new_decision_state)
@transaction()
def list_deciding(self, archives=None):
    """List objects pending a decision, retrying on DatabaseError.

    The original retried via unbounded recursion, which grows the
    stack for every failed attempt; an iterative retry keeps the
    same behavior (retry forever on DatabaseError, propagate any
    other exception) without that risk.
    """
    while True:
        try:
            return self._list_deciding(archives=archives)
        except DatabaseError:
            continue
@transaction()
def _list_deciding(self, archives=None):
db = self.get_db()
if archives is None:
archives = (self.MASTER, self.SLAVE)
......@@ -409,6 +446,17 @@ class FileSyncer(object):
time.sleep(interval)
return utils.start_daemon(DecideThread)
def check_decisions(self):
    """Group pending objects by the archive that would act as source.

    Runs dry-run decisions for every deciding object and returns a
    mapping {source_archive: [objname, ...]}.

    Fix: a dry-run decision can be None (the decide path returns None
    for objects with nothing to do or a pending heartbeat), and the
    original indexed `decision[0]` unconditionally, raising TypeError.
    None decisions are now skipped.
    """
    deciding = self.list_deciding()
    decisions = self.dry_run_decisions(deciding)
    by_source = defaultdict(list)
    for decision in decisions:
        if decision is None:
            continue
        source_state = decision[0]
        by_source[source_state.archive].append(source_state.objname)
    return by_source
# TODO cleanup db of objects deleted in all clients
# def cleanup(self):
# db = self.get_db()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment