Commit 37c88017 authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

Initial version of syncer backend

parent 25ad520a
from collections import namedtuple
# Plain record behind FileState: the archive a state belongs to, the path it
# describes, its serial number and the associated info value.
FileStateTuple = namedtuple('FileStateTuple', 'archive path serial info')
class FileState(FileStateTuple):
    """Immutable file state record with a convenience updater."""

    # Empty slots: instances carry no per-instance __dict__.
    __slots__ = ()

    def set(self, *args, **kwargs):
        """Return a copy of this state with the given fields replaced.

        All arguments are forwarded unchanged to the namedtuple
        ``_replace`` method; the receiver itself is left untouched.
        """
        updated = self._replace(*args, **kwargs)
        return updated
# Type tags stored in file info dictionaries: a directory, a regular file,
# and anything else that is not handled.
T_DIR = "dir"
T_FILE = "file"
T_UNHANDLED = "unhandled"
class SyncError(Exception):
    """Base class of the syncer's own exceptions."""
class BusyError(SyncError):
    """SyncError raised when a file is in use, e.g. held open by a process."""
class ConflictError(SyncError):
    """SyncError raised when an operation collides with existing content."""
class InvalidInput(SyncError):
    """SyncError subclass for invalid input.

    NOTE(review): no raise site is visible in this part of the file.
    """
class HandledError(SyncError):
    """SyncError subclass, presumably for errors already dealt with.

    NOTE(review): no raise site is visible in this part of the file.
    """
from functools import wraps
import time
import sqlite3
import json
import logging
from agkyra.syncer import common
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class FileStateDB(object):
    """Interface of a file state store.

    Every method raises NotImplementedError; concrete stores override them.
    """

    def new_serial(self, path):
        """Hand out a serial for ``path`` (not implemented here)."""
        raise NotImplementedError()

    def list_files(self, archive):
        """List the paths recorded for ``archive`` (not implemented here)."""
        raise NotImplementedError()

    def put_state(self, state):
        """Store ``state`` (not implemented here)."""
        raise NotImplementedError()

    def get_state(self, archive, path):
        """Fetch the state of ``path`` in ``archive`` (not implemented here)."""
        raise NotImplementedError()
class SqliteFileStateDB(FileStateDB):
    """File state store kept in a SQLite database.

    Three tables are used: ``archives`` (one row per archive/path pair with
    its serial and JSON-encoded info), ``serials`` (the next serial of each
    path) and ``cachepaths`` (registered cache paths with their client and
    original path).

    The data-modifying methods do not commit; callers control transactions
    through :meth:`begin`, :meth:`commit` and :meth:`rollback`.
    """

    def __init__(self, dbname, initialize=False):
        """Connect to the database.

        :param dbname: database name handed to ``sqlite3.connect``
        :param initialize: when true, create the tables via :meth:`init`
        """
        self.dbname = dbname
        self.db = sqlite3.connect(dbname)
        if initialize:
            self.init()

    def init(self):
        """Create the tables that do not exist yet and commit."""
        logger.warning("Initializing DB '%s'" % self.dbname)
        db = self.db

        Q = ("create table if not exists "
             "archives(archive text, path text, serial integer, "
             "info blob, primary key (archive, path))")
        db.execute(Q)

        Q = ("create table if not exists "
             "serials(path text, nextserial bigint, primary key (path))")
        db.execute(Q)

        Q = ("create table if not exists "
             "cachepaths(cachepath text, client text, path text, "
             "primary key (cachepath))")
        db.execute(Q)

        self.commit()

    def begin(self):
        """Issue an explicit ``begin`` statement."""
        self.db.execute("begin")

    def commit(self):
        """Commit the connection's current transaction."""
        self.db.commit()

    def rollback(self):
        """Roll back the connection's current transaction."""
        self.db.rollback()

    def get_cachepath(self, cachepath):
        """Return the ``cachepaths`` row of ``cachepath``, or None."""
        Q = "select * from cachepaths where cachepath = ?"
        # fetchone() already yields None when no row matches.
        return self.db.execute(Q, (cachepath,)).fetchone()

    def insert_cachepath(self, cachepath, client, path):
        """Register ``cachepath`` for ``client`` and ``path``."""
        Q = "insert into cachepaths(cachepath, client, path) values (?, ?, ?)"
        self.db.execute(Q, (cachepath, client, path))

    def delete_cachepath(self, cachepath):
        """Remove the registration of ``cachepath``, if any."""
        Q = "delete from cachepaths where cachepath = ?"
        self.db.execute(Q, (cachepath,))

    def new_serial(self, path):
        """Return the next serial of ``path`` and advance the counter.

        The first serial handed out for a path is 0.
        """
        db = self.db
        Q = ("select nextserial from serials where path = ?")
        r = db.execute(Q, (path,)).fetchone()
        if r:
            serial = r[0]
            Q = "update serials set nextserial = ? where path = ?"
        else:
            serial = 0
            Q = "insert into serials(nextserial, path) values (?, ?)"
        # Both statements take the same (nextserial, path) parameters.
        db.execute(Q, (serial + 1, path))
        return serial

    def list_files_with_info(self, archive, info):
        """Yield, ordered, the paths of ``archive`` whose stored info
        equals ``info``.

        ``info`` is compared against the stored column as given, i.e. it is
        not JSON-encoded here.
        """
        Q = ("select path from archives where archive = ? and info = ?"
             " order by path")
        for row in self.db.execute(Q, (archive, info)):
            yield row[0]

    def list_non_deleted_files(self, archive):
        """Yield, ordered, the paths of ``archive`` whose stored info is
        not the empty JSON object ``{}``."""
        Q = ("select path from archives where archive = ? and info != '{}'"
             " order by path")
        for row in self.db.execute(Q, (archive,)):
            yield row[0]

    def list_files(self, archive, prefix=None):
        """Yield, ordered, the paths recorded for ``archive``.

        :param prefix: when not None, only paths starting with exactly this
            string are produced
        """
        Q = "select path from archives where archive = ?"
        if prefix is not None:
            # Compare the leading characters literally.  A LIKE pattern
            # would treat '%' and '_' inside the prefix as wildcards and
            # ignore ASCII case, matching unrelated paths.
            Q += " and substr(path, 1, ?) = ?"
            tpl = (archive, len(prefix), prefix)
        else:
            tpl = (archive,)
        Q += " order by path"
        for row in self.db.execute(Q, tpl):
            yield row[0]

    def list_deciding(self, archives, sync):
        """Yield paths whose serial in one of ``archives`` is greater than
        their serial in the ``sync`` archive.

        :param archives: one or two archive names; a single name is
            duplicated to fill both placeholders of the query
        :param sync: name of the archive to compare against
        """
        if len(archives) == 1:
            archive = archives[0]
            archives = (archive, archive)
        archives = tuple(archives)
        Q = ("select client.path from archives client, archives sync "
             "where client.archive in (?, ?) and sync.archive = ? "
             "and client.path = sync.path and client.serial > sync.serial")
        for row in self.db.execute(Q, archives + (sync,)):
            yield row[0]

    def put_state(self, state):
        """Insert ``state`` or replace the row with the same archive/path.

        ``state.info`` is stored JSON-encoded.
        """
        Q = ("insert or replace into "
             "archives(archive, path, serial, info) "
             "values (?, ?, ?, ?)")
        args = (state.archive, state.path, state.serial,
                json.dumps(state.info))
        self.db.execute(Q, args)

    def _get_state(self, archive, path):
        """Return the stored FileState of ``path`` in ``archive``, or None
        when there is no such row."""
        Q = ("select archive, path, serial, info from archives "
             "where archive = ? and path = ?")
        r = self.db.execute(Q, (archive, path)).fetchone()
        if not r:
            return None
        return common.FileState(archive=r[0], path=r[1], serial=r[2],
                                info=json.loads(r[3]))

    def get_state(self, archive, path):
        """Return the FileState of ``path`` in ``archive``.

        When nothing is stored, a state with serial -1 and empty info is
        returned instead of None.
        """
        state = self._get_state(archive, path)
        if state is None:
            state = common.FileState(archive=archive, path=path, serial=-1,
                                     info={})
        return state
