diff --git a/agkyra/syncer/database.py b/agkyra/syncer/database.py index 7aae9b0979e732016a6e31ea3df29b34e3b47485..d932826945bec8ae948297ec45fd7d141bfcba4c 100644 --- a/agkyra/syncer/database.py +++ b/agkyra/syncer/database.py @@ -66,6 +66,10 @@ class SqliteFileStateDB(FileStateDB): "primary key (cachename))") db.execute(Q) + Q = ("create table if not exists " + "config(key text, value text, primary key (key))") + db.execute(Q) + self.commit() def begin(self): @@ -194,6 +198,22 @@ class SqliteFileStateDB(FileStateDB): archive=archive, objname=objname, serial=-1, info={}) return state + def get_config(self, key): + Q = "select value from config where key = ?" + c = self.db.execute(Q, (key,)) + r = c.fetchone() + if not r: + return None + return json.loads(r[0]) + + def set_config(self, key, value): + Q = "insert or replace into config(key, value) values (?, ?)" + self.db.execute(Q, (key, json.dumps(value))) + + def purge_archives(self): + self.db.execute("delete from archives") + self.db.execute("delete from serials") + def rand(lim): return random.random() * lim diff --git a/agkyra/syncer/localfs_client.py b/agkyra/syncer/localfs_client.py index 78b661cfce2112104db082705a225fd3733e90ee..4e70eedfa3f1d9e7e15eda821103fa4e7a04ef8d 100644 --- a/agkyra/syncer/localfs_client.py +++ b/agkyra/syncer/localfs_client.py @@ -548,6 +548,12 @@ class LocalfsFileClient(FileClient): self.CACHEPATH = settings.cache_path self.get_db = settings.get_db self.probe_candidates = utils.ThreadSafeDict() + self.check_enabled() + + def check_enabled(self): + if not self.settings.localfs_is_enabled(): + msg = messaging.LocalfsSyncDisabled(logger=logger) + self.settings.messager.put(msg) def remove_candidates(self, objnames, ident): with self.probe_candidates.lock() as d: @@ -560,6 +566,14 @@ class LocalfsFileClient(FileClient): pass def list_candidate_files(self, forced=False): + if not self.settings.localfs_is_enabled(): + return {} + if not os.path.isdir(self.ROOTPATH): + self.settings.set_localfs_enabled(False) + msg = messaging.LocalfsSyncDisabled(logger=logger) + self.settings.messager.put(msg) + return {} + with self.probe_candidates.lock() as d: if forced: candidates = self.walk_filesystem() @@ -658,6 +672,7 @@ class LocalfsFileClient(FileClient): with self.probe_candidates.lock() as d: d[objname] = self.none_info() + root_path = utils.from_unicode(self.ROOTPATH) class EventHandler(FileSystemEventHandler): def on_created(this, event): # if not event.is_directory: @@ -669,6 +684,10 @@ class LocalfsFileClient(FileClient): def on_deleted(this, event): path = event.src_path logger.debug("Handling %s" % event) + if path == root_path: + self.settings.set_localfs_enabled(False) + msg = messaging.LocalfsSyncDisabled(logger=logger) + self.settings.messager.put(msg) handle_path(path) def on_modified(this, event): @@ -685,9 +704,8 @@ class LocalfsFileClient(FileClient): handle_path(src_path) handle_path(dest_path) - path = utils.from_unicode(self.ROOTPATH) event_handler = EventHandler() observer = Observer() - observer.schedule(event_handler, path, recursive=True) + observer.schedule(event_handler, root_path, recursive=True) observer.start() return observer diff --git a/agkyra/syncer/messaging.py b/agkyra/syncer/messaging.py index e3cab2c7712a6b78271ded9e6a00066ea728364a..6e4220d06934899b77a95246379a5bc3c562d5cb 100644 --- a/agkyra/syncer/messaging.py +++ b/agkyra/syncer/messaging.py @@ -167,3 +167,15 @@ class ConflictStashMessage(Message): self.stash_name = kwargs["stash_name"] self.logger.warning("Stashing file '%s' to '%s'" % (self.objname, self.stash_name)) + + +class LocalfsSyncDisabled(Message): + def __init__(self, *args, **kwargs): + Message.__init__(self, *args, **kwargs) + self.logger.warning("Localfs sync is disabled") + + +class PithosSyncDisabled(Message): + def __init__(self, *args, **kwargs): + Message.__init__(self, *args, **kwargs) + self.logger.warning("Pithos sync is disabled") diff --git a/agkyra/syncer/pithos_client.py b/agkyra/syncer/pithos_client.py index 8ea4f9d0632ab1e24629980caf48fc70501b87e8..5b06d0622b7bca3e6c941e081cf71bea1bd03307 100644 --- a/agkyra/syncer/pithos_client.py +++ b/agkyra/syncer/pithos_client.py @@ -283,6 +283,12 @@ class PithosFileClient(FileClient): self.endpoint = settings.endpoint self.last_modification = "0000-00-00" self.probe_candidates = utils.ThreadSafeDict() + self.check_enabled() + + def check_enabled(self): + if not self.settings.pithos_is_enabled(): + msg = messaging.PithosSyncDisabled(logger=logger) + self.settings.messager.put(msg) def remove_candidates(self, objnames, ident): with self.probe_candidates.lock() as d: @@ -302,10 +308,17 @@ class PithosFileClient(FileClient): return d.keys() def get_pithos_candidates(self, last_modified=None): + if not self.settings.pithos_is_enabled(): + return {} try: objects = self.endpoint.list_objects() except ClientError as e: - logger.error(e) + if e.status == 404: + self.settings.set_pithos_enabled(False) + msg = messaging.PithosSyncDisabled(logger=logger) + self.settings.messager.put(msg) + else: + logger.error(e) return {} self.objects = objects upstream_all = {} diff --git a/agkyra/syncer/setup.py b/agkyra/syncer/setup.py index 4aadede0336d5f23c450bd9beb364f49be2c7206..f9d9046254b6b873f7d187b8ded3c11c3296485e 100644 --- a/agkyra/syncer/setup.py +++ b/agkyra/syncer/setup.py @@ -20,7 +20,7 @@ import logging from functools import wraps from agkyra.syncer.utils import join_path, ThreadSafeDict -from agkyra.syncer.database import SqliteFileStateDB +from agkyra.syncer.database import SqliteFileStateDB, transaction from agkyra.syncer.messaging import Messager from agkyra.syncer import utils @@ -99,6 +99,8 @@ class SyncerSettings(): self.endpoint = self._get_pithos_client( auth_url, auth_token, container) + container_exists = self.check_container_exists(container) + home_dir = utils.to_unicode(os.path.expanduser('~')) default_settings_path = join_path(home_dir, GLOBAL_SETTINGS_NAME) self.settings_path = utils.to_unicode( @@ -109,40 +111,51 @@ class SyncerSettings(): self.create_dir(self.instances_path) self.local_root_path = utils.normalize_local_suffix(local_root_path) - self.create_dir(self.local_root_path) - - self.user_id = self.endpoint.account - self.instance = get_instance( - [self.auth_url, self.user_id, - self.container, self.local_root_path]) - self.instance_path = join_path(self.instances_path, self.instance) - self.create_dir(self.instance_path) - - self.dbname = utils.to_unicode(kwargs.get("dbname", DEFAULT_DBNAME)) - self.full_dbname = join_path(self.instance_path, self.dbname) - self.get_db(initialize=True) + local_root_path_exists = os.path.isdir(self.local_root_path) self.cache_name = utils.to_unicode( kwargs.get("cache_name", DEFAULT_CACHE_NAME)) self.cache_path = join_path(self.local_root_path, self.cache_name) - self.create_dir(self.cache_path) self.cache_hide_name = utils.to_unicode( kwargs.get("cache_hide_name", DEFAULT_CACHE_HIDE_NAME)) self.cache_hide_path = join_path(self.cache_path, self.cache_hide_name) - self.create_dir(self.cache_hide_path) self.cache_stage_name = utils.to_unicode( kwargs.get("cache_stage_name", DEFAULT_CACHE_STAGE_NAME)) self.cache_stage_path = join_path(self.cache_path, self.cache_stage_name) - self.create_dir(self.cache_stage_path) self.cache_fetch_name = utils.to_unicode( kwargs.get("cache_fetch_name", DEFAULT_CACHE_FETCH_NAME)) self.cache_fetch_path = join_path(self.cache_path, self.cache_fetch_name) - self.create_dir(self.cache_fetch_path) + + self.user_id = self.endpoint.account + self.instance = get_instance( + [self.auth_url, self.user_id, + self.container, self.local_root_path]) + self.instance_path = join_path(self.instances_path, self.instance) + self.create_dir(self.instance_path) + + self.dbname = utils.to_unicode(kwargs.get("dbname", DEFAULT_DBNAME)) + self.full_dbname = join_path(self.instance_path, self.dbname) + + db_existed = os.path.isfile(self.full_dbname) + if not db_existed: + self.get_db(initialize=True) + + if not db_existed: + self.set_localfs_enabled(True) + self.create_local_dirs() + self.set_pithos_enabled(True) + if not container_exists: + self.mk_container(container) + else: + if not local_root_path_exists: + self.set_localfs_enabled(False) + if not container_exists: + self.set_pithos_enabled(False) self.heartbeat = ThreadSafeDict() self.action_max_wait = kwargs.get("action_max_wait", @@ -155,8 +168,14 @@ class SyncerSettings(): self.endpoint.CONNECTION_RETRY_LIMIT = self.connection_retry_limit self.messager = Messager() + self.mtime_lag = 0 #self.determine_mtime_lag() - self.mtime_lag = self.determine_mtime_lag() + def create_local_dirs(self): + self.create_dir(self.local_root_path) + self.create_dir(self.cache_path) + self.create_dir(self.cache_hide_path) + self.create_dir(self.cache_stage_path) + self.create_dir(self.cache_fetch_path) def determine_mtime_lag(self): st = os.stat(self.cache_path) @@ -204,22 +223,71 @@ class SyncerSettings(): raise try: account = astakos.user_info['id'] - client = PithosClient(PITHOS_URL, token, account, container) + return PithosClient(PITHOS_URL, token, account, container) except ClientError: logger.error("Failed to initialize Pithos client") raise + + def check_container_exists(self, container): try: - client.get_container_info(container) + self.endpoint.get_container_info(container) + return True except ClientError as e: if e.status == 404: - logger.warning( - "Container '%s' does not exist, creating..." % container) - try: - client.create_container(container) - except ClientError: - logger.error("Failed to create container '%s'" % container) - raise + return False else: raise - return client + def mk_container(self, container): + try: + self.endpoint.create_container(container) + logger.warning("Creating container '%s'" % container) + except ClientError: + logger.error("Failed to create container '%s'" % container) + raise + + @transaction() + def set_localfs_enabled(self, enabled): + db = self.get_db() + self._set_localfs_enabled(db, enabled) + + def _set_localfs_enabled(self, db, enabled): + db.set_config("localfs_enabled", enabled) + + @transaction() + def set_pithos_enabled(self, enabled): + db = self.get_db() + self._set_pithos_enabled(db, enabled) + + def _set_pithos_enabled(self, db, enabled): + db.set_config("pithos_enabled", enabled) + + @transaction() + def localfs_is_enabled(self): + db = self.get_db() + return self._localfs_is_enabled(db) + + def _localfs_is_enabled(self, db): + return db.get_config("localfs_enabled") + + @transaction() + def pithos_is_enabled(self): + db = self.get_db() + return self._pithos_is_enabled(db) + + def _pithos_is_enabled(self, db): + return db.get_config("pithos_enabled") + + def _sync_is_enabled(self, db): + return self._localfs_is_enabled(db) and self._pithos_is_enabled(db) + + @transaction() + def purge_db_archives_and_enable(self): + db = self.get_db() + db.purge_archives() + if not self._localfs_is_enabled(db): + self.create_local_dirs() + self._set_localfs_enabled(db, True) + if not self._pithos_is_enabled(db): + self._set_pithos_enabled(db, True) + self.mk_container(self.container) diff --git a/agkyra/syncer/syncer.py b/agkyra/syncer/syncer.py index d13b8cc0ad6fa4371933ebd21c92229b6d472357..8f80880d2ee3ed6574f8d80cc18048c833c0ac9d 100644 --- a/agkyra/syncer/syncer.py +++ b/agkyra/syncer/syncer.py @@ -16,6 +16,7 @@ import time import threading import logging +from collections import defaultdict from agkyra.syncer import common from agkyra.syncer.setup import SyncerSettings @@ -161,6 +162,26 @@ class FileSyncer(object): info={}) db.put_state(sync_state) + @transaction() + def dry_run_decisions(self, objnames, master=None, slave=None): + if master is None: + master = self.MASTER + if slave is None: + slave = self.SLAVE + decisions = [] + for objname in objnames: + decision = self._dry_run_decision(objname, master, slave) + decisions.append(decision) + return decisions + + def _dry_run_decision(self, objname, master=None, slave=None): + if master is None: + master = self.MASTER + if slave is None: + slave = self.SLAVE + ident = utils.time_stamp() + return self._do_decide_file_sync(objname, master, slave, ident, True) + def decide_file_sync(self, objname, master=None, slave=None): if master is None: master = self.MASTER @@ -177,6 +198,10 @@ class FileSyncer(object): @transaction() def _decide_file_sync(self, objname, master, slave, ident): + db = self.get_db() + if not self.settings._sync_is_enabled(db): + logger.warning("Cannot decide '%s'; sync disabled." % objname) + return states = self._do_decide_file_sync(objname, master, slave, ident) if states is not None: with self.heartbeat.lock() as hb: @@ -184,7 +209,8 @@ class FileSyncer(object): hb[objname] = beat return states - def _do_decide_file_sync(self, objname, master, slave, ident): + def _do_decide_file_sync(self, objname, master, slave, ident, + dry_run=False): db = self.get_db() logger.info("Deciding object: '%s'" % objname) master_state = db.get_state(master, objname) @@ -201,15 +227,17 @@ class FileSyncer(object): logger.debug("object: %s heartbeat: %s" % (objname, beat)) if beat is not None: if beat["ident"] == ident: - msg = messaging.HeartbeatReplayDecideMessage( - objname=objname, heartbeat=beat, logger=logger) - self.messager.put(msg) + if not dry_run: + msg = messaging.HeartbeatReplayDecideMessage( + objname=objname, heartbeat=beat, logger=logger) + self.messager.put(msg) else: if utils.younger_than( beat["tstamp"], self.settings.action_max_wait): - msg = messaging.HeartbeatNoDecideMessage( - objname=objname, heartbeat=beat, logger=logger) - self.messager.put(msg) + if not dry_run: + msg = messaging.HeartbeatNoDecideMessage( + objname=objname, heartbeat=beat, logger=logger) + self.messager.put(msg) return None logger.warning("Ignoring previous run: %s %s" % (objname, beat)) @@ -231,20 +259,23 @@ class FileSyncer(object): "does not match any archive." % (decision_serial, objname)) else: - msg = messaging.FailedSyncIgnoreDecisionMessage( - objname=objname, serial=decision_serial, logger=logger) - self.messager.put(msg) + if not dry_run: + msg = messaging.FailedSyncIgnoreDecisionMessage( + objname=objname, serial=decision_serial, logger=logger) + self.messager.put(msg) if master_serial > sync_serial: if master_serial == decision_serial: # this is a failed serial return None - self._make_decision_state(decision_state, master_state) + if not dry_run: + self._make_decision_state(decision_state, master_state) return master_state, slave_state, sync_state elif master_serial == sync_serial: if slave_serial > sync_serial: if slave_serial == decision_serial: # this is a failed serial return None - self._make_decision_state(decision_state, slave_state) + if not dry_run: + self._make_decision_state(decision_state, slave_state) return slave_state, master_state, sync_state elif slave_serial == sync_serial: return None @@ -367,8 +398,14 @@ class FileSyncer(object): new_decision_state = new_sync_state.set(archive=self.DECISION) db.put_state(new_decision_state) - @transaction() def list_deciding(self, archives=None): + try: + return self._list_deciding(archives=archives) + except DatabaseError: + return self.list_deciding(archives=archives) + + @transaction() + def _list_deciding(self, archives=None): db = self.get_db() if archives is None: archives = (self.MASTER, self.SLAVE) @@ -409,6 +446,17 @@ class FileSyncer(object): time.sleep(interval) return utils.start_daemon(DecideThread) + def check_decisions(self): + deciding = self.list_deciding() + decisions = self.dry_run_decisions(deciding) + by_source = defaultdict(list) + for decision in decisions: + source_state = decision[0] + source = source_state.archive + objname = source_state.objname + by_source[source].append(objname) + return by_source + # TODO cleanup db of objects deleted in all clients # def cleanup(self): # db = self.get_db()