Commit 3d594fbd authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

Collect probe candidates; abort probe if handled

parent a19e0e9a
......@@ -59,8 +59,25 @@ class LockedDict(object):
self._Dict[key] = value
self._Lock.release()
def get(self, key):
def get(self, key, default=None):
self._Lock.acquire()
value = self._Dict.get(key)
value = self._Dict.get(key, default=default)
self._Lock.release()
return value
def pop(self, key, d=None):
self._Lock.acquire()
value = self._Dict.pop(key, d)
self._Lock.release()
return value
def update(self, d):
self._Lock.acquire()
self._Dict.update(d)
self._Lock.release()
def keys(self):
self._Lock.acquire()
value = self._Dict.keys()
self._Lock.release()
return value
import os
import stat
import re
import time
import datetime
import psutil
from watchdog.observers import Observer
......@@ -450,8 +449,15 @@ class LocalfsFileClient(FileClient):
self.get_db = settings.get_db
self.exclude_files_exp = re.compile('.*\.tmp$')
self.exclude_dir_exp = re.compile(self.CACHEPATH)
self.probe_candidates = common.LockedDict()
def list_candidate_files(self):
def list_candidate_files(self, forced=False):
if forced:
candidates = self.walk_filesystem()
self.probe_candidates.update(candidates)
return self.probe_candidates.keys()
def walk_filesystem(self):
db = self.get_db()
candidates = {}
for dirpath, dirnames, files in os.walk(self.ROOTPATH):
......@@ -487,8 +493,9 @@ class LocalfsFileClient(FileClient):
if old_state.serial != ref_state.serial:
logger.warning("Serial mismatch in probing path '%s'" % objname)
return
cached_info = self.probe_candidates.pop(objname)
live_info = (self._local_path_changes(objname, old_state)
if assumed_info is None else assumed_info)
if cached_info is None else cached_info)
if live_info is None:
return
live_state = old_state.set(info=live_info)
......@@ -505,8 +512,7 @@ class LocalfsFileClient(FileClient):
def handle_path(path):
rel_path = os.path.relpath(path, start=self.ROOTPATH)
objname = utils.to_standard_sep(rel_path)
if callback is not None:
callback(self.SIGNATURE, objname)
self.probe_candidates.put(objname, None)
class EventHandler(FileSystemEventHandler):
def on_created(this, event):
......
......@@ -20,7 +20,6 @@ def heartbeat_event(settings, heartbeat, objname):
def set_log():
with heartbeat.lock() as hb:
client, prev_tstamp = hb.get(objname)
tstamp = utils.time_stamp()
hb.set(objname, tstamp)
logger.debug("HEARTBEAT '%s' %s" % (objname, tstamp))
......@@ -235,8 +234,15 @@ class PithosFileClient(FileClient):
self.get_db = settings.get_db
self.endpoint = settings.endpoint
self.last_modification = "0000-00-00"
self.probe_candidates = common.LockedDict()
def list_candidate_files(self, last_modified=None):
def list_candidate_files(self, forced=False):
if forced:
candidates = self.get_pithos_candidates()
self.probe_candidates.update(candidates)
return self.probe_candidates.keys()
def get_pithos_candidates(self, last_modified=None):
db = self.get_db()
objects = self.endpoint.list_objects()
self.objects = objects
......@@ -268,13 +274,12 @@ class PithosFileClient(FileClient):
(last_modified, candidates))
return candidates
def notifier(self, callback=None, interval=10):
def notifier(self, callback=None, interval=2):
class PollPithosThread(utils.StoppableThread):
def run_body(this):
candidates = self.list_candidate_files(
candidates = self.get_pithos_candidates(
last_modified=self.last_modification)
for (objname, info) in candidates.iteritems():
callback(self.SIGNATURE, objname, assumed_info=info)
self.probe_candidates.update(candidates)
time.sleep(interval)
return utils.start_daemon(PollPithosThread)
......@@ -305,11 +310,12 @@ class PithosFileClient(FileClient):
(old_state.archive, objname))
return
info = old_state.info
if assumed_info is None:
cached_info = self.probe_candidates.pop(objname)
if cached_info is None:
obj = self.get_object(objname)
live_info = self.get_object_live_info(obj)
else:
live_info = assumed_info
live_info = cached_info
if info != live_info:
if callback is not None:
live_state = old_state.set(info=live_info)
......
......@@ -62,7 +62,7 @@ class FileSyncer(object):
def initiate_probe(self):
self.start_notifiers()
self.probe_all()
self.probe_all(forced=True)
def start_notifiers(self):
for signature, client in self.clients.iteritems():
......@@ -173,7 +173,7 @@ class FileSyncer(object):
@transaction()
def _decide_file_sync(self, objname, master, slave):
states = self._decide_file_sync(objname, master, slave)
states = self._do_decide_file_sync(objname, master, slave)
if states is not None:
with self.heartbeat.lock() as hb:
hb.set(objname, utils.time_stamp())
......@@ -191,7 +191,7 @@ class FileSyncer(object):
sync_serial = sync_state.serial
decision_serial = decision_state.serial
with self.heartbeat.lock() as hb:
with self.heartbeat.lock() as hb:
prev_log = hb.get(objname)
logger.info("object: %s heartbeat: %s" %
(objname, prev_log))
......@@ -355,11 +355,11 @@ class FileSyncer(object):
return set(db.list_deciding(archives=archives,
sync=self.SYNC))
def probe_archive(self, archive):
def probe_archive(self, archive, forced=False):
client = self.clients[archive]
candidates = client.list_candidate_files()
for (objname, info) in candidates.iteritems():
self.probe_file(archive, objname, assumed_info=info)
candidates = client.list_candidate_files(forced=forced)
for objname in candidates:
self.probe_file(archive, objname)
def decide_archive(self, archive):
for objname in self.list_deciding([archive]):
......@@ -367,18 +367,14 @@ class FileSyncer(object):
def decide_all_archives(self):
logger.info("Checking candidates to sync")
for objname in self.list_deciding():
self.decide_file_sync(objname)
def probe_all(self):
self.probe_archive(self.MASTER)
self.probe_archive(self.SLAVE)
def probe_and_sync_all(self):
self.probe_all()
for objname in self.list_deciding():
self.decide_file_sync(objname)
def probe_all(self, forced=False):
self.probe_archive(self.MASTER, forced=forced)
self.probe_archive(self.SLAVE, forced=forced)
def _poll_decide(self, interval=3):
class DecideThread(utils.StoppableThread):
def run_body(this):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment