diff --git a/agkyra/agkyra/syncer/pithos_client.py b/agkyra/agkyra/syncer/pithos_client.py index 3701ffddc490988d9d5b5f77b5fb2c2d7d3e4316..e1d2334a627dd2671c8f5fdfed110a62499c190e 100644 --- a/agkyra/agkyra/syncer/pithos_client.py +++ b/agkyra/agkyra/syncer/pithos_client.py @@ -283,22 +283,17 @@ class PithosFileClient(FileClient): return candidates def notifier(self, callback=None, interval=10): - class PollPithos(threading.Thread): - def run(this): - while True: - utcnow = datetime.datetime.utcnow() - last_tstamp = (utcnow - - datetime.timedelta(seconds=interval)) - last_modified = last_tstamp.isoformat() - candidates = self.list_candidate_files( - last_modified=last_modified) - for (objname, info) in candidates.iteritems(): - callback(self.SIGNATURE, objname, assumed_info=info) - time.sleep(interval) - - poll = PollPithos() - poll.daemon = True - poll.start() + class PollPithosThread(utils.StoppableThread): + def run_body(this): + utcnow = datetime.datetime.utcnow() + last_tstamp = (utcnow - datetime.timedelta(seconds=interval)) + last_modified = last_tstamp.isoformat() + candidates = self.list_candidate_files( + last_modified=last_modified) + for (objname, info) in candidates.iteritems(): + callback(self.SIGNATURE, objname, assumed_info=info) + time.sleep(interval) + return utils.start_daemon(PollPithosThread) def get_object(self, objname): try: diff --git a/agkyra/agkyra/syncer/syncer.py b/agkyra/agkyra/syncer/syncer.py index 08349231e44c9f8a29e430c9a03b088085e2f27a..673475f75f9ed04dbc966dc36acb6323275f8b7d 100644 --- a/agkyra/agkyra/syncer/syncer.py +++ b/agkyra/agkyra/syncer/syncer.py @@ -8,7 +8,7 @@ from agkyra.syncer.setup import SyncerSettings from agkyra.syncer.database import transaction from agkyra.syncer.localfs_client import LocalfsFileClient from agkyra.syncer.pithos_client import PithosFileClient -from agkyra.syncer import messaging +from agkyra.syncer import messaging, utils logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) @@ -42,32 +42,55 @@ class FileSyncer(object): self.SLAVE = slave.SIGNATURE self.get_db = settings.get_db self.clients = {self.MASTER: master, self.SLAVE: slave} - self.decide_event = None + self.notifiers = {} + self.decide_thread = None + self.sync_threads = [] self.failed_serials = common.LockedDict() self.messager = settings.messager + def thread_is_active(self, t): + return t and t.is_alive() + + @property + def decide_active(self): + return self.thread_is_active(self.decide_thread) + @property def paused(self): - return (not self.decide_event.is_set()) if self.decide_event else True + return not self.decide_active - def launch_daemons(self): + def initiate_probe(self): self.start_notifiers() - self.start_decide() + self.probe_all() def start_notifiers(self): - self.notifiers = { - self.MASTER: self.master.notifier(callback=self.probe_file), - self.SLAVE: self.slave.notifier(callback=self.probe_file), - } + for signature, client in self.clients.iteritems(): + notifier = self.notifiers.get(signature) + if not self.thread_is_active(notifier): + self.notifiers[signature] = \ + client.notifier(callback=self.probe_file) + else: + print 'ignoring %s' % signature + + def stop_notifiers(self): + for notifier in self.notifiers.values(): + notifier.stop() def start_decide(self): - if self.decide_event is None: - self.decide_event = self._poll_decide() - self.decide_event.set() + if not self.decide_active: + self.decide_thread = self._poll_decide() + + def stop_decide(self): + if self.decide_active: + self.decide_thread.stop() - def pause_decide(self): - if self.decide_event is not None: - self.decide_event.clear() + def stop_all_daemons(self): + self.stop_decide() + self.stop_notifiers() + + def wait_sync_threads(self): + for thread in self.sync_threads: + thread.join() def get_next_message(self, block=False): return self.messager.get(block=block) @@ -206,6 +229,7 @@ class FileSyncer(object): target=self._sync_file, args=(source_state, target_state, sync_state)) thread.start() + self.sync_threads.append(thread) def _sync_file(self, source_state, target_state, sync_state): clients = self.clients @@ -310,24 +334,22 @@ class FileSyncer(object): for objname in self.list_deciding(): self.decide_file_sync(objname) - def probe_and_sync_all(self): + def probe_all(self): self.probe_archive(self.MASTER) self.probe_archive(self.SLAVE) + + def probe_and_sync_all(self): + self.probe_all() for objname in self.list_deciding(): self.decide_file_sync(objname) def _poll_decide(self, interval=3): - event = threading.Event() - - def go(): - while True: - event.wait() + class DecideThread(utils.StoppableThread): + def run_body(this): self.decide_all_archives() time.sleep(interval) - poll = threading.Thread(target=go) - poll.daemon = True - poll.start() - return event + return utils.start_daemon(DecideThread) + # TODO cleanup db of objects deleted in all clients # def cleanup(self): diff --git a/agkyra/agkyra/syncer/utils.py b/agkyra/agkyra/syncer/utils.py index 6f234066c591173f6ddbb6600b018f665d1802c0..27f08e63ff049dcd4874f5a9762fdce0d44c9f24 100644 --- a/agkyra/agkyra/syncer/utils.py +++ b/agkyra/agkyra/syncer/utils.py @@ -1,6 +1,7 @@ import os import hashlib import datetime +import watchdog.utils from agkyra.syncer.common import OBJECT_DIRSEP @@ -49,3 +50,24 @@ def younger_than(tstamp, seconds): ts = datetime.datetime.fromtimestamp(int(float(tstamp))) delta = now - ts return delta < datetime.timedelta(seconds=seconds) + + +BaseStoppableThread = watchdog.utils.BaseThread + + +class StoppableThread(BaseStoppableThread): + def run_body(self): + raise NotImplementedError() + + def run(self): + while True: + if not self.should_keep_running(): + return + self.run_body() + + +def start_daemon(threadClass): + thread = threadClass() + thread.daemon = True + thread.start() + return thread