diff --git a/agkyra/agkyra/syncer/localfs_client.py b/agkyra/agkyra/syncer/localfs_client.py index d7d0556a55ee749eede0010bdc76da587e3e0bb9..d6d8528d9105831e5f68287155e1f2c7d3b05868 100644 --- a/agkyra/agkyra/syncer/localfs_client.py +++ b/agkyra/agkyra/syncer/localfs_client.py @@ -126,9 +126,9 @@ def get_live_info(path): return live_info -def stat_file(filename): +def stat_file(path): try: - file_stats = os.lstat(filename) + file_stats = os.lstat(path) except OSError as e: if e.errno == OS_NO_FILE_OR_DIR: return None @@ -201,7 +201,7 @@ class LocalfsTargetHandle(object): self.get_db = settings.get_db self.target_state = target_state self.objname = target_state.objname - self.local_path = utils.join_path(self.rootpath, self.objname) + self.fspath = utils.join_path(self.rootpath, self.objname) self.hidden_filename = None self.hidden_path = None @@ -226,10 +226,10 @@ class LocalfsTargetHandle(object): self.hidden_filename = None def hide_file(self): - local_filename = self.local_path - if file_is_open(local_filename): + fspath = self.fspath + if file_is_open(fspath): raise common.BusyError("File '%s' is open. Aborting." - % local_filename) + % fspath) new_registered = self.register_hidden_name(self.objname) hidden_filename = self.hidden_filename @@ -244,27 +244,27 @@ class LocalfsTargetHandle(object): (self.objname, hidden_path)) return try: - os.rename(local_filename, hidden_path) + os.rename(fspath, hidden_path) logger.info("Hiding file '%s' to '%s'" % - (local_filename, hidden_path)) + (fspath, hidden_path)) except OSError as e: if e.errno == OS_NO_FILE_OR_DIR: self.unregister_hidden_name(hidden_filename) - logger.info("File '%s' does not exist" % local_filename) + logger.info("File '%s' does not exist" % fspath) return else: raise e if file_is_open(hidden_path): - os.rename(hidden_path, local_filename) + os.rename(hidden_path, fspath) self.unregister_hidden_name(hidden_filename) raise common.BusyError("File '%s' is open. Undoing." % hidden_path) if path_status(hidden_path) == LOCAL_NONEMPTY_DIR: - os.rename(hidden_path, local_filename) + os.rename(hidden_path, fspath) self.unregister_hidden_name(hidden_filename) - raise common.ConflictError("'%s' is non-empty" % local_filename) + raise common.ConflictError("'%s' is non-empty" % fspath) def apply(self, fetched_file, fetched_live_info, sync_state): - local_status = path_status(self.local_path) + local_status = path_status(self.fspath) fetched_status = status_of_info(fetched_live_info) if local_status in [LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR] \ and fetched_status == LOCAL_EMPTY_DIR: @@ -272,7 +272,7 @@ class LocalfsTargetHandle(object): if local_status == LOCAL_MISSING and fetched_status == LOCAL_MISSING: return if local_status == LOCAL_NONEMPTY_DIR: - raise common.ConflictError("'%s' is non-empty" % self.local_path) + raise common.ConflictError("'%s' is non-empty" % self.fspath) self.prepare(fetched_file, sync_state) self.finalize(fetched_file, fetched_live_info) @@ -300,13 +300,13 @@ class LocalfsTargetHandle(object): return if live_info[LOCALFS_TYPE] != common.T_DIR: try: - link_file(filename, self.local_path) + link_file(filename, self.fspath) except DirMissing: - make_dirs(os.path.dirname(self.local_path)) - link_file(filename, self.local_path) + make_dirs(os.path.dirname(self.fspath)) + link_file(filename, self.fspath) else: # assuming empty dir - make_dirs(self.local_path) + make_dirs(self.fspath) def cleanup(self, filename): status = path_status(filename) @@ -348,11 +348,11 @@ class LocalfsSourceHandle(object): def get_path_in_cache(self, name): return utils.join_path(self.cache_path, name) - def lock_file(self, local_filename): - if file_is_open(local_filename): + def lock_file(self, fspath): + if file_is_open(fspath): raise common.BusyError("File '%s' is open. Aborting" - % local_filename) - new_registered = self.register_stage_name(local_filename) + % fspath) + new_registered = self.register_stage_name(fspath) stage_filename = self.stage_filename stage_path = self.get_path_in_cache(stage_filename) self.staged_path = stage_path @@ -365,21 +365,21 @@ class LocalfsSourceHandle(object): (self.objname, stage_path)) return try: - os.rename(local_filename, stage_path) + os.rename(fspath, stage_path) except OSError as e: if e.errno == OS_NO_FILE_OR_DIR: - logger.info("Source does not exist: '%s'" % local_filename) + logger.info("Source does not exist: '%s'" % fspath) self.unregister_stage_name(stage_filename) else: raise e if file_is_open(stage_path): - os.rename(stage_path, local_filename) + os.rename(stage_path, fspath) self.unregister_stage_name(stage_filename) raise common.BusyError("File '%s' is open. Undoing" % stage_path) if path_status(stage_path) in [LOCAL_NONEMPTY_DIR, LOCAL_EMPTY_DIR]: - os.rename(stage_path, local_filename) + os.rename(stage_path, fspath) self.unregister_hidden_name(stage_filename) - raise common.ConflictError("'%s' is non-empty" % local_filename) + raise common.ConflictError("'%s' is non-empty" % fspath) logger.info("Staging file '%s' to '%s'" % (self.objname, stage_path)) def check_stable(self, interval=1, times=5): @@ -399,15 +399,14 @@ class LocalfsSourceHandle(object): self.get_db = settings.get_db self.source_state = source_state self.objname = source_state.objname - local_filename = utils.join_path(self.rootpath, self.objname) - self.local_path = local_filename + self.fspath = utils.join_path(self.rootpath, self.objname) self.isdir = self.info_is_dir() self.stage_filename = None self.staged_path = None self.heartbeat = settings.heartbeat self.check_log() if not self.isdir: - self.lock_file(local_filename) + self.lock_file(self.fspath) # self.check_stable() def check_log(self): @@ -441,9 +440,9 @@ class LocalfsSourceHandle(object): or self.source_state.info[LOCALFS_TYPE] == common.T_UNHANDLED def stash_staged_file(self): - stash_filename = mk_stash_name(self.local_path) + stash_filename = mk_stash_name(self.fspath) logger.warning("Stashing file '%s' to '%s'" % - (self.local_path, stash_filename)) + (self.fspath, stash_filename)) os.rename(self.staged_path, stash_filename) def unstage_file(self): @@ -462,7 +461,7 @@ class LocalfsSourceHandle(object): return staged_path = self.staged_path try: - link_file(staged_path, self.local_path) + link_file(staged_path, self.fspath) print "Unlinking", staged_path os.unlink(staged_path) except common.ConflictError: @@ -493,8 +492,8 @@ class LocalfsFileClient(FileClient): # if self.exclude_files_exp.match(filename) or \ # self.exclude_dir_exp.match(filename): # continue - local_filename = utils.join_path(rel_dirpath, filename) - candidates[local_filename] = None + objname = utils.join_path(rel_dirpath, filename) + candidates[objname] = None db_cands = dict((name, None) for name in db.list_files(self.SIGNATURE)) candidates.update(db_cands) diff --git a/agkyra/agkyra/syncer/pithos_client.py b/agkyra/agkyra/syncer/pithos_client.py index 035e0449d73b43296e61d4ab8aa4391f2b1dc349..00f14bc168250d1e94804c727d717de700be8f5f 100644 --- a/agkyra/agkyra/syncer/pithos_client.py +++ b/agkyra/agkyra/syncer/pithos_client.py @@ -107,12 +107,12 @@ class PithosSourceHandle(object): @handle_client_errors @give_heartbeat def send_file(self, sync_state): - fetched_file = self.register_fetch_name(self.objname) + fetched_fspath = self.register_fetch_name(self.objname) headers = dict() - with open(fetched_file, mode='wb+') as fil: + with open(fetched_fspath, mode='wb+') as fil: try: logger.info("Downloading object: '%s', to: '%s'" % - (self.objname, fetched_file)) + (self.objname, fetched_fspath)) self.endpoint.download_object( self.objname, fil, @@ -132,13 +132,13 @@ class PithosSourceHandle(object): if actual_info == {}: logger.info("Downloading object: '%s', object is gone." % self.objname) - os.unlink(fetched_file) + os.unlink(fetched_fspath) elif actual_info["pithos_type"] == common.T_DIR: logger.info("Downloading object: '%s', object is dir." % self.objname) - os.unlink(fetched_file) - os.mkdir(fetched_file) - return fetched_file + os.unlink(fetched_fspath) + os.mkdir(fetched_fspath) + return fetched_fspath def get_synced_state(self): return self.source_state @@ -161,7 +161,7 @@ class PithosTargetHandle(object): self.settings = settings self.endpoint = settings.endpoint self.target_state = target_state - self.target_file = target_state.objname + self.target_objname = target_state.objname self.objname = target_state.objname self.heartbeat = settings.heartbeat @@ -202,19 +202,19 @@ class PithosTargetHandle(object): etag = info.get("pithos_etag") if source_handle.info_is_deleted_or_unhandled(): if etag is not None: - logger.info("Deleting object '%s'" % self.target_file) - self.safe_object_del(self.target_file, etag) + logger.info("Deleting object '%s'" % self.target_objname) + self.safe_object_del(self.target_objname, etag) live_info = {} elif source_handle.info_is_dir(): - logger.info("Creating dir '%s'" % source_handle.path) - r = self.directory_put(source_handle.path, etag) + logger.info("Creating dir '%s'" % self.target_objname) + r = self.directory_put(self.target_objname, etag) synced_etag = r.headers["etag"] live_info = {"pithos_etag": synced_etag, "pithos_type": common.T_DIR} else: with open(source_handle.staged_path, mode="rb") as fil: r = self.endpoint.upload_object( - self.target_file, + self.target_objname, fil, if_etag_match=info.get("pithos_etag")) synced_etag = r["etag"]