def transaction(retries=5, retry_wait=1):
    """Build a decorator that runs a callable inside a DB transaction.

    The decorated callable must receive, as its first positional argument,
    an object providing ``get_db()``; the returned database must provide
    ``begin()``, ``commit()`` and ``rollback()``.  The call is wrapped in
    begin/commit, and rolled back when it raises.

    :param retries: how many times to retry after a sqlite3
        OperationalError whose message contains "locked"
    :param retry_wait: seconds to sleep before each retry
    :raises Exception: whatever the callable (or the database) raised,
        once it is not retriable or the retries are used up
    """
    def wrap(func):
        @wraps(func)
        def inner(*args, **kwargs):
            obj = args[0]
            db = obj.get_db()
            attempt = 0
            while True:
                try:
                    db.begin()
                    r = func(*args, **kwargs)
                    db.commit()
                    return r
                except Exception as e:
                    db.rollback()
                    # TODO check conflict
                    # str(e) instead of e.message: the latter exists on
                    # Python 2 only and would raise AttributeError here.
                    locked = (isinstance(e, sqlite3.OperationalError)
                              and "locked" in str(e))
                    if not (locked and attempt < retries):
                        # Bare raise keeps the original traceback.
                        raise
                    logger.warning(
                        "Got DB error '%s'. Retrying transaction." % e)
                    time.sleep(retry_wait)
                    attempt += 1
        return inner
    return wrap
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
from agkyra.syncer import common
class FileClient(object):
    """Base class of file clients.

    The listing, probing, staging and target preparation hooks raise
    NotImplementedError here; the pull workflow built on top of
    ``prepare_target`` is provided.
    """

    def list_candidate_files(self, archive):
        """Not implemented in the base class."""
        raise NotImplementedError()

    def start_probing_path(self, path, old_state, ref_state, callback=None):
        """Not implemented in the base class."""
        raise NotImplementedError()

    def stage_file(self, source_state):
        """Not implemented in the base class."""
        raise NotImplementedError()

    def prepare_target(self, state):
        """Not implemented in the base class."""
        raise NotImplementedError()

    def start_pulling_file(self, source_handle, target_state, sync_state,
                           callback=None):
        """Pull the source into the target and report the synced states.

        ``callback``, when given, is called with the synced source state
        and the synced target state.  A common.SyncError raised by the pull
        or by the callback is logged as a warning and not propagated.
        """
        try:
            synced = self._start(source_handle, target_state, sync_state)
            source_synced, target_synced = synced
            if callback is None:
                return
            callback(source_synced, target_synced)
        except common.SyncError as err:
            logger.warning(err)

    def _start(self, source_handle, target_state, sync_state):
        """Run the pull and return ``(synced_source, synced_target)``.

        The source handle is unstaged afterwards, whether or not the pull
        succeeded.
        """
        try:
            handle = self.prepare_target(target_state)
            target_synced = handle.pull(source_handle, sync_state)
            source_synced = source_handle.get_synced_state()
            return source_synced, target_synced
        finally:
            source_handle.unstage_file()
import threading
class HeartBeat(object):
    """Key/value log guarded by a lock.

    Access goes through the object returned by :meth:`lock`, used as a
    context manager: the lock is acquired on entry and released on exit.
    """

    def __init__(self, *args, **kwargs):
        """Create an empty log; the arguments are accepted and ignored."""
        self._LOG = {}
        self._LOCK = threading.Lock()

    def lock(self):
        """Return a context manager giving access to the log.

        The returned object offers ``get(key)`` (None when absent),
        ``set(key, value)`` and ``delete(key)`` (KeyError when absent).
        """
        class Lock(object):
            def __enter__(this):
                self._LOCK.acquire()
                return this

            def __exit__(this, exctype, value, traceback):
                self._LOCK.release()
                # A false return value makes the interpreter propagate any
                # in-flight exception itself, original traceback included;
                # __exit__ must not re-raise the exception it was handed.
                return False

            def get(this, key):
                return self._LOG.get(key)

            def set(this, key, value):
                self._LOG[key] = value

            def delete(this, key):
                self._LOG.pop(key)

        return Lock()
import os
import re
import time
import datetime
import psutil
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import logging
from agkyra.syncer.file_client import FileClient
from agkyra.syncer import utils, common
from agkyra.syncer.database import transaction
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Status codes classifying what is found at a local path.
LOCAL_FILE = 0
LOCAL_EMPTY_DIR = 1
LOCAL_NONEMPTY_DIR = 2
LOCAL_MISSING = 3
LOCAL_SOFTLINK = 4
LOCAL_OTHER = 5
# errno values compared against OSError.errno; on Linux these are EEXIST,
# ENOTDIR and ENOENT respectively.
OS_FILE_EXISTS = 17
OS_NOT_A_DIR = 20
OS_NO_FILE_OR_DIR = 2
class DirMissing(BaseException):
    """Signals that a needed file or directory does not exist.

    Derives from BaseException, so ``except Exception`` handlers do not
    catch it.
    """
def link_file(src, dest):
    """Create ``dest`` as a hard link to ``src``.

    :raises common.ConflictError: ``os.link`` failed with OS_FILE_EXISTS or
        OS_NOT_A_DIR
    :raises DirMissing: ``os.link`` failed with OS_NO_FILE_OR_DIR
    :raises OSError: any other failure of ``os.link``
    """
    try:
        os.link(src, dest)
    except OSError as e:
        if e.errno in [OS_FILE_EXISTS, OS_NOT_A_DIR]:
            raise common.ConflictError("Cannot link, '%s' exists." % dest)
        if e.errno == OS_NO_FILE_OR_DIR:
            raise DirMissing()
        # Any other failure must not pass for a successful link.
        raise
def make_dirs(path):
    """Create directory ``path`` together with any missing parents.

    An already existing directory is accepted silently.

    :raises common.ConflictError: creation failed with OS_FILE_EXISTS (and
        ``path`` is not a directory) or with OS_NOT_A_DIR
    :raises OSError: any other failure of ``os.makedirs``
    """
    try:
        os.makedirs(path)
    except OSError as err:
        code = err.errno
        already_a_dir = code == OS_FILE_EXISTS and os.path.isdir(path)
        if already_a_dir:
            return
        if code == OS_FILE_EXISTS or code == OS_NOT_A_DIR:
            raise common.ConflictError("Cannot make dir '%s'." % path)
        raise
# Version-independent access to a process's open files: psutil releases with
# major version 2 or later expose open_files(), older ones get_open_files().
if psutil.version_info[0] >= 2:
    def psutil_open_files(proc):
        return proc.open_files()
else:
    def psutil_open_files(proc):
        return proc.get_open_files()
def file_is_open(path):
    """Tell whether any inspectable process has ``path`` open.

    Processes that vanish during the scan, or whose open files may not be
    read by the current user, are skipped: they cannot be checked, and
    must not abort the scan of the remaining processes.

    :param path: path compared for equality with the open files' paths
    :returns: True when a match is found, False otherwise
    """
    for proc in psutil.process_iter():
        try:
            flist = psutil_open_files(proc)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
        for nt in flist:
            if nt.path == path:
                return True
    return False
def mk_stash_name(filename):
    """Return ``filename`` suffixed with the current time and ``.local``.

    The result has the form ``<filename>.<seconds since the epoch>.local``.
    """
    # strftime("%s") is a platform extension that is not available
    # everywhere; time.time() yields the epoch seconds portably.
    tstamp = "%d" % int(time.time())
    return filename + '.' + tstamp + '.local'
def eq_float(f1, f2):
    """Tell whether two numbers differ by less than 0.01."""
    tolerance = 0.01
    difference = f1 - f2
    return -tolerance < difference < tolerance
def files_equal(f1, f2):
    """Tell whether two paths hold the same thing.

    Paths of different status are unequal; paths of the same status other
    than LOCAL_FILE count as equal.  Two regular files are equal when
    their sizes and their hashes match.
    """
    logger.info("Comparing files: '%s', '%s'" % (f1, f2))
    status1 = path_status(f1)
    status2 = path_status(f2)
    if status1 != status2:
        return False
    if status1 != LOCAL_FILE:
        return True

    # Cheap size check first; hashing is only needed for same-sized files.
    _, size1 = stat_file(f1)
    _, size2 = stat_file(f2)
    if size1 != size2:
        return False
    return utils.hash_file(f1) == utils.hash_file(f2)
def info_is_unhandled(info):
    """Tell whether ``info`` is non-empty and typed as unhandled."""
    if info == {}:
        return False
    return info[LOCALFS_TYPE] == common.T_UNHANDLED
def local_path_changes(path, state):
    """Return the live info of ``path`` when it differs from ``state.info``.

    :returns: None when the live info matches the state's info, otherwise
        the live info
    """
    current = get_live_info(path)
    return None if is_info_eq(current, state.info) else current
def get_live_info(path):
    """Build the info dictionary describing what is at ``path`` right now.

    :returns: ``{}`` when ``path`` is None or nothing is there; a
        dictionary holding only the type for symlinks, other unhandled
        entries and directories; type, mtime and size for regular files
    """
    if path is None:
        return {}

    status = path_status(path)
    if status == LOCAL_MISSING:
        return {}
    if status in (LOCAL_SOFTLINK, LOCAL_OTHER):
        return {LOCALFS_TYPE: common.T_UNHANDLED}
    if status in (LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR):
        return {LOCALFS_TYPE: common.T_DIR}

    stats = stat_file(path)
    if stats is None:
        # Nothing to stat any more: report the path as missing.
        return {}
    mtime, size = stats
    return {
        LOCALFS_MTIME: mtime,
        LOCALFS_SIZE: size,
        LOCALFS_TYPE: common.T_FILE,
    }
def stat_file(filename):
    """Return ``(mtime, size)`` of ``filename`` without following symlinks.

    :returns: the pair, or None when ``os.lstat`` fails with
        OS_NO_FILE_OR_DIR
    :raises OSError: any other failure of ``os.lstat``
    """
    try:
        st = os.lstat(filename)
    except OSError as err:
        if err.errno != OS_NO_FILE_OR_DIR:
            raise
        return None
    return (st.st_mtime, st.st_size)
# Keys of the info dictionaries describing local paths: the type tag, the
# modification time and the size.
LOCALFS_TYPE = "localfs_type"
LOCALFS_MTIME = "localfs_mtime"
LOCALFS_SIZE = "localfs_size"
def status_of_info(info):
    """Map an info dictionary to a LOCAL_* status code.

    Empty info maps to LOCAL_MISSING, a directory to LOCAL_EMPTY_DIR, an
    unhandled type to LOCAL_OTHER and anything else to LOCAL_FILE.
    """
    if info == {}:
        return LOCAL_MISSING
    kind = info[LOCALFS_TYPE]
    if kind == common.T_DIR:
        return LOCAL_EMPTY_DIR
    if kind == common.T_UNHANDLED:
        return LOCAL_OTHER  # shouldn't happen
    return LOCAL_FILE
def path_status(path):
    """Classify what is found at ``path`` as one of the LOCAL_* codes.

    Symlinks are reported as LOCAL_SOFTLINK without being followed.
    Otherwise the path is listed as a directory, and a failure is mapped
    through its errno: OS_NOT_A_DIR gives LOCAL_FILE for regular files and
    LOCAL_OTHER for the rest, OS_NO_FILE_OR_DIR gives LOCAL_MISSING.

    NOTE(review): any other errno makes the function return None.
    """
    if os.path.islink(path):
        return LOCAL_SOFTLINK
    try:
        entries = os.listdir(path)
    except OSError as err:
        code = err.errno
        if code == OS_NOT_A_DIR:
            return LOCAL_FILE if os.path.isfile(path) else LOCAL_OTHER
        if code == OS_NO_FILE_OR_DIR:
            return LOCAL_MISSING
        return None
    if entries:
        return LOCAL_NONEMPTY_DIR
    return LOCAL_EMPTY_DIR
def old_path_status(path):
    """Classify ``path`` by trying to list it as a directory.

    A listing failure with OS_NOT_A_DIR gives LOCAL_FILE and one with
    OS_NO_FILE_OR_DIR gives LOCAL_MISSING.

    NOTE(review): any other errno makes the function return None.
    """
    try:
        entries = os.listdir(path)
    except OSError as err:
        code = err.errno
        if code == OS_NOT_A_DIR:
            return LOCAL_FILE
        if code == OS_NO_FILE_OR_DIR:
            return LOCAL_MISSING
        return None
    if entries:
        return LOCAL_NONEMPTY_DIR
    return LOCAL_EMPTY_DIR
def is_info_eq(info1, info2):
    """Tell whether two info dictionaries describe the same content.

    When either is empty they are equal only if both are.  Infos of
    different type are unequal, unhandled infos never compare equal, and
    directories always do.  Files are equal when their sizes match and
    their mtimes agree within the tolerance of ``eq_float``.
    """
    if info1 == {} or info2 == {}:
        return info1 == info2

    kind = info1[LOCALFS_TYPE]
    if kind != info2[LOCALFS_TYPE]:
        return False
    if kind == common.T_UNHANDLED:
        return False
    if kind == common.T_DIR:
        return True

    same_mtime = eq_float(info1[LOCALFS_MTIME], info2[LOCALFS_MTIME])
    return same_mtime and info1[LOCALFS_SIZE] == info2[LOCALFS_SIZE]
class LocalfsTargetHandle(object):
def __init__(self, settings, target_state):
    """Bind the handle to ``target_state`` using values from ``settings``.

    :param settings: object providing ``local_root_path``,
        ``cache_hide_name``, ``cache_hide_path``, ``cache_path`` and
        ``get_db``
    :param target_state: state whose ``path`` names the file to act on
    """
    self.NAME = "LocalfsTargetHandle"

    # Values copied from the settings object.
    root = settings.local_root_path
    self.rootpath = root
    self.cache_hide_name = settings.cache_hide_name
    self.cache_hide_path = settings.cache_hide_path
    self.cache_path = settings.cache_path
    self.get_db = settings.get_db

    # The target file and its location under the local root.
    self.target_state = target_state
    relative_path = target_state.path
    self.path = relative_path
    self.local_path = utils.join_path(root, relative_path)

    # No hidden copy is known yet.
    self.hidden_filename = None
    self.hidden_path = None
def get_path_in_cache(self, path):
    """Return ``path`` joined onto the cache path of this handle."""
    cache_root = self.cache_path
    return utils.join_path(cache_root, path)
@transaction()
def register_hidden_name(self, filename):
    """Register the hidden cache name derived from ``filename``.

    The hidden name is the hash of ``filename`` joined onto
    ``cache_hide_name``; it is remembered in ``self.hidden_filename``
    whether or not it was already registered.

    :returns: True when a new registration was inserted, False when the
        hidden name was registered already
    """
    db = self.get_db()
    hashed = utils.hash_string(filename)
    hide_filename = utils.join_path(self.cache_hide_name, hashed)
    self.hidden_filename = hide_filename

    already_registered = db.get_cachepath(hide_filename)
    if not already_registered:
        db.insert_cachepath(hide_filename, self.NAME, filename)
        return True
    return False
@transaction()
def unregister_hidden_name(self, hidden_filename):
    """Drop the registration of ``hidden_filename`` and forget the
    hidden name remembered on this handle."""
    self.get_db().delete_cachepath(hidden_filename)
    self.hidden_filename = None
