diff --git a/agkyra/agkyra/__init__.py b/agkyra/agkyra/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/agkyra/agkyra/syncer/__init__.py b/agkyra/agkyra/syncer/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/agkyra/agkyra/syncer/common.py b/agkyra/agkyra/syncer/common.py new file mode 100644 index 0000000000000000000000000000000000000000..ea71c4c402eca9d0fe89ad89b572a9ce8a5b072c --- /dev/null +++ b/agkyra/agkyra/syncer/common.py @@ -0,0 +1,37 @@ +from collections import namedtuple + +FileStateTuple = namedtuple('FileStateTuple', + ['archive', 'path', 'serial', 'info']) + + +class FileState(FileStateTuple): + + __slots__ = () + + def set(self, *args, **kwargs): + return self._replace(*args, **kwargs) + + +T_DIR = "dir" +T_FILE = "file" +T_UNHANDLED = "unhandled" + + +class SyncError(Exception): + pass + + +class BusyError(SyncError): + pass + + +class ConflictError(SyncError): + pass + + +class InvalidInput(SyncError): + pass + + +class HandledError(SyncError): + pass diff --git a/agkyra/agkyra/syncer/database.py b/agkyra/agkyra/syncer/database.py new file mode 100644 index 0000000000000000000000000000000000000000..55d410ae4857eb126d2884dee1cad7ef53877831 --- /dev/null +++ b/agkyra/agkyra/syncer/database.py @@ -0,0 +1,205 @@ +from functools import wraps +import time +import sqlite3 +import json +import logging + +from agkyra.syncer import common + +logger = logging.getLogger(__name__) + + +class FileStateDB(object): + + def new_serial(self, path): + raise NotImplementedError + + def list_files(self, archive): + raise NotImplementedError + + def put_state(self, state): + raise NotImplementedError + + def get_state(self, archive, path): + raise NotImplementedError + + +class SqliteFileStateDB(FileStateDB): + + def __init__(self, dbname, initialize=False): + self.dbname = dbname + self.db = sqlite3.connect(dbname) + if initialize: + self.init() + + def init(self): + logger.warning("Initializing DB '%s'" % self.dbname) + db = self.db + + Q = ("create table if not exists " + "archives(archive text, path text, serial integer, " + "info blob, primary key (archive, path))") + db.execute(Q) + + Q = ("create table if not exists " + "serials(path text, nextserial bigint, primary key (path))") + db.execute(Q) + + Q = ("create table if not exists " + "cachepaths(cachepath text, client text, path text, " + "primary key (cachepath))") + db.execute(Q) + + self.commit() + + def begin(self): + self.db.execute("begin") + + def commit(self): + self.db.commit() + + def rollback(self): + self.db.rollback() + + def get_cachepath(self, cachepath): + db = self.db + Q = "select * from cachepaths where cachepath = ?" + c = db.execute(Q, (cachepath,)) + r = c.fetchone() + if r: + return r + else: + return None + + def insert_cachepath(self, cachepath, client, path): + db = self.db + Q = "insert into cachepaths(cachepath, client, path) values (?, ?, ?)" + db.execute(Q, (cachepath, client, path)) + + def delete_cachepath(self, cachepath): + db = self.db + Q = "delete from cachepaths where cachepath = ?" + db.execute(Q, (cachepath,)) + + def new_serial(self, path): + db = self.db + Q = ("select nextserial from serials where path = ?") + c = db.execute(Q, (path,)) + r = c.fetchone() + if r: + serial = r[0] + Q = "update serials set nextserial = ? where path = ?" + else: + serial = 0 + Q = "insert into serials(nextserial, path) values (?, ?)" + db.execute(Q, (serial + 1, path)) + return serial + + def list_files_with_info(self, archive, info): + Q = ("select path from archives where archive = ? and info = ?" + " order by path") + c = self.db.execute(Q, (archive, info)) + fetchone = c.fetchone + while True: + r = fetchone() + if not r: + break + yield r[0] + + def list_non_deleted_files(self, archive): + Q = ("select path from archives where archive = ? and info != '{}'" + " order by path") + c = self.db.execute(Q, (archive,)) + fetchone = c.fetchone + while True: + r = fetchone() + if not r: + break + yield r[0] + + def list_files(self, archive, prefix=None): + Q = "select path from archives where archive = ?" + if prefix is not None: + Q += " and path like ?" + tpl = (archive, prefix + '%') + else: + tpl = (archive,) + + Q += " order by path" + c = self.db.execute(Q, tpl) + fetchone = c.fetchone + while True: + r = fetchone() + if not r: + break + yield r[0] + + def list_deciding(self, archives, sync): + if len(archives) == 1: + archive = archives[0] + archives = (archive, archive) + archives = tuple(archives) + Q = ("select client.path from archives client, archives sync " + "where client.archive in (?, ?) and sync.archive = ? " + "and client.path = sync.path and client.serial > sync.serial") + c = self.db.execute(Q, archives + (sync,)) + fetchone = c.fetchone + while True: + r = fetchone() + if not r: + break + yield r[0] + + def put_state(self, state): + Q = ("insert or replace into " + "archives(archive, path, serial, info) " + "values (?, ?, ?, ?)") + args = (state.archive, state.path, state.serial, + json.dumps(state.info)) + self.db.execute(Q, args) + + def _get_state(self, archive, path): + Q = ("select archive, path, serial, info from archives " + "where archive = ? and path = ?") + c = self.db.execute(Q, (archive, path)) + r = c.fetchone() + if not r: + return None + + return common.FileState(archive=r[0], path=r[1], serial=r[2], + info=json.loads(r[3])) + + def get_state(self, archive, path): + state = self._get_state(archive, path) + if state is None: + state = common.FileState(archive=archive, path=path, serial=-1, + info={}) + return state + + +def transaction(retries=5, retry_wait=1): + def wrap(func): + @wraps(func) + def inner(*args, **kwargs): + obj = args[0] + db = obj.get_db() + attempt = 0 + while True: + try: + db.begin() + r = func(*args, **kwargs) + db.commit() + return r + except Exception as e: + db.rollback() + # TODO check conflict + if isinstance(e, sqlite3.OperationalError) and \ + "locked" in e.message and attempt < retries: + logger.warning( + "Got DB error '%s'. Retrying transaction." % e) + time.sleep(retry_wait) + attempt += 1 + else: + raise e + return inner + return wrap diff --git a/agkyra/agkyra/syncer/file_client.py b/agkyra/agkyra/syncer/file_client.py new file mode 100644 index 0000000000000000000000000000000000000000..5645ab8da97518a5ee8e9af1a659d672e8ea9a22 --- /dev/null +++ b/agkyra/agkyra/syncer/file_client.py @@ -0,0 +1,38 @@ +import logging +logger = logging.getLogger(__name__) + +from agkyra.syncer import common + + +class FileClient(object): + + def list_candidate_files(self, archive): + raise NotImplementedError + + def start_probing_path(self, path, old_state, ref_state, callback=None): + raise NotImplementedError + + def stage_file(self, source_state): + raise NotImplementedError + + def prepare_target(self, state): + raise NotImplementedError + + def start_pulling_file(self, source_handle, target_state, sync_state, + callback=None): + try: + synced_source_state, synced_target_state = \ + self._start(source_handle, target_state, sync_state) + if callback is not None: + callback(synced_source_state, synced_target_state) + except common.SyncError as e: + logger.warning(e) + + def _start(self, source_handle, target_state, sync_state): + try: + target_handle = self.prepare_target(target_state) + synced_target_state = target_handle.pull(source_handle, sync_state) + synced_source_state = source_handle.get_synced_state() + return synced_source_state, synced_target_state + finally: + source_handle.unstage_file() diff --git a/agkyra/agkyra/syncer/heartbeat.py b/agkyra/agkyra/syncer/heartbeat.py new file mode 100644 index 0000000000000000000000000000000000000000..2ebf2acb60d6cdffafb444fccd553fee9192e64e --- /dev/null +++ b/agkyra/agkyra/syncer/heartbeat.py @@ -0,0 +1,29 @@ +import threading + + +class HeartBeat(object): + def __init__(self, *args, **kwargs): + self._LOG = {} + self._LOCK = threading.Lock() + + def lock(self): + class Lock(object): + def __enter__(this): + self._LOCK.acquire() + return this + + def __exit__(this, exctype, value, traceback): + self._LOCK.release() + if value is not None: + raise value + + def get(this, key): + return self._LOG.get(key) + + def set(this, key, value): + self._LOG[key] = value + + def delete(this, key): + self._LOG.pop(key) + + return Lock() diff --git a/agkyra/agkyra/syncer/localfs_client.py b/agkyra/agkyra/syncer/localfs_client.py new file mode 100644 index 0000000000000000000000000000000000000000..e354deaffc2b562ac024875e9f8b75ccbf94df0e --- /dev/null +++ b/agkyra/agkyra/syncer/localfs_client.py @@ -0,0 +1,569 @@ +import os +import re +import time +import datetime +import psutil +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler +import logging + +from agkyra.syncer.file_client import FileClient +from agkyra.syncer import utils, common +from agkyra.syncer.database import transaction + +logger = logging.getLogger(__name__) + +LOCAL_FILE = 0 +LOCAL_EMPTY_DIR = 1 +LOCAL_NONEMPTY_DIR = 2 +LOCAL_MISSING = 3 +LOCAL_SOFTLINK = 4 +LOCAL_OTHER = 5 + +OS_FILE_EXISTS = 17 +OS_NOT_A_DIR = 20 +OS_NO_FILE_OR_DIR = 2 + + +class DirMissing(BaseException): + pass + + +def link_file(src, dest): + try: + os.link(src, dest) + except OSError as e: + if e.errno in [OS_FILE_EXISTS, OS_NOT_A_DIR]: + raise common.ConflictError("Cannot link, '%s' exists." % dest) + if e.errno == OS_NO_FILE_OR_DIR: + raise DirMissing() + + +def make_dirs(path): + try: + os.makedirs(path) + except OSError as e: + if e.errno == OS_FILE_EXISTS and os.path.isdir(path): + return + if e.errno in [OS_FILE_EXISTS, OS_NOT_A_DIR]: + raise common.ConflictError("Cannot make dir '%s'." % path) + raise + + +psutil_open_files = \ + (lambda proc: proc.open_files()) if psutil.version_info[0] >= 2 else \ + (lambda proc: proc.get_open_files()) + + +def file_is_open(path): + for proc in psutil.process_iter(): + try: + flist = psutil_open_files(proc) + for nt in flist: + if nt.path == path: + return True + except psutil.NoSuchProcess: + pass + return False + + +def mk_stash_name(filename): + tstamp = datetime.datetime.now().strftime("%s") + return filename + '.' + tstamp + '.local' + + +def eq_float(f1, f2): + return abs(f1 - f2) < 0.01 + + +def files_equal(f1, f2): + logger.info("Comparing files: '%s', '%s'" % (f1, f2)) + st1 = path_status(f1) + st2 = path_status(f2) + if st1 != st2: + return False + if st1 != LOCAL_FILE: + return True + (mtime1, msize1) = stat_file(f1) + (mtime2, msize2) = stat_file(f2) + if msize1 != msize2: + return False + hash1 = utils.hash_file(f1) + hash2 = utils.hash_file(f2) + return hash1 == hash2 + + +def info_is_unhandled(info): + return info != {} and info[LOCALFS_TYPE] == common.T_UNHANDLED + + +def local_path_changes(path, state): + live_info = get_live_info(path) + info = state.info + if is_info_eq(live_info, info): + return None + return live_info + + +def get_live_info(path): + if path is None: + return {} + status = path_status(path) + if status == LOCAL_MISSING: + return {} + if status in [LOCAL_SOFTLINK, LOCAL_OTHER]: + return {LOCALFS_TYPE: common.T_UNHANDLED} + if status in [LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR]: + return {LOCALFS_TYPE: common.T_DIR} + stats = stat_file(path) + if stats is None: + return {} + (st_mtime, st_size) = stats + live_info = {LOCALFS_MTIME: st_mtime, + LOCALFS_SIZE: st_size, + LOCALFS_TYPE: common.T_FILE, + } + return live_info + + +def stat_file(filename): + try: + file_stats = os.lstat(filename) + except OSError as e: + if e.errno == OS_NO_FILE_OR_DIR: + return None + raise + return (file_stats.st_mtime, file_stats.st_size) + + +LOCALFS_TYPE = "localfs_type" +LOCALFS_MTIME = "localfs_mtime" +LOCALFS_SIZE = "localfs_size" + + +def status_of_info(info): + if info == {}: + return LOCAL_MISSING + if info[LOCALFS_TYPE] == common.T_DIR: + return LOCAL_EMPTY_DIR + if info[LOCALFS_TYPE] == common.T_UNHANDLED: + return LOCAL_OTHER # shouldn't happen + return LOCAL_FILE + + +def path_status(path): + if os.path.islink(path): + return LOCAL_SOFTLINK + try: + contents = os.listdir(path) + return LOCAL_NONEMPTY_DIR if contents else LOCAL_EMPTY_DIR + except OSError as e: + if e.errno == OS_NOT_A_DIR: + if os.path.isfile(path): + return LOCAL_FILE + else: + return LOCAL_OTHER + if e.errno == OS_NO_FILE_OR_DIR: + return LOCAL_MISSING + + +def old_path_status(path): + try: + contents = os.listdir(path) + return LOCAL_NONEMPTY_DIR if contents else LOCAL_EMPTY_DIR + except OSError as e: + if e.errno == OS_NOT_A_DIR: + return LOCAL_FILE + if e.errno == OS_NO_FILE_OR_DIR: + return LOCAL_MISSING + + +def is_info_eq(info1, info2): + if {} in [info1, info2]: + return info1 == info2 + if info1[LOCALFS_TYPE] != info2[LOCALFS_TYPE]: + return False + if info1[LOCALFS_TYPE] == common.T_UNHANDLED: + return False + if info1[LOCALFS_TYPE] == common.T_DIR: + return True + return eq_float(info1[LOCALFS_MTIME], info2[LOCALFS_MTIME]) \ + and info1[LOCALFS_SIZE] == info2[LOCALFS_SIZE] + + +class LocalfsTargetHandle(object): + def __init__(self, settings, target_state): + self.NAME = "LocalfsTargetHandle" + self.rootpath = settings.local_root_path + self.cache_hide_name = settings.cache_hide_name + self.cache_hide_path = settings.cache_hide_path + self.cache_path = settings.cache_path + self.get_db = settings.get_db + self.target_state = target_state + self.path = target_state.path + self.local_path = utils.join_path(self.rootpath, self.path) + self.hidden_filename = None + self.hidden_path = None + + def get_path_in_cache(self, path): + return utils.join_path(self.cache_path, path) + + @transaction() + def register_hidden_name(self, filename): + db = self.get_db() + f = utils.hash_string(filename) + hide_filename = utils.join_path(self.cache_hide_name, f) + self.hidden_filename = hide_filename + if db.get_cachepath(hide_filename): + return False + db.insert_cachepath(hide_filename, self.NAME, filename) + return True + + @transaction() + def unregister_hidden_name(self, hidden_filename): + db = self.get_db() + db.delete_cachepath(hidden_filename) + self.hidden_filename = None + + def hide_file(self): + local_filename = self.local_path + if file_is_open(local_filename): + raise common.BusyError("File '%s' is open. Aborting." + % local_filename) + + new_registered = self.register_hidden_name(self.path) + hidden_filename = self.hidden_filename + hidden_path = self.get_path_in_cache(hidden_filename) + self.hidden_path = hidden_path + + if not new_registered: + logger.warning("Hiding already registered for file %s" % + (self.path,)) + if os.path.lexists(hidden_path): + logger.warning("File %s already hidden at %s" % + (self.path, hidden_path)) + return + try: + os.rename(local_filename, hidden_path) + logger.info("Hiding file '%s' to '%s'" % + (local_filename, hidden_path)) + except OSError as e: + if e.errno == OS_NO_FILE_OR_DIR: + self.unregister_hidden_name(hidden_filename) + logger.info("File '%s' does not exist" % local_filename) + return + else: + raise e + if file_is_open(hidden_path): + os.rename(hidden_path, local_filename) + self.unregister_hidden_name(hidden_filename) + raise common.BusyError("File '%s' is open. Undoing." % hidden_path) + if path_status(hidden_path) == LOCAL_NONEMPTY_DIR: + os.rename(hidden_path, local_filename) + self.unregister_hidden_name(hidden_filename) + raise common.ConflictError("'%s' is non-empty" % local_filename) + + def apply(self, fetched_file, fetched_live_info, sync_state): + local_status = path_status(self.local_path) + fetched_status = status_of_info(fetched_live_info) + if local_status in [LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR] \ + and fetched_status == LOCAL_EMPTY_DIR: + return + if local_status == LOCAL_MISSING and fetched_status == LOCAL_MISSING: + return + if local_status == LOCAL_NONEMPTY_DIR: + raise common.ConflictError("'%s' is non-empty" % self.local_path) + + self.prepare(fetched_file, sync_state) + self.finalize(fetched_file, fetched_live_info) + self.cleanup(self.hidden_path) + self.unregister_hidden_name(self.hidden_filename) + + def prepare(self, fetched_file, sync_state): + self.hide_file() + info_changed = local_path_changes(self.hidden_path, sync_state) + print 'info changed', info_changed + if info_changed is not None and info_changed != {}: + if not files_equal(self.hidden_path, fetched_file): + self.stash_file() + + def stash_file(self): + stash_filename = mk_stash_name(self.local_path) + logger.warning("Stashing file '%s' to '%s'" % + (self.local_path, stash_filename)) + os.rename(self.hidden_path, stash_filename) + + def finalize(self, filename, live_info): + logger.info("Finalizing file '%s'" % filename) + if live_info == {}: + return + if live_info[LOCALFS_TYPE] != common.T_DIR: + try: + link_file(filename, self.local_path) + except DirMissing: + make_dirs(os.path.dirname(self.local_path)) + link_file(filename, self.local_path) + else: + # assuming empty dir + make_dirs(self.local_path) + + def cleanup(self, filename): + status = path_status(filename) + if status == LOCAL_FILE: + try: + logger.info("Cleaning up file '%s'" % filename) + os.unlink(filename) + except: + pass + elif status in [LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR]: + os.rmdir(filename) + + def pull(self, source_handle, sync_state): + fetched_file = source_handle.send_file(sync_state) + fetched_live_info = get_live_info(fetched_file) + self.apply(fetched_file, fetched_live_info, sync_state) + self.cleanup(fetched_file) + return self.target_state.set(info=fetched_live_info) + + +class LocalfsSourceHandle(object): + @transaction() + def register_stage_name(self, filename): + db = self.get_db() + f = utils.hash_string(filename) + stage_filename = utils.join_path(self.cache_stage_name, f) + self.stage_filename = stage_filename + if db.get_cachepath(stage_filename): + return False + db.insert_cachepath(stage_filename, self.NAME, filename) + return True + + @transaction() + def unregister_stage_name(self, stage_filename): + db = self.get_db() + db.delete_cachepath(stage_filename) + self.stage_filename = None + + def get_path_in_cache(self, path): + return utils.join_path(self.cache_path, path) + + def lock_file(self, local_filename): + if file_is_open(local_filename): + raise common.BusyError("File '%s' is open. Aborting" + % local_filename) + new_registered = self.register_stage_name(local_filename) + stage_filename = self.stage_filename + stage_path = self.get_path_in_cache(stage_filename) + self.staged_path = stage_path + + if not new_registered: + logger.warning("Staging already registered for file %s" % + (self.path,)) + if os.path.lexists(stage_path): + logger.warning("File %s already staged at %s" % + (self.path, stage_path)) + return + try: + os.rename(local_filename, stage_path) + except OSError as e: + if e.errno == OS_NO_FILE_OR_DIR: + logger.info("Source does not exist: '%s'" % local_filename) + self.unregister_stage_name(stage_filename) + else: + raise e + if file_is_open(stage_path): + os.rename(stage_path, local_filename) + self.unregister_stage_name(stage_filename) + raise common.BusyError("File '%s' is open. Undoing" % stage_path) + if path_status(stage_path) in [LOCAL_NONEMPTY_DIR, LOCAL_EMPTY_DIR]: + os.rename(stage_path, local_filename) + self.unregister_hidden_name(stage_filename) + raise common.ConflictError("'%s' is non-empty" % local_filename) + logger.info("Staging file '%s' to '%s'" % (self.path, stage_path)) + + def check_stable(self, interval=1, times=5): + for i in range(times): + live_info = local_path_changes(self.staged_file, self.source_state) + if live_info is not None: + return False + time.sleep(interval) + return True + + def __init__(self, settings, source_state): + self.NAME = "LocalfsSourceHandle" + self.rootpath = settings.local_root_path + self.cache_stage_name = settings.cache_stage_name + self.cache_stage_path = settings.cache_stage_path + self.cache_path = settings.cache_path + self.get_db = settings.get_db + self.source_state = source_state + path = source_state.path + self.path = path + local_filename = utils.join_path(self.rootpath, path) + self.local_path = local_filename + self.isdir = self.info_is_dir() + self.stage_filename = None + self.staged_path = None + self.heartbeat = settings.heartbeat + self.check_log() + if not self.isdir: + self.lock_file(local_filename) + # self.check_stable() + + def check_log(self): + with self.heartbeat.lock() as hb: + prev_log = hb.get(self.path) + if prev_log is not None: + actionstate, ts = prev_log + if actionstate != self.NAME or \ + utils.younger_than(ts, 10): + raise common.HandledError("Action mismatch in %s: %s %s" % + (self.NAME, self.path, prev_log)) + logger.warning("Ignoring previous run in %s: %s %s" % + (self.NAME, self.path, prev_log)) + hb.set(self.path, (self.NAME, utils.time_stamp())) + + def get_synced_state(self): + return self.source_state + + def info_is_dir(self): + try: + return self.source_state.info[LOCALFS_TYPE] == common.T_DIR + except KeyError: + return False + + def info_is_deleted(self): + return self.source_state.info == {} + + def info_is_deleted_or_unhandled(self): + return self.source_state.info == {} \ + or self.source_state.info[LOCALFS_TYPE] == common.T_UNHANDLED + + def stash_staged_file(self): + stash_filename = mk_stash_name(self.local_path) + logger.warning("Stashing file '%s' to '%s'" % + (self.local_path, stash_filename)) + os.rename(self.staged_path, stash_filename) + + def unstage_file(self): + self.do_unstage() + self.unregister_stage_name(self.stage_filename) + self.clear_log() + + def clear_log(self): + with self.heartbeat.lock() as hb: + hb.delete(self.path) + + def do_unstage(self): + if self.stage_filename is None: + return + if self.info_is_deleted(): + return + staged_path = self.staged_path + try: + link_file(staged_path, self.local_path) + print "Unlinking", staged_path + os.unlink(staged_path) + except common.ConflictError: + self.stash_staged_file() + + +class LocalfsFileClient(FileClient): + def __init__(self, settings): + self.settings = settings + self.NAME = "LocalfsFileClient" + self.ROOTPATH = settings.local_root_path + self.CACHEPATH = settings.cache_path + self.get_db = settings.get_db + self.exclude_files_exp = re.compile('.*\.tmp$') + self.exclude_dir_exp = re.compile(self.CACHEPATH) + + def list_candidate_files(self): + db = self.get_db() + candidates = [] + for dirpath, dirnames, files in os.walk(self.ROOTPATH): + rel_dirpath = os.path.relpath(dirpath, start=self.ROOTPATH) + logger.debug("'%s' '%s'" % (dirpath, rel_dirpath)) + # if self.exclude_dir_exp.match(dirpath): + # continue + if rel_dirpath != '.': + candidates.append(rel_dirpath) + for filename in files: + # if self.exclude_files_exp.match(filename) or \ + # self.exclude_dir_exp.match(filename): + # continue + local_filename = utils.join_path(rel_dirpath, filename) + candidates.append(local_filename) + + db_names = set(db.list_files(self.NAME)) + return db_names.union(candidates) + + def _local_path_changes(self, path, state): + local_path = utils.join_path(self.ROOTPATH, path) + return local_path_changes(local_path, state) + + def start_probing_path(self, path, old_state, ref_state, callback=None): + if old_state.serial != ref_state.serial: + logger.warning("Serial mismatch in probing path '%s'" % path) + return + live_info = self._local_path_changes(path, old_state) + if live_info is None: + return + live_state = old_state.set(info=live_info) + if callback is not None: + callback(live_state) + + def stage_file(self, source_state): + return LocalfsSourceHandle(self.settings, source_state) + + def prepare_target(self, target_state): + return LocalfsTargetHandle(self.settings, target_state) + + def notifier(self, callback=None): + def handle_path(path): + rel_path = os.path.relpath(path, start=self.ROOTPATH) + if callback is not None: + callback(self.NAME, rel_path) + + class EventHandler(FileSystemEventHandler): + def on_created(this, event): + # if not event.is_directory: + # return + path = event.src_path + if path.startswith(self.CACHEPATH): + return + logger.info("Handling %s" % event) + handle_path(path) + + def on_deleted(this, event): + path = event.src_path + if path.startswith(self.CACHEPATH): + return + logger.info("Handling %s" % event) + handle_path(path) + + def on_modified(this, event): + if event.is_directory: + return + path = event.src_path + if path.startswith(self.CACHEPATH): + return + logger.info("Handling %s" % event) + handle_path(path) + + def on_moved(this, event): + src_path = event.src_path + dest_path = event.dest_path + if src_path.startswith(self.CACHEPATH) or \ + dest_path.startswith(self.CACHEPATH): + return + logger.info("Handling %s" % event) + handle_path(src_path) + handle_path(dest_path) + + path = self.ROOTPATH + event_handler = EventHandler() + observer = Observer() + observer.schedule(event_handler, path, recursive=True) + observer.start() + return observer diff --git a/agkyra/agkyra/syncer/pithos_client.py b/agkyra/agkyra/syncer/pithos_client.py new file mode 100644 index 0000000000000000000000000000000000000000..925a2b803a4898975fee8a1a01f0a7fa713bc83a --- /dev/null +++ b/agkyra/agkyra/syncer/pithos_client.py @@ -0,0 +1,301 @@ +from functools import wraps +import time +import os +import datetime +import threading +import random +import logging + +from agkyra.syncer import utils, common +from agkyra.syncer.file_client import FileClient +from agkyra.syncer.setup import ClientError +from agkyra.syncer.database import transaction + +logger = logging.getLogger(__name__) + + +def heartbeat_event(settings, heartbeat, path): + event = threading.Event() + max_interval = settings.action_max_wait / 2.0 + + def set_log(): + with heartbeat.lock() as hb: + client, prev_tstamp = hb.get(path) + tpl = (client, utils.time_stamp()) + hb.set(path, tpl) + logger.info("HEARTBEAT %s %s %s" % ((path,) + tpl)) + + def go(): + interval = 0.2 + while True: + if event.is_set(): + break + set_log() + time.sleep(interval) + interval = min(2 * interval, max_interval) + thread = threading.Thread(target=go) + thread.start() + return event + + +def give_heartbeat(f): + @wraps(f) + def inner(*args, **kwargs): + obj = args[0] + path = obj.path + heartbeat = obj.heartbeat + settings = obj.settings + event = heartbeat_event(settings, heartbeat, path) + try: + return f(*args, **kwargs) + finally: + event.set() + return inner + + +class PithosSourceHandle(object): + def __init__(self, settings, source_state): + self.NAME = "PithosSourceHandle" + self.settings = settings + self.endpoint = settings.endpoint + self.cache_fetch_name = settings.cache_fetch_name + self.cache_fetch_path = settings.cache_fetch_path + self.cache_path = settings.cache_path + self.get_db = settings.get_db + self.source_state = source_state + self.path = source_state.path + self.heartbeat = settings.heartbeat + self.check_log() + + def check_log(self): + with self.heartbeat.lock() as hb: + prev_log = hb.get(self.path) + if prev_log is not None: + actionstate, ts = prev_log + if actionstate != self.NAME or \ + utils.younger_than(ts, self.settings.action_max_wait): + raise common.HandledError("Action mismatch in %s: %s %s" % + (self.NAME, self.path, prev_log)) + logger.warning("Ignoring previous run in %s: %s %s" % + (self.NAME, self.path, prev_log)) + hb.set(self.path, (self.NAME, utils.time_stamp())) + + @transaction() + def register_fetch_name(self, filename): + db = self.get_db() + f = utils.hash_string(filename) + "_" + \ + datetime.datetime.now().strftime("%s") + fetch_name = utils.join_path(self.cache_fetch_name, f) + self.fetch_name = fetch_name + db.insert_cachepath(fetch_name, self.NAME, filename) + return utils.join_path(self.cache_path, fetch_name) + + @give_heartbeat + def send_file(self, sync_state): + fetched_file = self.register_fetch_name(self.path) + headers = dict() + with open(fetched_file, mode='wb+') as fil: + try: + logger.info("Downloading path: '%s', to: '%s'" % + (self.path, fetched_file)) + self.endpoint.download_object( + self.path, + fil, + headers=headers) + except ClientError as e: + if e.status == 404: + actual_info = {} + else: + raise e + else: + actual_etag = headers["x-object-hash"] + actual_type = (common.T_DIR if object_isdir(headers) + else common.T_FILE) + actual_info = {"pithos_etag": actual_etag, + "pithos_type": actual_type} + self.source_state = self.source_state.set(info=actual_info) + if actual_info == {}: + logger.info("Downloading path: '%s', object is gone." % self.path) + os.unlink(fetched_file) + elif actual_info["pithos_type"] == common.T_DIR: + logger.info("Downloading path: '%s', object is dir." % self.path) + os.unlink(fetched_file) + os.mkdir(fetched_file) + return fetched_file + + def get_synced_state(self): + return self.source_state + + def unstage_file(self): + self.clear_log() + + def clear_log(self): + with self.heartbeat.lock() as hb: + hb.delete(self.path) + + +class PithosTargetHandle(object): + def __init__(self, settings, target_state): + self.settings = settings + self.endpoint = settings.endpoint + self.target_state = target_state + self.target_file = target_state.path + self.path = target_state.path + self.heartbeat = settings.heartbeat + + def mk_del_name(self, name): + tstamp = datetime.datetime.now().strftime("%s") + rand = str(random.getrandbits(64)) + return "%s.%s.%s.deleted" % (name, tstamp, rand) + + def safe_object_del(self, path, etag): + container = self.endpoint.container + del_name = self.mk_del_name(path) + try: + self.endpoint.object_move( + path, + destination='/%s/%s' % (container, del_name), + if_etag_match=etag) + except ClientError as e: + logger.warning("'%s' not found; already deleted?" % path) + if e.status == 404: + return + self.endpoint.del_object(del_name) + + def directory_put(self, path, etag): + r = self.endpoint.object_put( + path, + content_type='application/directory', + content_length=0, + if_etag_match=etag) + return r + + @give_heartbeat + def pull(self, source_handle, sync_state): +# assert isinstance(source_handle, LocalfsSourceHandle) + info = sync_state.info + etag = info.get("pithos_etag") + if source_handle.info_is_deleted_or_unhandled(): + if etag is not None: + logger.info("Deleting object '%s'" % self.target_file) + self.safe_object_del(self.target_file, etag) + live_info = {} + elif source_handle.info_is_dir(): + logger.info("Creating dir '%s'" % source_handle.path) + r = self.directory_put(source_handle.path, etag) + synced_etag = r.headers["etag"] + live_info = {"pithos_etag": synced_etag, + "pithos_type": common.T_DIR} + else: + with open(source_handle.staged_path, mode="rb") as fil: + r = self.endpoint.upload_object( + self.target_file, + fil, + if_etag_match=info.get("pithos_etag")) + synced_etag = r["etag"] + live_info = {"pithos_etag": synced_etag, + "pithos_type": common.T_FILE} + return self.target_state.set(info=live_info) + + +def object_isdir(obj): + try: + content_type = obj["content_type"] + except KeyError: + content_type = obj["content-type"] + return any(txt in content_type for txt in ['application/directory', + 'application/folder']) + + +PITHOS_TYPE = "pithos_type" +PITHOS_ETAG = "pithos_etag" + + +class PithosFileClient(FileClient): + def __init__(self, settings): + self.settings = settings + self.NAME = "PithosFileClient" + self.auth_url = settings.auth_url + self.auth_token = settings.auth_token + self.container = settings.container + self.get_db = settings.get_db + self.endpoint = settings.endpoint + + def list_candidate_files(self, last_modified=None): + db = self.get_db() + objects = self.endpoint.list_objects() + self.objects = objects + upstream_all_names = set(obj["name"] for obj in objects) + non_deleted_in_db = set(db.list_non_deleted_files(self.NAME)) + newly_deleted = non_deleted_in_db.difference(upstream_all_names) + logger.debug("newly_deleted %s" % newly_deleted) + if last_modified is not None: + upstream_modified_names = set( + obj["name"] for obj in objects + if obj["last_modified"] > last_modified) + upstream_names = upstream_modified_names + else: + upstream_names = upstream_all_names + candidates = upstream_names.union(newly_deleted) + logger.info("Candidates: %s" % candidates) + return candidates + + def notifier(self, callback=None, interval=10): + class PollPithos(threading.Thread): + def run(this): + while True: + utcnow = datetime.datetime.utcnow() + last_tstamp = (utcnow - + datetime.timedelta(seconds=interval)) + last_modified = last_tstamp.isoformat() + candidates = self.list_candidate_files( + last_modified=last_modified) + if callback is not None: + for candidate in candidates: + callback(self.NAME, candidate) + time.sleep(interval) + + poll = PollPithos() + poll.daemon = True + poll.start() + + def get_object_from_cache(self, path): + if self.objects is None: + self.objects = self.endpoint.list_objects() + objs = [o for o in self.objects if o["name"] == path] + try: + return objs[0] + except IndexError: + return None + + def get_object(self, path): + try: + return self.endpoint.get_object_info(path) + except ClientError as e: + if e.status == 404: + return None + raise e + + def get_object_live_info(self, obj): + if obj is None: + return {} + p_type = common.T_DIR if object_isdir(obj) else common.T_FILE + obj_hash = obj["x-object-hash"] + return {PITHOS_ETAG: obj_hash, + PITHOS_TYPE: p_type, + } + + def start_probing_path(self, path, old_state, ref_state, callback=None): + info = old_state.info + obj = self.get_object(path) + live_info = self.get_object_live_info(obj) + if info != live_info: + if callback is not None: + live_state = old_state.set(info=live_info) + callback(live_state) + + def stage_file(self, source_state): + return PithosSourceHandle(self.settings, source_state) + + def prepare_target(self, target_state): + return PithosTargetHandle(self.settings, target_state) diff --git a/agkyra/agkyra/syncer/setup.py b/agkyra/agkyra/syncer/setup.py new file mode 100644 index 0000000000000000000000000000000000000000..0dabbf9d87bea95154d867659d7a2bcdc375ae61 --- /dev/null +++ b/agkyra/agkyra/syncer/setup.py @@ -0,0 +1,121 @@ +import os +import threading +import logging + +from agkyra.syncer.utils import join_path +from agkyra.syncer.database import SqliteFileStateDB +from agkyra.syncer.heartbeat import HeartBeat + +from kamaki.clients import ClientError + +from kamaki.clients.astakos import AstakosClient +from kamaki.clients.pithos import PithosClient +from kamaki.clients.utils import https + +#### TODO: handle this +https.patch_ignore_ssl() + +logger = logging.getLogger(__name__) + + +DEFAULT_CACHE_NAME = '.agkyra_cache' +DEFAULT_CACHE_HIDE_NAME = 'hidden' +DEFAULT_CACHE_STAGE_NAME = 'staged' +DEFAULT_CACHE_FETCH_NAME = 'fetched' +GLOBAL_SETTINGS_NAME = '.agkyra' +DEFAULT_DBNAME = "syncer.db" +DEFAULT_ACTION_MAX_WAIT = 10 + +thread_local_data = threading.local() + + +class SyncerSettings(): + def __init__(self, instance, auth_url, auth_token, container, + local_root_path, + *args, **kwargs): + self.auth_url = auth_url + self.auth_token = auth_token + self.container = container + self.endpoint = self._get_pithos_client( + auth_url, auth_token, container) + + self.home_dir = os.path.expanduser('~') + self.settings_path = join_path(self.home_dir, GLOBAL_SETTINGS_NAME) + self.create_dir(self.settings_path) + self.instance_path = join_path(self.settings_path, instance) + self.create_dir(self.instance_path) + + self.dbname = kwargs.get("dbname", DEFAULT_DBNAME) + self.full_dbname = join_path(self.instance_path, self.dbname) + self.get_db(initialize=True) + + self.local_root_path = local_root_path + self.create_dir(self.local_root_path) + + self.cache_name = kwargs.get("cache_path", DEFAULT_CACHE_NAME) + self.cache_path = join_path(self.local_root_path, self.cache_name) + self.create_dir(self.cache_path) + + self.cache_hide_name = kwargs.get("cache_hide_name", + DEFAULT_CACHE_HIDE_NAME) + self.cache_hide_path = join_path(self.cache_path, self.cache_hide_name) + self.create_dir(self.cache_hide_path) + + self.cache_stage_name = kwargs.get("cache_stage_name", + DEFAULT_CACHE_STAGE_NAME) + self.cache_stage_path = join_path(self.cache_path, + self.cache_stage_name) + self.create_dir(self.cache_stage_path) + + self.cache_fetch_name = kwargs.get("cache_fetch_name", + DEFAULT_CACHE_FETCH_NAME) + self.cache_fetch_path = join_path(self.cache_path, + self.cache_fetch_name) + self.create_dir(self.cache_fetch_path) + + self.heartbeat = HeartBeat() + self.action_max_wait = kwargs.get("action_max_wait", + DEFAULT_ACTION_MAX_WAIT) + + def get_db(self, initialize=False): + dbs = getattr(thread_local_data, "dbs", None) + if dbs is not None: + db = dbs.get(self.full_dbname) + else: + db = None + + if db is None: + logger.info("Connecting db: '%s', thread: %s" % + (self.full_dbname, threading.current_thread().ident)) + db = SqliteFileStateDB(self.full_dbname, initialize=initialize) + if dbs is None: + thread_local_data.dbs = {} + thread_local_data.dbs[self.full_dbname] = db + return db + + def create_dir(self, path): + if os.path.exists(path): + if os.path.isdir(path): + return + raise Exception("Cannot create dir '%s'; file exists" % path) + logger.warning("Creating dir: '%s'" % path) + os.mkdir(path) + return path + + def _get_pithos_client(self, auth_url, token, container): + try: + astakos = AstakosClient(auth_url, token) + except ClientError: + logger.error("Failed to authenticate user token") + raise + try: + PITHOS_URL = astakos.get_endpoint_url(PithosClient.service_type) + except ClientError: + logger.error("Failed to get endpoints for Pithos") + raise + try: + account = astakos.user_info['id'] + return PithosClient(PITHOS_URL, token, account, container) + except ClientError: + logger.error("Failed to initialize Pithos client") + raise diff --git a/agkyra/agkyra/syncer/syncer.py b/agkyra/agkyra/syncer/syncer.py new file mode 100644 index 0000000000000000000000000000000000000000..cc9c01274c9ae0f7350cd0512c9516ce4cf5abd6 --- /dev/null +++ b/agkyra/agkyra/syncer/syncer.py @@ -0,0 +1,307 @@ +import time +import threading +import logging +import re +import os + +from agkyra.syncer import common +from agkyra.syncer.setup import SyncerSettings +from agkyra.syncer.database import transaction +from agkyra.syncer.localfs_client import LocalfsFileClient +from agkyra.syncer.pithos_client import PithosFileClient + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + + +class IgnoreKamakiInfo(logging.Filter): + def filter(self, record): + return not (record.name.startswith('kamaki') and + record.levelno <= logging.INFO) + +for handler in logging.root.handlers: + handler.addFilter(IgnoreKamakiInfo()) + + +exclude_regexes = ["\.#", "\.~", "~\$", "~.*\.tmp$", "\..*\.swp$"] +exclude_pattern = re.compile('|'.join(exclude_regexes)) + + +class FileSyncer(object): + + dbname = None + clients = None + + def __init__(self, settings, master, slave): + self.settings = settings + self.master = master + self.slave = slave + self.DECISION = 'DECISION' + self.SYNC = 'SYNC' + self.MASTER = master.NAME + self.SLAVE = slave.NAME + self.get_db = settings.get_db + self.clients = {self.MASTER: master, self.SLAVE: slave} + self.decide_event = None + + def launch_daemons(self): + self.start_notifiers() + self.start_decide() + + def start_notifiers(self): + self.notifiers = { + self.MASTER: self.master.notifier(callback=self.probe_path), + self.SLAVE: self.slave.notifier(callback=self.probe_path), + } + + def start_decide(self): + if self.decide_event is None: + self.decide_event = self._poll_decide() + self.decide_event.set() + + def pause_decide(self): + if self.decide_event is not None: + self.decide_event.clear() + + def exclude_path(self, path): + parts = path.split(os.path.sep) + init_part = parts[0] + if init_part in [self.settings.cache_name]: + return True + final_part = parts[-1] + return exclude_pattern.match(final_part) + + @transaction() + def probe_path(self, archive, path): + if self.exclude_path(path): + logger.warning("Ignoring probe archive: %s, path: %s" % + (archive, path)) + return + logger.info("Probing archive: %s, path: '%s'" % (archive, path)) + db = self.get_db() + client = self.clients[archive] + db_state = db.get_state(archive, path) + ref_state = db.get_state(self.SYNC, path) + client.start_probing_path(path, db_state, ref_state, + callback=self.update_path) + + @transaction() + def update_path(self, live_state): + db = self.get_db() + archive = live_state.archive + path = live_state.path + serial = live_state.serial + if self.exclude_path(path): + logger.warning("Ignoring update archive: %s, path: %s" % + (archive, path)) + return + logger.info("Updating archive: %s, path: '%s', serial: %s" % + (archive, path, serial)) + db_state = db.get_state(archive, path) + if db_state and db_state.serial != serial: + logger.warning( + "Cannot update archive: %s, path: '%s', " + "serial: %s, db_serial: %s" % + (archive, path, serial, db_state.serial)) + return + + new_serial = db.new_serial(path) + new_state = live_state.set(serial=new_serial) + db.put_state(new_state) + if new_serial == 0: + sync_state = common.FileState( + archive=self.SYNC, path=path, serial=-1, + info={}) + db.put_state(sync_state) + + def decide_path(self, path, master=None, slave=None): + if master is None: + master = self.MASTER + if slave is None: + slave = self.SLAVE + states = self._decide_path(path, master, slave) + if states is None: + return + self.sync_path(*states) + + @transaction() + def _decide_path(self, path, master, slave): + db = self.get_db() + logger.info("Deciding path: '%s'" % path) + master_state = db.get_state(master, path) + slave_state = db.get_state(slave, path) + sync_state = db.get_state(self.SYNC, path) + decision_state = db.get_state(self.DECISION, path) + master_serial = master_state.serial + slave_serial = slave_state.serial + sync_serial = sync_state.serial + if master_serial > sync_serial: + return self._mark_sync_start_path( + master_state, slave_state, + decision_state, sync_state) + + elif master_serial == sync_serial: + if slave_serial > sync_serial: + return self._mark_sync_start_path( + slave_state, master_state, + decision_state, sync_state) + elif slave_serial == sync_serial: + return None + else: + raise AssertionError("Slave serial %s, sync serial %s" + % (slave_serial, sync_serial)) + + else: + raise AssertionError("Master serial %s, sync serial %s" + % (master_serial, sync_serial)) + + def _mark_sync_start_path(self, + source_state, + target_state, + decision_state, + sync_state): + db = self.get_db() + logger.info("Syncing archive: %s, path: '%s', serial: %s" % + (source_state.archive, + source_state.path, + source_state.serial)) + + path = source_state.path + decision_serial = decision_state.serial + sync_serial = sync_state.serial + decided = decision_serial != sync_serial + if decided: + logger.warning("Already decided: '%s', decision: %s, sync: %s" % + (path, decision_serial, sync_serial)) + else: + new_decision_state = decision_state.set( + serial=source_state.serial, info=source_state.info) + db.put_state(new_decision_state) + return source_state, target_state, sync_state + + def sync_path(self, source_state, target_state, sync_state): + thread = threading.Thread( + target=self._sync_path, + args=(source_state, target_state, sync_state)) + thread.start() + + def _sync_path(self, source_state, target_state, sync_state): + clients = self.clients + source_client = clients[source_state.archive] + try: + source_handle = source_client.stage_file(source_state) + except common.SyncError as e: + logger.warning(e) + return + target_client = clients[target_state.archive] + target_client.start_pulling_file( + source_handle, target_state, sync_state, + callback=self.acknowledge_path) + + def update_state(self, old_state, new_state): + db = self.get_db() + db.put_state(new_state) + # here we could do any checks needed on the old state, + # perhaps triggering a probe + + @transaction() + def acknowledge_path(self, synced_source_state, synced_target_state): + db = self.get_db() + serial = synced_source_state.serial + path = synced_source_state.path + source = synced_source_state.archive + target = synced_target_state.archive + logger.info("Acking archive: %s, path: '%s', serial: %s" % + (target, path, serial)) + + decision_state = db.get_state(self.DECISION, path) + sync_state = db.get_state(self.SYNC, path) + + if serial != decision_state.serial: + raise AssertionError( + "Serial mismatch: assumed sync %s, decision %s" + % (serial, decision_state.serial)) + if serial <= sync_state.serial: + raise common.SyncError( + "cannot ack: serial %s < sync serial %s" % + (serial, sync_state.serial)) + + db_source_state = db.get_state(source, path) + self.update_state(db_source_state, synced_source_state) + + final_target_state = synced_target_state.set( + serial=serial) + db_target_state = db.get_state(target, path) + self.update_state(db_target_state, final_target_state) + + sync_info = dict(synced_source_state.info) + sync_info.update(synced_target_state.info) + # The 'info' namespace is global. Some attributes may be globally + # recognizable by all clients with the same semantics, such as + # a content-hash (e.g. SHA256), while other may be specific to + # each client. Clients are responsible to protect their private + # attributes creating their own namespace, for example + # 'localfs_mtime', 'object_store_etag' + new_sync_state = sync_state.set(serial=serial, info=sync_info) + db.put_state(new_sync_state) + new_decision_state = new_sync_state.set(archive=self.DECISION) + db.put_state(new_decision_state) + + @transaction() + def list_deciding(self, archives=None): + db = self.get_db() + if archives is None: + archives = (self.MASTER, self.SLAVE) + return list(db.list_deciding(archives=archives, + sync=self.SYNC)) + + def probe_archive(self, archive): + client = self.clients[archive] + for path in client.list_candidate_files(): + self.probe_path(archive, path) + + def decide_archive(self, archive): + for path in self.list_deciding([archive]): + self.decide_path(path) + + def decide_all_archives(self): + logger.info("Checking candidates to sync") + for path in self.list_deciding(): + self.decide_path(path) + + def probe_and_sync_all(self): + self.probe_archive(self.MASTER) + self.probe_archive(self.SLAVE) + for path in self.list_deciding(): + self.decide_path(path) + + def _poll_decide(self, interval=3): + event = threading.Event() + + def go(): + while True: + event.wait() + self.decide_all_archives() + time.sleep(interval) + poll = threading.Thread(target=go) + poll.daemon = True + poll.start() + return event + + # TODO cleanup db of objects deleted in all clients + # def cleanup(self): + # db = self.get_db() + # master_deleted = set(db.list_files_with_info(MASTER, {})) + # client_deleted = set(db.list_files_with_info(SLAVE, {})) + # deleted = master_deleted.intersection(client_deleted) + + +def conf(instance, auth_url, auth_token, container, local_root_path): + settings = SyncerSettings(instance=instance, + auth_url=auth_url, + auth_token=auth_token, + container=container, + local_root_path=local_root_path) + master = PithosFileClient(settings) + slave = LocalfsFileClient(settings) + return FileSyncer(settings, master, slave) diff --git a/agkyra/agkyra/syncer/utils.py b/agkyra/agkyra/syncer/utils.py new file mode 100644 index 0000000000000000000000000000000000000000..bbc80cf5fd032954bfa77ad049ffbec72cd518a8 --- /dev/null +++ b/agkyra/agkyra/syncer/utils.py @@ -0,0 +1,37 @@ +import os +import hashlib +import datetime + +BUF_SIZE = 65536 + + +def join_path(dirpath, filename): + if dirpath == ".": + dirpath = "" + return os.path.join(dirpath, filename) + + +def hash_string(s): + return hashlib.sha256(s).hexdigest() + + +def hash_file(filename, block_size=BUF_SIZE): + sha256 = hashlib.sha256() + with open(filename, 'rb') as f: + while True: + data = f.read(BUF_SIZE) + if not data: + break + sha256.update(data) + return sha256.hexdigest() + + +def time_stamp(): + return datetime.datetime.now().strftime("%s.%f") + + +def younger_than(tstamp, seconds): + now = datetime.datetime.now() + ts = datetime.datetime.fromtimestamp(int(float(tstamp))) + delta = now - ts + return delta < datetime.timedelta(seconds=seconds)