Commit 4ab79b9d authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

Rename path to objname

parent ac8763ca
......@@ -2,7 +2,7 @@ from collections import namedtuple
import threading
FileStateTuple = namedtuple('FileStateTuple',
['archive', 'path', 'serial', 'info'])
['archive', 'objname', 'serial', 'info'])
class FileState(FileStateTuple):
......
......@@ -12,7 +12,7 @@ logger = logging.getLogger(__name__)
class FileStateDB(object):
def new_serial(self, path):
def new_serial(self, objname):
raise NotImplementedError
def list_files(self, archive):
......@@ -21,7 +21,7 @@ class FileStateDB(object):
def put_state(self, state):
raise NotImplementedError
def get_state(self, archive, path):
def get_state(self, archive, objname):
raise NotImplementedError
......@@ -38,17 +38,17 @@ class SqliteFileStateDB(FileStateDB):
db = self.db
Q = ("create table if not exists "
"archives(archive text, path text, serial integer, "
"info blob, primary key (archive, path))")
"archives(archive text, objname text, serial integer, "
"info blob, primary key (archive, objname))")
db.execute(Q)
Q = ("create table if not exists "
"serials(path text, nextserial bigint, primary key (path))")
"serials(objname text, nextserial bigint, primary key (objname))")
db.execute(Q)
Q = ("create table if not exists "
"cachepaths(cachepath text, client text, path text, "
"primary key (cachepath))")
"cachenames(cachename text, client text, objname text, "
"primary key (cachename))")
db.execute(Q)
self.commit()
......@@ -62,43 +62,44 @@ class SqliteFileStateDB(FileStateDB):
def rollback(self):
self.db.rollback()
def get_cachepath(self, cachepath):
def get_cachename(self, cachename):
db = self.db
Q = "select * from cachepaths where cachepath = ?"
c = db.execute(Q, (cachepath,))
Q = "select * from cachenames where cachename = ?"
c = db.execute(Q, (cachename,))
r = c.fetchone()
if r:
return r
else:
return None
def insert_cachepath(self, cachepath, client, path):
def insert_cachename(self, cachename, client, objname):
db = self.db
Q = "insert into cachepaths(cachepath, client, path) values (?, ?, ?)"
db.execute(Q, (cachepath, client, path))
Q = ("insert into cachenames(cachename, client, objname) "
"values (?, ?, ?)")
db.execute(Q, (cachename, client, objname))
def delete_cachepath(self, cachepath):
def delete_cachename(self, cachename):
db = self.db
Q = "delete from cachepaths where cachepath = ?"
db.execute(Q, (cachepath,))
Q = "delete from cachenames where cachename = ?"
db.execute(Q, (cachename,))
def new_serial(self, path):
def new_serial(self, objname):
db = self.db
Q = ("select nextserial from serials where path = ?")
c = db.execute(Q, (path,))
Q = ("select nextserial from serials where objname = ?")
c = db.execute(Q, (objname,))
r = c.fetchone()
if r:
serial = r[0]
Q = "update serials set nextserial = ? where path = ?"
Q = "update serials set nextserial = ? where objname = ?"
else:
serial = 0
Q = "insert into serials(nextserial, path) values (?, ?)"
db.execute(Q, (serial + 1, path))
Q = "insert into serials(nextserial, objname) values (?, ?)"
db.execute(Q, (serial + 1, objname))
return serial
def list_files_with_info(self, archive, info):
Q = ("select path from archives where archive = ? and info = ?"
" order by path")
Q = ("select objname from archives where archive = ? and info = ?"
" order by objname")
c = self.db.execute(Q, (archive, info))
fetchone = c.fetchone
while True:
......@@ -108,8 +109,8 @@ class SqliteFileStateDB(FileStateDB):
yield r[0]
def list_non_deleted_files(self, archive):
Q = ("select path from archives where archive = ? and info != '{}'"
" order by path")
Q = ("select objname from archives where archive = ? and info != '{}'"
" order by objname")
c = self.db.execute(Q, (archive,))
fetchone = c.fetchone
while True:
......@@ -119,14 +120,14 @@ class SqliteFileStateDB(FileStateDB):
yield r[0]
def list_files(self, archive, prefix=None):
Q = "select path from archives where archive = ?"
Q = "select objname from archives where archive = ?"
if prefix is not None:
Q += " and path like ?"
Q += " and objname like ?"
tpl = (archive, prefix + '%')
else:
tpl = (archive,)
Q += " order by path"
Q += " order by objname"
c = self.db.execute(Q, tpl)
fetchone = c.fetchone
while True:
......@@ -140,9 +141,10 @@ class SqliteFileStateDB(FileStateDB):
archive = archives[0]
archives = (archive, archive)
archives = tuple(archives)
Q = ("select client.path from archives client, archives sync "
Q = ("select client.objname from archives client, archives sync "
"where client.archive in (?, ?) and sync.archive = ? "
"and client.path = sync.path and client.serial > sync.serial")
"and client.objname = sync.objname "
"and client.serial > sync.serial")
c = self.db.execute(Q, archives + (sync,))
fetchone = c.fetchone
while True:
......@@ -153,28 +155,28 @@ class SqliteFileStateDB(FileStateDB):
def put_state(self, state):
Q = ("insert or replace into "
"archives(archive, path, serial, info) "
"archives(archive, objname, serial, info) "
"values (?, ?, ?, ?)")
args = (state.archive, state.path, state.serial,
args = (state.archive, state.objname, state.serial,
json.dumps(state.info))
self.db.execute(Q, args)
def _get_state(self, archive, path):
Q = ("select archive, path, serial, info from archives "
"where archive = ? and path = ?")
c = self.db.execute(Q, (archive, path))
def _get_state(self, archive, objname):
Q = ("select archive, objname, serial, info from archives "
"where archive = ? and objname = ?")
c = self.db.execute(Q, (archive, objname))
r = c.fetchone()
if not r:
return None
return common.FileState(archive=r[0], path=r[1], serial=r[2],
return common.FileState(archive=r[0], objname=r[1], serial=r[2],
info=json.loads(r[3]))
def get_state(self, archive, path):
state = self._get_state(archive, path)
def get_state(self, archive, objname):
state = self._get_state(archive, objname)
if state is None:
state = common.FileState(archive=archive, path=path, serial=-1,
info={})
state = common.FileState(
archive=archive, objname=objname, serial=-1, info={})
return state
......
......@@ -9,7 +9,7 @@ class FileClient(object):
def list_candidate_files(self, archive):
raise NotImplementedError
def start_probing_path(self, path, old_state, ref_state, callback=None):
def start_probing_file(self, objname, old_state, ref_state, callback=None):
raise NotImplementedError
def stage_file(self, source_state):
......
......@@ -200,13 +200,13 @@ class LocalfsTargetHandle(object):
self.cache_path = settings.cache_path
self.get_db = settings.get_db
self.target_state = target_state
self.path = target_state.path
self.local_path = utils.join_path(self.rootpath, self.path)
self.objname = target_state.objname
self.local_path = utils.join_path(self.rootpath, self.objname)
self.hidden_filename = None
self.hidden_path = None
def get_path_in_cache(self, path):
return utils.join_path(self.cache_path, path)
def get_path_in_cache(self, name):
return utils.join_path(self.cache_path, name)
@transaction()
def register_hidden_name(self, filename):
......@@ -214,15 +214,15 @@ class LocalfsTargetHandle(object):
f = utils.hash_string(filename)
hide_filename = utils.join_path(self.cache_hide_name, f)
self.hidden_filename = hide_filename
if db.get_cachepath(hide_filename):
if db.get_cachename(hide_filename):
return False
db.insert_cachepath(hide_filename, self.NAME, filename)
db.insert_cachename(hide_filename, self.NAME, filename)
return True
@transaction()
def unregister_hidden_name(self, hidden_filename):
db = self.get_db()
db.delete_cachepath(hidden_filename)
db.delete_cachename(hidden_filename)
self.hidden_filename = None
def hide_file(self):
......@@ -231,17 +231,17 @@ class LocalfsTargetHandle(object):
raise common.BusyError("File '%s' is open. Aborting."
% local_filename)
new_registered = self.register_hidden_name(self.path)
new_registered = self.register_hidden_name(self.objname)
hidden_filename = self.hidden_filename
hidden_path = self.get_path_in_cache(hidden_filename)
self.hidden_path = hidden_path
if not new_registered:
logger.warning("Hiding already registered for file %s" %
(self.path,))
(self.objname,))
if os.path.lexists(hidden_path):
logger.warning("File %s already hidden at %s" %
(self.path, hidden_path))
(self.objname, hidden_path))
return
try:
os.rename(local_filename, hidden_path)
......@@ -288,10 +288,11 @@ class LocalfsTargetHandle(object):
self.stash_file()
def stash_file(self):
stash_filename = mk_stash_name(self.local_path)
stash_name = mk_stash_name(self.objname)
stash_path = utils.join_path(self.rootpath, stash_name)
logger.warning("Stashing file '%s' to '%s'" %
(self.local_path, stash_filename))
os.rename(self.hidden_path, stash_filename)
(self.objname, stash_name))
os.rename(self.hidden_path, stash_path)
def finalize(self, filename, live_info):
logger.info("Finalizing file '%s'" % filename)
......@@ -333,19 +334,19 @@ class LocalfsSourceHandle(object):
f = utils.hash_string(filename)
stage_filename = utils.join_path(self.cache_stage_name, f)
self.stage_filename = stage_filename
if db.get_cachepath(stage_filename):
if db.get_cachename(stage_filename):
return False
db.insert_cachepath(stage_filename, self.NAME, filename)
db.insert_cachename(stage_filename, self.NAME, filename)
return True
@transaction()
def unregister_stage_name(self, stage_filename):
db = self.get_db()
db.delete_cachepath(stage_filename)
db.delete_cachename(stage_filename)
self.stage_filename = None
def get_path_in_cache(self, path):
return utils.join_path(self.cache_path, path)
def get_path_in_cache(self, name):
return utils.join_path(self.cache_path, name)
def lock_file(self, local_filename):
if file_is_open(local_filename):
......@@ -358,10 +359,10 @@ class LocalfsSourceHandle(object):
if not new_registered:
logger.warning("Staging already registered for file %s" %
(self.path,))
(self.objname,))
if os.path.lexists(stage_path):
logger.warning("File %s already staged at %s" %
(self.path, stage_path))
(self.objname, stage_path))
return
try:
os.rename(local_filename, stage_path)
......@@ -379,7 +380,7 @@ class LocalfsSourceHandle(object):
os.rename(stage_path, local_filename)
self.unregister_hidden_name(stage_filename)
raise common.ConflictError("'%s' is non-empty" % local_filename)
logger.info("Staging file '%s' to '%s'" % (self.path, stage_path))
logger.info("Staging file '%s' to '%s'" % (self.objname, stage_path))
def check_stable(self, interval=1, times=5):
for i in range(times):
......@@ -397,9 +398,8 @@ class LocalfsSourceHandle(object):
self.cache_path = settings.cache_path
self.get_db = settings.get_db
self.source_state = source_state
path = source_state.path
self.path = path
local_filename = utils.join_path(self.rootpath, path)
self.objname = source_state.objname
local_filename = utils.join_path(self.rootpath, self.objname)
self.local_path = local_filename
self.isdir = self.info_is_dir()
self.stage_filename = None
......@@ -412,16 +412,17 @@ class LocalfsSourceHandle(object):
def check_log(self):
with self.heartbeat.lock() as hb:
prev_log = hb.get(self.path)
prev_log = hb.get(self.objname)
if prev_log is not None:
actionstate, ts = prev_log
if actionstate != self.NAME or \
utils.younger_than(ts, 10):
raise common.HandledError("Action mismatch in %s: %s %s" %
(self.NAME, self.path, prev_log))
raise common.HandledError(
"Action mismatch in %s: %s %s" %
(self.NAME, self.objname, prev_log))
logger.warning("Ignoring previous run in %s: %s %s" %
(self.NAME, self.path, prev_log))
hb.set(self.path, (self.NAME, utils.time_stamp()))
(self.NAME, self.objname, prev_log))
hb.set(self.objname, (self.NAME, utils.time_stamp()))
def get_synced_state(self):
return self.source_state
......@@ -452,7 +453,7 @@ class LocalfsSourceHandle(object):
def clear_log(self):
with self.heartbeat.lock() as hb:
hb.delete(self.path)
hb.delete(self.objname)
def do_unstage(self):
if self.stage_filename is None:
......@@ -500,17 +501,17 @@ class LocalfsFileClient(FileClient):
logger.info("Candidates: %s" % candidates)
return candidates
def _local_path_changes(self, path, state):
local_path = utils.join_path(self.ROOTPATH, path)
def _local_path_changes(self, name, state):
local_path = utils.join_path(self.ROOTPATH, name)
return local_path_changes(local_path, state)
def start_probing_path(self, path, old_state, ref_state,
def start_probing_file(self, objname, old_state, ref_state,
assumed_info=None,
callback=None):
if old_state.serial != ref_state.serial:
logger.warning("Serial mismatch in probing path '%s'" % path)
logger.warning("Serial mismatch in probing path '%s'" % objname)
return
live_info = (self._local_path_changes(path, old_state)
live_info = (self._local_path_changes(objname, old_state)
if assumed_info is None else assumed_info)
if live_info is None:
return
......
......@@ -14,16 +14,16 @@ from agkyra.syncer.database import transaction
logger = logging.getLogger(__name__)
def heartbeat_event(settings, heartbeat, path):
def heartbeat_event(settings, heartbeat, objname):
event = threading.Event()
max_interval = settings.action_max_wait / 2.0
def set_log():
with heartbeat.lock() as hb:
client, prev_tstamp = hb.get(path)
client, prev_tstamp = hb.get(objname)
tpl = (client, utils.time_stamp())
hb.set(path, tpl)
logger.info("HEARTBEAT %s %s %s" % ((path,) + tpl))
hb.set(objname, tpl)
logger.info("HEARTBEAT '%s' %s %s" % ((objname,) + tpl))
def go():
interval = 0.2
......@@ -42,10 +42,10 @@ def give_heartbeat(f):
@wraps(f)
def inner(*args, **kwargs):
obj = args[0]
path = obj.path
objname = obj.objname
heartbeat = obj.heartbeat
settings = obj.settings
event = heartbeat_event(settings, heartbeat, path)
event = heartbeat_event(settings, heartbeat, objname)
try:
return f(*args, **kwargs)
finally:
......@@ -76,22 +76,23 @@ class PithosSourceHandle(object):
self.cache_path = settings.cache_path
self.get_db = settings.get_db
self.source_state = source_state
self.path = source_state.path
self.objname = source_state.objname
self.heartbeat = settings.heartbeat
self.check_log()
def check_log(self):
with self.heartbeat.lock() as hb:
prev_log = hb.get(self.path)
prev_log = hb.get(self.objname)
if prev_log is not None:
actionstate, ts = prev_log
if actionstate != self.NAME or \
utils.younger_than(ts, self.settings.action_max_wait):
raise common.HandledError("Action mismatch in %s: %s %s" %
(self.NAME, self.path, prev_log))
raise common.HandledError(
"Action mismatch in %s: %s %s" %
(self.NAME, self.objname, prev_log))
logger.warning("Ignoring previous run in %s: %s %s" %
(self.NAME, self.path, prev_log))
hb.set(self.path, (self.NAME, utils.time_stamp()))
(self.NAME, self.objname, prev_log))
hb.set(self.objname, (self.NAME, utils.time_stamp()))
@transaction()
def register_fetch_name(self, filename):
......@@ -100,20 +101,20 @@ class PithosSourceHandle(object):
datetime.datetime.now().strftime("%s")
fetch_name = utils.join_path(self.cache_fetch_name, f)
self.fetch_name = fetch_name
db.insert_cachepath(fetch_name, self.NAME, filename)
db.insert_cachename(fetch_name, self.NAME, filename)
return utils.join_path(self.cache_path, fetch_name)
@handle_client_errors
@give_heartbeat
def send_file(self, sync_state):
fetched_file = self.register_fetch_name(self.path)
fetched_file = self.register_fetch_name(self.objname)
headers = dict()
with open(fetched_file, mode='wb+') as fil:
try:
logger.info("Downloading path: '%s', to: '%s'" %
(self.path, fetched_file))
logger.info("Downloading object: '%s', to: '%s'" %
(self.objname, fetched_file))
self.endpoint.download_object(
self.path,
self.objname,
fil,
headers=headers)
except ClientError as e:
......@@ -129,10 +130,12 @@ class PithosSourceHandle(object):
"pithos_type": actual_type}
self.source_state = self.source_state.set(info=actual_info)
if actual_info == {}:
logger.info("Downloading path: '%s', object is gone." % self.path)
logger.info("Downloading object: '%s', object is gone."
% self.objname)
os.unlink(fetched_file)
elif actual_info["pithos_type"] == common.T_DIR:
logger.info("Downloading path: '%s', object is dir." % self.path)
logger.info("Downloading object: '%s', object is dir."
% self.objname)
os.unlink(fetched_file)
os.mkdir(fetched_file)
return fetched_file
......@@ -145,7 +148,7 @@ class PithosSourceHandle(object):
def clear_log(self):
with self.heartbeat.lock() as hb:
hb.delete(self.path)
hb.delete(self.objname)
STAGED_FOR_DELETION_SUFFIX = ".pithos_staged_for_deletion"
......@@ -158,34 +161,34 @@ class PithosTargetHandle(object):
self.settings = settings
self.endpoint = settings.endpoint
self.target_state = target_state
self.target_file = target_state.path
self.path = target_state.path
self.target_file = target_state.objname
self.objname = target_state.objname
self.heartbeat = settings.heartbeat
def mk_del_name(self, name, etag):
return "%s.%s%s" % (name, etag, STAGED_FOR_DELETION_SUFFIX)
def safe_object_del(self, path, etag):
def safe_object_del(self, objname, etag):
container = self.endpoint.container
del_name = self.mk_del_name(path, etag)
del_name = self.mk_del_name(objname, etag)
logger.info("Moving temporarily to '%s'" % del_name)
try:
self.endpoint.object_move(
path,
objname,
destination='/%s/%s' % (container, del_name),
if_etag_match=etag)
except ClientError as e:
if e.status == 404:
logger.warning("'%s' not found; already moved?" % path)
logger.warning("'%s' not found; already moved?" % objname)
else:
raise
finally:
self.endpoint.del_object(del_name)
logger.info("Deleted tmp '%s'" % del_name)
def directory_put(self, path, etag):
def directory_put(self, objname, etag):
r = self.endpoint.object_put(
path,
objname,
content_type='application/directory',
content_length=0,
if_etag_match=etag)
......@@ -278,26 +281,26 @@ class PithosFileClient(FileClient):
last_modified = last_tstamp.isoformat()
candidates = self.list_candidate_files(
last_modified=last_modified)
for (path, info) in candidates:
callback(self.NAME, path, assumed_info=info)
for (objname, info) in candidates:
callback(self.NAME, objname, assumed_info=info)
time.sleep(interval)
poll = PollPithos()
poll.daemon = True
poll.start()
def get_object_from_cache(self, path):
def get_object_from_cache(self, objname):
if self.objects is None:
self.objects = self.endpoint.list_objects()
objs = [o for o in self.objects if o["name"] == path]
objs = [o for o in self.objects if o["name"] == objname]
try:
return objs[0]
except IndexError:
return None
def get_object(self, path):
def get_object(self, objname):
try:
return self.endpoint.get_object_info(path)
return self.endpoint.get_object_info(objname)
except ClientError as e:
if e.status == 404:
return None
......@@ -314,16 +317,16 @@ class PithosFileClient(FileClient):
PITHOS_TYPE: p_type,
}
def start_probing_path(self, path, old_state, ref_state,
def start_probing_file(self, objname, old_state, ref_state,
assumed_info=None,
callback=None):
if exclude_pattern.match(path):
logger.warning("Ignoring probe archive: %s, path: '%s'" %
(old_state.archive, path))
if exclude_pattern.match(objname):
logger.warning("Ignoring probe archive: %s, object: '%s'" %
(old_state.archive, objname))
return
info = old_state.info
if assumed_info is None:
obj = self.get_object(path)
obj = self.get_object(objname)
live_info = self.get_object_live_info(obj)
else:
live_info = assumed_info
......
......@@ -55,8 +55,8 @@ class FileSyncer(object):
def start_notifiers(self):