# Move the local file out of the way, into the cache, under its registered
# hidden name; the move is undone when the file turns out to be open or to
# be a non-empty directory.
# Raises common.BusyError when the file is open (before or after the move),
# common.ConflictError when it is a non-empty directory, and OSError when
# the rename fails for a reason other than the file not existing.
# NOTE(review): the indentation of this block was lost in this copy; the
# comments describe the statements without asserting their exact nesting.
def hide_file(self):
local_filename = self.local_path
# Refuse to touch a file that some process currently has open.
if file_is_open(local_filename):
raise common.BusyError("File '%s' is open. Aborting."
% local_filename)
# Registering also stores the hidden name in self.hidden_filename.
new_registered = self.register_hidden_name(self.path)
hidden_filename = self.hidden_filename
hidden_path = self.get_path_in_cache(hidden_filename)
self.hidden_path = hidden_path
# A pre-existing registration is reported; if the hidden copy is already
# present in the cache there is nothing left to move.
if not new_registered:
logger.warning("Hiding already registered for file %s" %
(self.path,))
if os.path.lexists(hidden_path):
logger.warning("File %s already hidden at %s" %
(self.path, hidden_path))
return
try:
os.rename(local_filename, hidden_path)
logger.info("Hiding file '%s' to '%s'" %
(local_filename, hidden_path))
except OSError as e:
# Nothing to hide: drop the registration and return quietly.
if e.errno == OS_NO_FILE_OR_DIR:
self.unregister_hidden_name(hidden_filename)
logger.info("File '%s' does not exist" % local_filename)
return
else:
raise e
# Re-check after the move: somebody may have opened the file meanwhile,
# in which case the move is reverted.
if file_is_open(hidden_path):
os.rename(hidden_path, local_filename)
self.unregister_hidden_name(hidden_filename)
raise common.BusyError("File '%s' is open. Undoing." % hidden_path)
# A non-empty directory is put back and reported as a conflict.
if path_status(hidden_path) == LOCAL_NONEMPTY_DIR:
os.rename(hidden_path, local_filename)
self.unregister_hidden_name(hidden_filename)
raise common.ConflictError("'%s' is non-empty" % local_filename)
def apply(self, fetched_file, fetched_live_info, sync_state):
    """Bring the local path in line with the fetched file.

    Nothing is done when a directory is fetched and a local directory
    exists, or when both sides are missing.  Otherwise the local path is
    prepared, the fetched file finalized, the hidden copy cleaned up and
    its hidden name unregistered.

    :raises common.ConflictError: the local path is a non-empty directory
        that would have to be replaced
    """
    local_status = path_status(self.local_path)
    fetched_status = status_of_info(fetched_live_info)

    local_is_dir = local_status in (LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR)
    if local_is_dir and fetched_status == LOCAL_EMPTY_DIR:
        return

    both_missing = (local_status == LOCAL_MISSING
                    and fetched_status == LOCAL_MISSING)
    if both_missing:
        return

    if local_status == LOCAL_NONEMPTY_DIR:
        raise common.ConflictError("'%s' is non-empty" % self.local_path)

    self.prepare(fetched_file, sync_state)
    self.finalize(fetched_file, fetched_live_info)
    self.cleanup(self.hidden_path)
    self.unregister_hidden_name(self.hidden_filename)
def prepare(self, fetched_file, sync_state):
    """Hide the local file and stash it when it carries local changes.

    After hiding, the hidden copy is compared with ``sync_state``.  When
    its live info differs from the synced info and is not empty, and its
    content is not equal to ``fetched_file``, the hidden copy is stashed.
    """
    self.hide_file()
    info_changed = local_path_changes(self.hidden_path, sync_state)
    # Diagnostic output goes through the module logger rather than a
    # print statement on standard output.
    logger.debug("info changed %s" % (info_changed,))
    if info_changed is not None and info_changed != {}:
        if not files_equal(self.hidden_path, fetched_file):
            self.stash_file()
def stash_file(self):