Commit f6991720 authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

Option to start/stop daemons

parent 710a4009
...@@ -283,22 +283,17 @@ class PithosFileClient(FileClient): ...@@ -283,22 +283,17 @@ class PithosFileClient(FileClient):
return candidates return candidates
def notifier(self, callback=None, interval=10): def notifier(self, callback=None, interval=10):
class PollPithos(threading.Thread): class PollPithosThread(utils.StoppableThread):
def run(this): def run_body(this):
while True: utcnow = datetime.datetime.utcnow()
utcnow = datetime.datetime.utcnow() last_tstamp = (utcnow - datetime.timedelta(seconds=interval))
last_tstamp = (utcnow - last_modified = last_tstamp.isoformat()
datetime.timedelta(seconds=interval)) candidates = self.list_candidate_files(
last_modified = last_tstamp.isoformat() last_modified=last_modified)
candidates = self.list_candidate_files( for (objname, info) in candidates.iteritems():
last_modified=last_modified) callback(self.SIGNATURE, objname, assumed_info=info)
for (objname, info) in candidates.iteritems(): time.sleep(interval)
callback(self.SIGNATURE, objname, assumed_info=info) return utils.start_daemon(PollPithosThread)
time.sleep(interval)
poll = PollPithos()
poll.daemon = True
poll.start()
def get_object(self, objname): def get_object(self, objname):
try: try:
......
...@@ -8,7 +8,7 @@ from agkyra.syncer.setup import SyncerSettings ...@@ -8,7 +8,7 @@ from agkyra.syncer.setup import SyncerSettings
from agkyra.syncer.database import transaction from agkyra.syncer.database import transaction
from agkyra.syncer.localfs_client import LocalfsFileClient from agkyra.syncer.localfs_client import LocalfsFileClient
from agkyra.syncer.pithos_client import PithosFileClient from agkyra.syncer.pithos_client import PithosFileClient
from agkyra.syncer import messaging from agkyra.syncer import messaging, utils
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
...@@ -42,32 +42,55 @@ class FileSyncer(object): ...@@ -42,32 +42,55 @@ class FileSyncer(object):
self.SLAVE = slave.SIGNATURE self.SLAVE = slave.SIGNATURE
self.get_db = settings.get_db self.get_db = settings.get_db
self.clients = {self.MASTER: master, self.SLAVE: slave} self.clients = {self.MASTER: master, self.SLAVE: slave}
self.decide_event = None self.notifiers = {}
self.decide_thread = None
self.sync_threads = []
self.failed_serials = common.LockedDict() self.failed_serials = common.LockedDict()
self.messager = settings.messager self.messager = settings.messager
def thread_is_active(self, t):
return t and t.is_alive()
@property
def decide_active(self):
return self.thread_is_active(self.decide_thread)
@property @property
def paused(self): def paused(self):
return (not self.decide_event.is_set()) if self.decide_event else True return not self.decide_active
def launch_daemons(self): def initiate_probe(self):
self.start_notifiers() self.start_notifiers()
self.start_decide() self.probe_all()
def start_notifiers(self): def start_notifiers(self):
self.notifiers = { for signature, client in self.clients.iteritems():
self.MASTER: self.master.notifier(callback=self.probe_file), notifier = self.notifiers.get(signature)
self.SLAVE: self.slave.notifier(callback=self.probe_file), if not self.thread_is_active(notifier):
} self.notifiers[signature] = \
client.notifier(callback=self.probe_file)
else:
print 'ignoring %s' % signature
def stop_notifiers(self):
for notifier in self.notifiers.values():
notifier.stop()
def start_decide(self): def start_decide(self):
if self.decide_event is None: if not self.decide_active:
self.decide_event = self._poll_decide() self.decide_thread = self._poll_decide()
self.decide_event.set()
def stop_decide(self):
if self.decide_active:
self.decide_thread.stop()
def pause_decide(self): def stop_all_daemons(self):
if self.decide_event is not None: self.stop_decide()
self.decide_event.clear() self.stop_notifiers()
def wait_sync_threads(self):
for thread in self.sync_threads:
thread.join()
def get_next_message(self, block=False): def get_next_message(self, block=False):
return self.messager.get(block=block) return self.messager.get(block=block)
...@@ -206,6 +229,7 @@ class FileSyncer(object): ...@@ -206,6 +229,7 @@ class FileSyncer(object):
target=self._sync_file, target=self._sync_file,
args=(source_state, target_state, sync_state)) args=(source_state, target_state, sync_state))
thread.start() thread.start()
self.sync_threads.append(thread)
def _sync_file(self, source_state, target_state, sync_state): def _sync_file(self, source_state, target_state, sync_state):
clients = self.clients clients = self.clients
...@@ -310,24 +334,22 @@ class FileSyncer(object): ...@@ -310,24 +334,22 @@ class FileSyncer(object):
for objname in self.list_deciding(): for objname in self.list_deciding():
self.decide_file_sync(objname) self.decide_file_sync(objname)
def probe_and_sync_all(self): def probe_all(self):
self.probe_archive(self.MASTER) self.probe_archive(self.MASTER)
self.probe_archive(self.SLAVE) self.probe_archive(self.SLAVE)
def probe_and_sync_all(self):
self.probe_all()
for objname in self.list_deciding(): for objname in self.list_deciding():
self.decide_file_sync(objname) self.decide_file_sync(objname)
def _poll_decide(self, interval=3): def _poll_decide(self, interval=3):
event = threading.Event() class DecideThread(utils.StoppableThread):
def run_body(this):
def go():
while True:
event.wait()
self.decide_all_archives() self.decide_all_archives()
time.sleep(interval) time.sleep(interval)
poll = threading.Thread(target=go) return utils.start_daemon(DecideThread)
poll.daemon = True
poll.start()
return event
# TODO cleanup db of objects deleted in all clients # TODO cleanup db of objects deleted in all clients
# def cleanup(self): # def cleanup(self):
......
import os import os
import hashlib import hashlib
import datetime import datetime
import watchdog.utils
from agkyra.syncer.common import OBJECT_DIRSEP from agkyra.syncer.common import OBJECT_DIRSEP
...@@ -49,3 +50,24 @@ def younger_than(tstamp, seconds): ...@@ -49,3 +50,24 @@ def younger_than(tstamp, seconds):
ts = datetime.datetime.fromtimestamp(int(float(tstamp))) ts = datetime.datetime.fromtimestamp(int(float(tstamp)))
delta = now - ts delta = now - ts
return delta < datetime.timedelta(seconds=seconds) return delta < datetime.timedelta(seconds=seconds)
BaseStoppableThread = watchdog.utils.BaseThread
class StoppableThread(BaseStoppableThread):
def run_body(self):
raise NotImplementedError()
def run(self):
while True:
if not self.should_keep_running():
return
self.run_body()
def start_daemon(threadClass):
thread = threadClass()
thread.daemon = True
thread.start()
return thread
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment