diff --git a/agkyra/agkyra/syncer/common.py b/agkyra/agkyra/syncer/common.py index 3cd17acf3d372939e4e64789e7920990059c4af0..ebda9de2df959749eeccfe135a7139b3785f717f 100644 --- a/agkyra/agkyra/syncer/common.py +++ b/agkyra/agkyra/syncer/common.py @@ -59,8 +59,25 @@ class LockedDict(object): self._Dict[key] = value self._Lock.release() - def get(self, key): + def get(self, key, default=None): self._Lock.acquire() - value = self._Dict.get(key) + value = self._Dict.get(key, default=default) + self._Lock.release() + return value + + def pop(self, key, d=None): + self._Lock.acquire() + value = self._Dict.pop(key, d) + self._Lock.release() + return value + + def update(self, d): + self._Lock.acquire() + self._Dict.update(d) + self._Lock.release() + + def keys(self): + self._Lock.acquire() + value = self._Dict.keys() self._Lock.release() return value diff --git a/agkyra/agkyra/syncer/localfs_client.py b/agkyra/agkyra/syncer/localfs_client.py index 5696244f600b1444235465dd388576bd5fa9dd07..db2684b136c811eeb67464a2873066e65a099aef 100644 --- a/agkyra/agkyra/syncer/localfs_client.py +++ b/agkyra/agkyra/syncer/localfs_client.py @@ -1,7 +1,6 @@ import os import stat import re -import time import datetime import psutil from watchdog.observers import Observer @@ -450,8 +449,15 @@ class LocalfsFileClient(FileClient): self.get_db = settings.get_db self.exclude_files_exp = re.compile('.*\.tmp$') self.exclude_dir_exp = re.compile(self.CACHEPATH) + self.probe_candidates = common.LockedDict() - def list_candidate_files(self): + def list_candidate_files(self, forced=False): + if forced: + candidates = self.walk_filesystem() + self.probe_candidates.update(candidates) + return self.probe_candidates.keys() + + def walk_filesystem(self): db = self.get_db() candidates = {} for dirpath, dirnames, files in os.walk(self.ROOTPATH): @@ -487,8 +493,9 @@ class LocalfsFileClient(FileClient): if old_state.serial != ref_state.serial: logger.warning("Serial mismatch in probing path '%s'" % objname) return + cached_info = self.probe_candidates.pop(objname) live_info = (self._local_path_changes(objname, old_state) - if assumed_info is None else assumed_info) + if cached_info is None else cached_info) if live_info is None: return live_state = old_state.set(info=live_info) @@ -505,8 +512,7 @@ class LocalfsFileClient(FileClient): def handle_path(path): rel_path = os.path.relpath(path, start=self.ROOTPATH) objname = utils.to_standard_sep(rel_path) - if callback is not None: - callback(self.SIGNATURE, objname) + self.probe_candidates.put(objname, None) class EventHandler(FileSystemEventHandler): def on_created(this, event): diff --git a/agkyra/agkyra/syncer/pithos_client.py b/agkyra/agkyra/syncer/pithos_client.py index da312eff02d7d034a584cf7912d1a41db7cc0247..e61ecf2a4f17c5a44926c5b30c161abb86ad88bb 100644 --- a/agkyra/agkyra/syncer/pithos_client.py +++ b/agkyra/agkyra/syncer/pithos_client.py @@ -20,7 +20,6 @@ def heartbeat_event(settings, heartbeat, objname): def set_log(): with heartbeat.lock() as hb: - client, prev_tstamp = hb.get(objname) tstamp = utils.time_stamp() hb.set(objname, tstamp) logger.debug("HEARTBEAT '%s' %s" % (objname, tstamp)) @@ -235,8 +234,15 @@ class PithosFileClient(FileClient): self.get_db = settings.get_db self.endpoint = settings.endpoint self.last_modification = "0000-00-00" + self.probe_candidates = common.LockedDict() - def list_candidate_files(self, last_modified=None): + def list_candidate_files(self, forced=False): + if forced: + candidates = self.get_pithos_candidates() + self.probe_candidates.update(candidates) + return self.probe_candidates.keys() + + def get_pithos_candidates(self, last_modified=None): db = self.get_db() objects = self.endpoint.list_objects() self.objects = objects @@ -268,13 +274,12 @@ class PithosFileClient(FileClient): (last_modified, candidates)) return candidates - def notifier(self, callback=None, interval=10): + def notifier(self, callback=None, interval=2): class PollPithosThread(utils.StoppableThread): def run_body(this): - candidates = self.list_candidate_files( + candidates = self.get_pithos_candidates( last_modified=self.last_modification) - for (objname, info) in candidates.iteritems(): - callback(self.SIGNATURE, objname, assumed_info=info) + self.probe_candidates.update(candidates) time.sleep(interval) return utils.start_daemon(PollPithosThread) @@ -305,11 +310,12 @@ class PithosFileClient(FileClient): (old_state.archive, objname)) return info = old_state.info - if assumed_info is None: + cached_info = self.probe_candidates.pop(objname) + if cached_info is None: obj = self.get_object(objname) live_info = self.get_object_live_info(obj) else: - live_info = assumed_info + live_info = cached_info if info != live_info: if callback is not None: live_state = old_state.set(info=live_info) diff --git a/agkyra/agkyra/syncer/syncer.py b/agkyra/agkyra/syncer/syncer.py index 82efedf5d44d5dc9d6b2681b4d3429ba44991717..5e9b8f3d818c508e807a03190a5b9d4699a8e92a 100644 --- a/agkyra/agkyra/syncer/syncer.py +++ b/agkyra/agkyra/syncer/syncer.py @@ -62,7 +62,7 @@ class FileSyncer(object): def initiate_probe(self): self.start_notifiers() - self.probe_all() + self.probe_all(forced=True) def start_notifiers(self): for signature, client in self.clients.iteritems(): @@ -173,7 +173,7 @@ class FileSyncer(object): @transaction() def _decide_file_sync(self, objname, master, slave): - states = self._decide_file_sync(objname, master, slave) + states = self._do_decide_file_sync(objname, master, slave) if states is not None: with self.heartbeat.lock() as hb: hb.set(objname, utils.time_stamp()) @@ -191,7 +191,7 @@ class FileSyncer(object): sync_serial = sync_state.serial decision_serial = decision_state.serial - with self.heartbeat.lock() as hb: + with self.heartbeat.lock() as hb: prev_log = hb.get(objname) logger.info("object: %s heartbeat: %s" % (objname, prev_log)) @@ -355,11 +355,11 @@ class FileSyncer(object): return set(db.list_deciding(archives=archives, sync=self.SYNC)) - def probe_archive(self, archive): + def probe_archive(self, archive, forced=False): client = self.clients[archive] - candidates = client.list_candidate_files() - for (objname, info) in candidates.iteritems(): - self.probe_file(archive, objname, assumed_info=info) + candidates = client.list_candidate_files(forced=forced) + for objname in candidates: + self.probe_file(archive, objname) def decide_archive(self, archive): for objname in self.list_deciding([archive]): @@ -367,18 +367,14 @@ class FileSyncer(object): def decide_all_archives(self): logger.info("Checking candidates to sync") - for objname in self.list_deciding(): - self.decide_file_sync(objname) - - def probe_all(self): - self.probe_archive(self.MASTER) - self.probe_archive(self.SLAVE) - - def probe_and_sync_all(self): self.probe_all() for objname in self.list_deciding(): self.decide_file_sync(objname) + def probe_all(self, forced=False): + self.probe_archive(self.MASTER, forced=forced) + self.probe_archive(self.SLAVE, forced=forced) + def _poll_decide(self, interval=3): class DecideThread(utils.StoppableThread): def run_body(this):