diff --git a/agkyra/agkyra/syncer/common.py b/agkyra/agkyra/syncer/common.py index 0248079b9527acb5908714658a4383f550e80f53..cea5a48c09f0a1b33872dbbdc007baa5e02de7e6 100644 --- a/agkyra/agkyra/syncer/common.py +++ b/agkyra/agkyra/syncer/common.py @@ -2,7 +2,7 @@ from collections import namedtuple import threading FileStateTuple = namedtuple('FileStateTuple', - ['archive', 'path', 'serial', 'info']) + ['archive', 'objname', 'serial', 'info']) class FileState(FileStateTuple): diff --git a/agkyra/agkyra/syncer/database.py b/agkyra/agkyra/syncer/database.py index 468e2e6df5e0503479f3fc04ae9defdcb9001783..7f798f4da81b559293370cea17dc916af170ffaa 100644 --- a/agkyra/agkyra/syncer/database.py +++ b/agkyra/agkyra/syncer/database.py @@ -12,7 +12,7 @@ logger = logging.getLogger(__name__) class FileStateDB(object): - def new_serial(self, path): + def new_serial(self, objname): raise NotImplementedError def list_files(self, archive): @@ -21,7 +21,7 @@ class FileStateDB(object): def put_state(self, state): raise NotImplementedError - def get_state(self, archive, path): + def get_state(self, archive, objname): raise NotImplementedError @@ -38,17 +38,17 @@ class SqliteFileStateDB(FileStateDB): db = self.db Q = ("create table if not exists " - "archives(archive text, path text, serial integer, " - "info blob, primary key (archive, path))") + "archives(archive text, objname text, serial integer, " + "info blob, primary key (archive, objname))") db.execute(Q) Q = ("create table if not exists " - "serials(path text, nextserial bigint, primary key (path))") + "serials(objname text, nextserial bigint, primary key (objname))") db.execute(Q) Q = ("create table if not exists " - "cachepaths(cachepath text, client text, path text, " - "primary key (cachepath))") + "cachenames(cachename text, client text, objname text, " + "primary key (cachename))") db.execute(Q) self.commit() @@ -62,43 +62,44 @@ class SqliteFileStateDB(FileStateDB): def rollback(self): self.db.rollback() - def get_cachepath(self, cachepath): + def get_cachename(self, cachename): db = self.db - Q = "select * from cachepaths where cachepath = ?" - c = db.execute(Q, (cachepath,)) + Q = "select * from cachenames where cachename = ?" + c = db.execute(Q, (cachename,)) r = c.fetchone() if r: return r else: return None - def insert_cachepath(self, cachepath, client, path): + def insert_cachename(self, cachename, client, objname): db = self.db - Q = "insert into cachepaths(cachepath, client, path) values (?, ?, ?)" - db.execute(Q, (cachepath, client, path)) + Q = ("insert into cachenames(cachename, client, objname) " + "values (?, ?, ?)") + db.execute(Q, (cachename, client, objname)) - def delete_cachepath(self, cachepath): + def delete_cachename(self, cachename): db = self.db - Q = "delete from cachepaths where cachepath = ?" - db.execute(Q, (cachepath,)) + Q = "delete from cachenames where cachename = ?" + db.execute(Q, (cachename,)) - def new_serial(self, path): + def new_serial(self, objname): db = self.db - Q = ("select nextserial from serials where path = ?") - c = db.execute(Q, (path,)) + Q = ("select nextserial from serials where objname = ?") + c = db.execute(Q, (objname,)) r = c.fetchone() if r: serial = r[0] - Q = "update serials set nextserial = ? where path = ?" + Q = "update serials set nextserial = ? where objname = ?" else: serial = 0 - Q = "insert into serials(nextserial, path) values (?, ?)" - db.execute(Q, (serial + 1, path)) + Q = "insert into serials(nextserial, objname) values (?, ?)" + db.execute(Q, (serial + 1, objname)) return serial def list_files_with_info(self, archive, info): - Q = ("select path from archives where archive = ? and info = ?" - " order by path") + Q = ("select objname from archives where archive = ? and info = ?" + " order by objname") c = self.db.execute(Q, (archive, info)) fetchone = c.fetchone while True: @@ -108,8 +109,8 @@ class SqliteFileStateDB(FileStateDB): yield r[0] def list_non_deleted_files(self, archive): - Q = ("select path from archives where archive = ? and info != '{}'" - " order by path") + Q = ("select objname from archives where archive = ? and info != '{}'" + " order by objname") c = self.db.execute(Q, (archive,)) fetchone = c.fetchone while True: @@ -119,14 +120,14 @@ class SqliteFileStateDB(FileStateDB): yield r[0] def list_files(self, archive, prefix=None): - Q = "select path from archives where archive = ?" + Q = "select objname from archives where archive = ?" if prefix is not None: - Q += " and path like ?" + Q += " and objname like ?" tpl = (archive, prefix + '%') else: tpl = (archive,) - Q += " order by path" + Q += " order by objname" c = self.db.execute(Q, tpl) fetchone = c.fetchone while True: @@ -140,9 +141,10 @@ class SqliteFileStateDB(FileStateDB): archive = archives[0] archives = (archive, archive) archives = tuple(archives) - Q = ("select client.path from archives client, archives sync " + Q = ("select client.objname from archives client, archives sync " "where client.archive in (?, ?) and sync.archive = ? " - "and client.path = sync.path and client.serial > sync.serial") + "and client.objname = sync.objname " + "and client.serial > sync.serial") c = self.db.execute(Q, archives + (sync,)) fetchone = c.fetchone while True: @@ -153,28 +155,28 @@ class SqliteFileStateDB(FileStateDB): def put_state(self, state): Q = ("insert or replace into " - "archives(archive, path, serial, info) " + "archives(archive, objname, serial, info) " "values (?, ?, ?, ?)") - args = (state.archive, state.path, state.serial, + args = (state.archive, state.objname, state.serial, json.dumps(state.info)) self.db.execute(Q, args) - def _get_state(self, archive, path): - Q = ("select archive, path, serial, info from archives " - "where archive = ? and path = ?") - c = self.db.execute(Q, (archive, path)) + def _get_state(self, archive, objname): + Q = ("select archive, objname, serial, info from archives " + "where archive = ? and objname = ?") + c = self.db.execute(Q, (archive, objname)) r = c.fetchone() if not r: return None - return common.FileState(archive=r[0], path=r[1], serial=r[2], + return common.FileState(archive=r[0], objname=r[1], serial=r[2], info=json.loads(r[3])) - def get_state(self, archive, path): - state = self._get_state(archive, path) + def get_state(self, archive, objname): + state = self._get_state(archive, objname) if state is None: - state = common.FileState(archive=archive, path=path, serial=-1, - info={}) + state = common.FileState( + archive=archive, objname=objname, serial=-1, info={}) return state diff --git a/agkyra/agkyra/syncer/file_client.py b/agkyra/agkyra/syncer/file_client.py index 60156da43cb658c89dd2ef205ab63b14cc10371a..52356332ac4c5ef809c8a8f6980144d3d61cd6c1 100644 --- a/agkyra/agkyra/syncer/file_client.py +++ b/agkyra/agkyra/syncer/file_client.py @@ -9,7 +9,7 @@ class FileClient(object): def list_candidate_files(self, archive): raise NotImplementedError - def start_probing_path(self, path, old_state, ref_state, callback=None): + def start_probing_file(self, objname, old_state, ref_state, callback=None): raise NotImplementedError def stage_file(self, source_state): diff --git a/agkyra/agkyra/syncer/localfs_client.py b/agkyra/agkyra/syncer/localfs_client.py index f5f95ff6b1ee51523ab1dbf2d1df2f051c927159..55f8129bd7a78255e2be0e1a782c1b70c2e694f9 100644 --- a/agkyra/agkyra/syncer/localfs_client.py +++ b/agkyra/agkyra/syncer/localfs_client.py @@ -200,13 +200,13 @@ class LocalfsTargetHandle(object): self.cache_path = settings.cache_path self.get_db = settings.get_db self.target_state = target_state - self.path = target_state.path - self.local_path = utils.join_path(self.rootpath, self.path) + self.objname = target_state.objname + self.local_path = utils.join_path(self.rootpath, self.objname) self.hidden_filename = None self.hidden_path = None - def get_path_in_cache(self, path): - return utils.join_path(self.cache_path, path) + def get_path_in_cache(self, name): + return utils.join_path(self.cache_path, name) @transaction() def register_hidden_name(self, filename): @@ -214,15 +214,15 @@ class LocalfsTargetHandle(object): f = utils.hash_string(filename) hide_filename = utils.join_path(self.cache_hide_name, f) self.hidden_filename = hide_filename - if db.get_cachepath(hide_filename): + if db.get_cachename(hide_filename): return False - db.insert_cachepath(hide_filename, self.NAME, filename) + db.insert_cachename(hide_filename, self.NAME, filename) return True @transaction() def unregister_hidden_name(self, hidden_filename): db = self.get_db() - db.delete_cachepath(hidden_filename) + db.delete_cachename(hidden_filename) self.hidden_filename = None def hide_file(self): @@ -231,17 +231,17 @@ class LocalfsTargetHandle(object): raise common.BusyError("File '%s' is open. Aborting." % local_filename) - new_registered = self.register_hidden_name(self.path) + new_registered = self.register_hidden_name(self.objname) hidden_filename = self.hidden_filename hidden_path = self.get_path_in_cache(hidden_filename) self.hidden_path = hidden_path if not new_registered: logger.warning("Hiding already registered for file %s" % - (self.path,)) + (self.objname,)) if os.path.lexists(hidden_path): logger.warning("File %s already hidden at %s" % - (self.path, hidden_path)) + (self.objname, hidden_path)) return try: os.rename(local_filename, hidden_path) @@ -288,10 +288,11 @@ class LocalfsTargetHandle(object): self.stash_file() def stash_file(self): - stash_filename = mk_stash_name(self.local_path) + stash_name = mk_stash_name(self.objname) + stash_path = utils.join_path(self.rootpath, stash_name) logger.warning("Stashing file '%s' to '%s'" % - (self.local_path, stash_filename)) - os.rename(self.hidden_path, stash_filename) + (self.objname, stash_name)) + os.rename(self.hidden_path, stash_path) def finalize(self, filename, live_info): logger.info("Finalizing file '%s'" % filename) @@ -333,19 +334,19 @@ class LocalfsSourceHandle(object): f = utils.hash_string(filename) stage_filename = utils.join_path(self.cache_stage_name, f) self.stage_filename = stage_filename - if db.get_cachepath(stage_filename): + if db.get_cachename(stage_filename): return False - db.insert_cachepath(stage_filename, self.NAME, filename) + db.insert_cachename(stage_filename, self.NAME, filename) return True @transaction() def unregister_stage_name(self, stage_filename): db = self.get_db() - db.delete_cachepath(stage_filename) + db.delete_cachename(stage_filename) self.stage_filename = None - def get_path_in_cache(self, path): - return utils.join_path(self.cache_path, path) + def get_path_in_cache(self, name): + return utils.join_path(self.cache_path, name) def lock_file(self, local_filename): if file_is_open(local_filename): @@ -358,10 +359,10 @@ class LocalfsSourceHandle(object): if not new_registered: logger.warning("Staging already registered for file %s" % - (self.path,)) + (self.objname,)) if os.path.lexists(stage_path): logger.warning("File %s already staged at %s" % - (self.path, stage_path)) + (self.objname, stage_path)) return try: os.rename(local_filename, stage_path) @@ -379,7 +380,7 @@ class LocalfsSourceHandle(object): os.rename(stage_path, local_filename) self.unregister_hidden_name(stage_filename) raise common.ConflictError("'%s' is non-empty" % local_filename) - logger.info("Staging file '%s' to '%s'" % (self.path, stage_path)) + logger.info("Staging file '%s' to '%s'" % (self.objname, stage_path)) def check_stable(self, interval=1, times=5): for i in range(times): @@ -397,9 +398,8 @@ class LocalfsSourceHandle(object): self.cache_path = settings.cache_path self.get_db = settings.get_db self.source_state = source_state - path = source_state.path - self.path = path - local_filename = utils.join_path(self.rootpath, path) + self.objname = source_state.objname + local_filename = utils.join_path(self.rootpath, self.objname) self.local_path = local_filename self.isdir = self.info_is_dir() self.stage_filename = None @@ -412,16 +412,17 @@ class LocalfsSourceHandle(object): def check_log(self): with self.heartbeat.lock() as hb: - prev_log = hb.get(self.path) + prev_log = hb.get(self.objname) if prev_log is not None: actionstate, ts = prev_log if actionstate != self.NAME or \ utils.younger_than(ts, 10): - raise common.HandledError("Action mismatch in %s: %s %s" % - (self.NAME, self.path, prev_log)) + raise common.HandledError( + "Action mismatch in %s: %s %s" % + (self.NAME, self.objname, prev_log)) logger.warning("Ignoring previous run in %s: %s %s" % - (self.NAME, self.path, prev_log)) - hb.set(self.path, (self.NAME, utils.time_stamp())) + (self.NAME, self.objname, prev_log)) + hb.set(self.objname, (self.NAME, utils.time_stamp())) def get_synced_state(self): return self.source_state @@ -452,7 +453,7 @@ class LocalfsSourceHandle(object): def clear_log(self): with self.heartbeat.lock() as hb: - hb.delete(self.path) + hb.delete(self.objname) def do_unstage(self): if self.stage_filename is None: @@ -500,17 +501,17 @@ class LocalfsFileClient(FileClient): logger.info("Candidates: %s" % candidates) return candidates - def _local_path_changes(self, path, state): - local_path = utils.join_path(self.ROOTPATH, path) + def _local_path_changes(self, name, state): + local_path = utils.join_path(self.ROOTPATH, name) return local_path_changes(local_path, state) - def start_probing_path(self, path, old_state, ref_state, + def start_probing_file(self, objname, old_state, ref_state, assumed_info=None, callback=None): if old_state.serial != ref_state.serial: - logger.warning("Serial mismatch in probing path '%s'" % path) + logger.warning("Serial mismatch in probing path '%s'" % objname) return - live_info = (self._local_path_changes(path, old_state) + live_info = (self._local_path_changes(objname, old_state) if assumed_info is None else assumed_info) if live_info is None: return diff --git a/agkyra/agkyra/syncer/pithos_client.py b/agkyra/agkyra/syncer/pithos_client.py index ec2173f13db09d1d57f210dbad58e8a6fca93558..b319e21f1c6863260964bfd2f9edbff4264d966b 100644 --- a/agkyra/agkyra/syncer/pithos_client.py +++ b/agkyra/agkyra/syncer/pithos_client.py @@ -14,16 +14,16 @@ from agkyra.syncer.database import transaction logger = logging.getLogger(__name__) -def heartbeat_event(settings, heartbeat, path): +def heartbeat_event(settings, heartbeat, objname): event = threading.Event() max_interval = settings.action_max_wait / 2.0 def set_log(): with heartbeat.lock() as hb: - client, prev_tstamp = hb.get(path) + client, prev_tstamp = hb.get(objname) tpl = (client, utils.time_stamp()) - hb.set(path, tpl) - logger.info("HEARTBEAT %s %s %s" % ((path,) + tpl)) + hb.set(objname, tpl) + logger.info("HEARTBEAT '%s' %s %s" % ((objname,) + tpl)) def go(): interval = 0.2 @@ -42,10 +42,10 @@ def give_heartbeat(f): @wraps(f) def inner(*args, **kwargs): obj = args[0] - path = obj.path + objname = obj.objname heartbeat = obj.heartbeat settings = obj.settings - event = heartbeat_event(settings, heartbeat, path) + event = heartbeat_event(settings, heartbeat, objname) try: return f(*args, **kwargs) finally: @@ -76,22 +76,23 @@ class PithosSourceHandle(object): self.cache_path = settings.cache_path self.get_db = settings.get_db self.source_state = source_state - self.path = source_state.path + self.objname = source_state.objname self.heartbeat = settings.heartbeat self.check_log() def check_log(self): with self.heartbeat.lock() as hb: - prev_log = hb.get(self.path) + prev_log = hb.get(self.objname) if prev_log is not None: actionstate, ts = prev_log if actionstate != self.NAME or \ utils.younger_than(ts, self.settings.action_max_wait): - raise common.HandledError("Action mismatch in %s: %s %s" % - (self.NAME, self.path, prev_log)) + raise common.HandledError( + "Action mismatch in %s: %s %s" % + (self.NAME, self.objname, prev_log)) logger.warning("Ignoring previous run in %s: %s %s" % - (self.NAME, self.path, prev_log)) - hb.set(self.path, (self.NAME, utils.time_stamp())) + (self.NAME, self.objname, prev_log)) + hb.set(self.objname, (self.NAME, utils.time_stamp())) @transaction() def register_fetch_name(self, filename): @@ -100,20 +101,20 @@ class PithosSourceHandle(object): datetime.datetime.now().strftime("%s") fetch_name = utils.join_path(self.cache_fetch_name, f) self.fetch_name = fetch_name - db.insert_cachepath(fetch_name, self.NAME, filename) + db.insert_cachename(fetch_name, self.NAME, filename) return utils.join_path(self.cache_path, fetch_name) @handle_client_errors @give_heartbeat def send_file(self, sync_state): - fetched_file = self.register_fetch_name(self.path) + fetched_file = self.register_fetch_name(self.objname) headers = dict() with open(fetched_file, mode='wb+') as fil: try: - logger.info("Downloading path: '%s', to: '%s'" % - (self.path, fetched_file)) + logger.info("Downloading object: '%s', to: '%s'" % + (self.objname, fetched_file)) self.endpoint.download_object( - self.path, + self.objname, fil, headers=headers) except ClientError as e: @@ -129,10 +130,12 @@ class PithosSourceHandle(object): "pithos_type": actual_type} self.source_state = self.source_state.set(info=actual_info) if actual_info == {}: - logger.info("Downloading path: '%s', object is gone." % self.path) + logger.info("Downloading object: '%s', object is gone." + % self.objname) os.unlink(fetched_file) elif actual_info["pithos_type"] == common.T_DIR: - logger.info("Downloading path: '%s', object is dir." % self.path) + logger.info("Downloading object: '%s', object is dir." + % self.objname) os.unlink(fetched_file) os.mkdir(fetched_file) return fetched_file @@ -145,7 +148,7 @@ class PithosSourceHandle(object): def clear_log(self): with self.heartbeat.lock() as hb: - hb.delete(self.path) + hb.delete(self.objname) STAGED_FOR_DELETION_SUFFIX = ".pithos_staged_for_deletion" @@ -158,34 +161,34 @@ class PithosTargetHandle(object): self.settings = settings self.endpoint = settings.endpoint self.target_state = target_state - self.target_file = target_state.path - self.path = target_state.path + self.target_file = target_state.objname + self.objname = target_state.objname self.heartbeat = settings.heartbeat def mk_del_name(self, name, etag): return "%s.%s%s" % (name, etag, STAGED_FOR_DELETION_SUFFIX) - def safe_object_del(self, path, etag): + def safe_object_del(self, objname, etag): container = self.endpoint.container - del_name = self.mk_del_name(path, etag) + del_name = self.mk_del_name(objname, etag) logger.info("Moving temporarily to '%s'" % del_name) try: self.endpoint.object_move( - path, + objname, destination='/%s/%s' % (container, del_name), if_etag_match=etag) except ClientError as e: if e.status == 404: - logger.warning("'%s' not found; already moved?" % path) + logger.warning("'%s' not found; already moved?" % objname) else: raise finally: self.endpoint.del_object(del_name) logger.info("Deleted tmp '%s'" % del_name) - def directory_put(self, path, etag): + def directory_put(self, objname, etag): r = self.endpoint.object_put( - path, + objname, content_type='application/directory', content_length=0, if_etag_match=etag) @@ -278,26 +281,26 @@ class PithosFileClient(FileClient): last_modified = last_tstamp.isoformat() candidates = self.list_candidate_files( last_modified=last_modified) - for (path, info) in candidates: - callback(self.NAME, path, assumed_info=info) + for (objname, info) in candidates: + callback(self.NAME, objname, assumed_info=info) time.sleep(interval) poll = PollPithos() poll.daemon = True poll.start() - def get_object_from_cache(self, path): + def get_object_from_cache(self, objname): if self.objects is None: self.objects = self.endpoint.list_objects() - objs = [o for o in self.objects if o["name"] == path] + objs = [o for o in self.objects if o["name"] == objname] try: return objs[0] except IndexError: return None - def get_object(self, path): + def get_object(self, objname): try: - return self.endpoint.get_object_info(path) + return self.endpoint.get_object_info(objname) except ClientError as e: if e.status == 404: return None @@ -314,16 +317,16 @@ class PithosFileClient(FileClient): PITHOS_TYPE: p_type, } - def start_probing_path(self, path, old_state, ref_state, + def start_probing_file(self, objname, old_state, ref_state, assumed_info=None, callback=None): - if exclude_pattern.match(path): - logger.warning("Ignoring probe archive: %s, path: '%s'" % - (old_state.archive, path)) + if exclude_pattern.match(objname): + logger.warning("Ignoring probe archive: %s, object: '%s'" % + (old_state.archive, objname)) return info = old_state.info if assumed_info is None: - obj = self.get_object(path) + obj = self.get_object(objname) live_info = self.get_object_live_info(obj) else: live_info = assumed_info diff --git a/agkyra/agkyra/syncer/syncer.py b/agkyra/agkyra/syncer/syncer.py index bda45a6f1806125d5110c7d5c19468f9004d1be8..9625b0c92bfc078a4f4e2fdbf6293c3443870cf4 100644 --- a/agkyra/agkyra/syncer/syncer.py +++ b/agkyra/agkyra/syncer/syncer.py @@ -55,8 +55,8 @@ class FileSyncer(object): def start_notifiers(self): self.notifiers = { - self.MASTER: self.master.notifier(callback=self.probe_path), - self.SLAVE: self.slave.notifier(callback=self.probe_path), + self.MASTER: self.master.notifier(callback=self.probe_file), + self.SLAVE: self.slave.notifier(callback=self.probe_file), } def start_decide(self): @@ -68,8 +68,8 @@ class FileSyncer(object): if self.decide_event is not None: self.decide_event.clear() - def exclude_path(self, path): - parts = path.split(os.path.sep) + def exclude_file(self, objname): + parts = objname.split(os.path.sep) init_part = parts[0] if init_part in [self.settings.cache_name]: return True @@ -77,95 +77,95 @@ class FileSyncer(object): return exclude_pattern.match(final_part) @transaction() - def probe_path(self, archive, path, assumed_info=None): - if self.exclude_path(path): - logger.warning("Ignoring probe archive: %s, path: %s" % - (archive, path)) + def probe_file(self, archive, objname, assumed_info=None): + if self.exclude_file(objname): + logger.warning("Ignoring probe archive: %s, object: %s" % + (archive, objname)) return - logger.info("Probing archive: %s, path: '%s'" % (archive, path)) + logger.info("Probing archive: %s, object: '%s'" % (archive, objname)) db = self.get_db() client = self.clients[archive] - db_state = db.get_state(archive, path) - ref_state = db.get_state(self.SYNC, path) + db_state = db.get_state(archive, objname) + ref_state = db.get_state(self.SYNC, objname) if db_state.serial != ref_state.serial: - logger.warning("Serial mismatch in probing archive: %s, path: '%s'" - % (archive, path)) + logger.warning("Serial mismatch in probing archive: %s, " + "object: '%s'" % (archive, objname)) return - client.start_probing_path(path, db_state, ref_state, + client.start_probing_file(objname, db_state, ref_state, assumed_info=assumed_info, - callback=self.update_path) + callback=self.update_file_state) @transaction() - def update_path(self, live_state): + def update_file_state(self, live_state): db = self.get_db() archive = live_state.archive - path = live_state.path + objname = live_state.objname serial = live_state.serial - if self.exclude_path(path): - logger.warning("Ignoring update archive: %s, path: %s" % - (archive, path)) + if self.exclude_file(objname): + logger.warning("Ignoring update archive: %s, object: %s" % + (archive, objname)) return - logger.info("Updating archive: %s, path: '%s', serial: %s" % - (archive, path, serial)) - db_state = db.get_state(archive, path) + logger.info("Updating archive: %s, object: '%s', serial: %s" % + (archive, objname, serial)) + db_state = db.get_state(archive, objname) if db_state and db_state.serial != serial: logger.warning( - "Cannot update archive: %s, path: '%s', " + "Cannot update archive: %s, object: '%s', " "serial: %s, db_serial: %s" % - (archive, path, serial, db_state.serial)) + (archive, objname, serial, db_state.serial)) return - new_serial = db.new_serial(path) + new_serial = db.new_serial(objname) new_state = live_state.set(serial=new_serial) db.put_state(new_state) if new_serial == 0: sync_state = common.FileState( - archive=self.SYNC, path=path, serial=-1, + archive=self.SYNC, objname=objname, serial=-1, info={}) db.put_state(sync_state) - def decide_path(self, path, master=None, slave=None): + def decide_file_sync(self, objname, master=None, slave=None): if master is None: master = self.MASTER if slave is None: slave = self.SLAVE - states = self._decide_path(path, master, slave) + states = self._decide_file_sync(objname, master, slave) if states is None: return - self.sync_path(*states) + self.sync_file(*states) @transaction() - def _decide_path(self, path, master, slave): + def _decide_file_sync(self, objname, master, slave): db = self.get_db() - logger.info("Deciding path: '%s'" % path) - master_state = db.get_state(master, path) - slave_state = db.get_state(slave, path) - sync_state = db.get_state(self.SYNC, path) - decision_state = db.get_state(self.DECISION, path) + logger.info("Deciding object: '%s'" % objname) + master_state = db.get_state(master, objname) + slave_state = db.get_state(slave, objname) + sync_state = db.get_state(self.SYNC, objname) + decision_state = db.get_state(self.DECISION, objname) master_serial = master_state.serial slave_serial = slave_state.serial sync_serial = sync_state.serial decision_serial = decision_state.serial if decision_serial != sync_serial: - failed_sync = self.failed_serials.get((decision_serial, path)) + failed_sync = self.failed_serials.get((decision_serial, objname)) if failed_sync is None: logger.warning( "Already decided: '%s', decision: %s, sync: %s" % - (path, decision_serial, sync_serial)) + (objname, decision_serial, sync_serial)) if decision_serial == master_serial: return master_state, slave_state, sync_state elif decision_serial == slave_serial: return slave_state, master_state, sync_state else: raise AssertionError( - "Decision serial %s for path '%s' " + "Decision serial %s for objname '%s' " "does not match any archive." % - (decision_serial, path)) + (decision_serial, objname)) else: logger.warning( "Ignoring failed decision for: '%s', decision: %s" % - (path, decision_serial)) + (objname, decision_serial)) if master_serial > sync_serial: self._make_decision_state(decision_state, master_state) @@ -190,17 +190,17 @@ class FileSyncer(object): serial=source_state.serial, info=source_state.info) db.put_state(new_decision_state) - def sync_path(self, source_state, target_state, sync_state): - logger.info("Syncing archive: %s, path: '%s', serial: %s" % + def sync_file(self, source_state, target_state, sync_state): + logger.info("Syncing archive: %s, object: '%s', serial: %s" % (source_state.archive, - source_state.path, + source_state.objname, source_state.serial)) thread = threading.Thread( - target=self._sync_path, + target=self._sync_file, args=(source_state, target_state, sync_state)) thread.start() - def _sync_path(self, source_state, target_state, sync_state): + def _sync_file(self, source_state, target_state, sync_state): clients = self.clients source_client = clients[source_state.archive] try: @@ -211,16 +211,16 @@ class FileSyncer(object): target_client = clients[target_state.archive] target_client.start_pulling_file( source_handle, target_state, sync_state, - callback=self.acknowledge_path, + callback=self.ack_file_sync, failure_callback=self.mark_as_failed) def mark_as_failed(self, state): serial = state.serial - path = state.path + objname = state.objname logger.warning( - "Marking failed serial %s for archive: %s, path: '%s'" % - (serial, state.archive, path)) - self.failed_serials.put((serial, path), state) + "Marking failed serial %s for archive: %s, object: '%s'" % + (serial, state.archive, objname)) + self.failed_serials.put((serial, objname), state) def update_state(self, old_state, new_state): db = self.get_db() @@ -229,17 +229,17 @@ class FileSyncer(object): # perhaps triggering a probe @transaction() - def acknowledge_path(self, synced_source_state, synced_target_state): + def ack_file_sync(self, synced_source_state, synced_target_state): db = self.get_db() serial = synced_source_state.serial - path = synced_source_state.path + objname = synced_source_state.objname source = synced_source_state.archive target = synced_target_state.archive - logger.info("Acking archive: %s, path: '%s', serial: %s" % - (target, path, serial)) + logger.info("Acking archive: %s, object: '%s', serial: %s" % + (target, objname, serial)) - decision_state = db.get_state(self.DECISION, path) - sync_state = db.get_state(self.SYNC, path) + decision_state = db.get_state(self.DECISION, objname) + sync_state = db.get_state(self.SYNC, objname) if serial != decision_state.serial: raise AssertionError( @@ -250,12 +250,12 @@ class FileSyncer(object): "cannot ack: serial %s < sync serial %s" % (serial, sync_state.serial)) - db_source_state = db.get_state(source, path) + db_source_state = db.get_state(source, objname) self.update_state(db_source_state, synced_source_state) final_target_state = synced_target_state.set( serial=serial) - db_target_state = db.get_state(target, path) + db_target_state = db.get_state(target, objname) self.update_state(db_target_state, final_target_state) sync_info = dict(synced_source_state.info) @@ -282,23 +282,23 @@ class FileSyncer(object): def probe_archive(self, archive): client = self.clients[archive] candidates = client.list_candidate_files() - for (path, info) in candidates.iteritems(): - self.probe_path(archive, path, assumed_info=info) + for (objname, info) in candidates.iteritems(): + self.probe_file(archive, objname, assumed_info=info) def decide_archive(self, archive): - for path in self.list_deciding([archive]): - self.decide_path(path) + for objname in self.list_deciding([archive]): + self.decide_file_sync(objname) def decide_all_archives(self): logger.info("Checking candidates to sync") - for path in self.list_deciding(): - self.decide_path(path) + for objname in self.list_deciding(): + self.decide_file_sync(objname) def probe_and_sync_all(self): self.probe_archive(self.MASTER) self.probe_archive(self.SLAVE) - for path in self.list_deciding(): - self.decide_path(path) + for objname in self.list_deciding(): + self.decide_file_sync(objname) def _poll_decide(self, interval=3): event = threading.Event()