diff --git a/agkyra/agkyra/syncer/localfs_client.py b/agkyra/agkyra/syncer/localfs_client.py index c16731c6de1ca649a2f6c1a4f06dbdd87402ec56..76d676f69e9cb2f43c84b8b580422cfa3ec6338a 100644 --- a/agkyra/agkyra/syncer/localfs_client.py +++ b/agkyra/agkyra/syncer/localfs_client.py @@ -353,6 +353,8 @@ class LocalfsSourceHandle(object): f = utils.hash_string(filename) stage_filename = utils.join_path(self.cache_stage_name, f) self.stage_filename = stage_filename + stage_path = self.get_path_in_cache(stage_filename) + self.staged_path = stage_path if db.get_cachename(stage_filename): return False db.insert_cachename(stage_filename, self.SIGNATURE, filename) @@ -363,6 +365,7 @@ class LocalfsSourceHandle(object): db = self.get_db() db.delete_cachename(stage_filename) self.stage_filename = None + self.staged_path = None def get_path_in_cache(self, name): return utils.join_path(self.cache_path, name) @@ -373,8 +376,7 @@ class LocalfsSourceHandle(object): % fspath) new_registered = self.register_stage_name(fspath) stage_filename = self.stage_filename - stage_path = self.get_path_in_cache(stage_filename) - self.staged_path = stage_path + stage_path = self.staged_path if not new_registered: logger.warning("Staging already registered for file %s" % @@ -383,6 +385,8 @@ class LocalfsSourceHandle(object): logger.warning("File %s already staged at %s" % (self.objname, stage_path)) return + + logger.info("Staging file '%s' to '%s'" % (self.objname, stage_path)) try: os.rename(fspath, stage_path) except OSError as e: @@ -394,12 +398,15 @@ class LocalfsSourceHandle(object): if file_is_open(stage_path): os.rename(stage_path, fspath) self.unregister_stage_name(stage_filename) + logger.warning("File '%s' is open; unstaged" % self.objname) raise common.BusyError("File '%s' is open. Undoing" % stage_path) - if path_status(stage_path) in [LOCAL_NONEMPTY_DIR, LOCAL_EMPTY_DIR]: + + self.check_update_source_state() + if path_status(stage_path) != LOCAL_FILE: os.rename(stage_path, fspath) - self.unregister_hidden_name(stage_filename) - raise common.ConflictError("'%s' is non-empty" % fspath) - logger.info("Staging file '%s' to '%s'" % (self.objname, stage_path)) + self.unregister_stage_name(stage_filename) + logger.warning("Object '%s' is not a regular file; unstaged" % + self.objname) def __init__(self, settings, source_state): self.settings = settings @@ -412,16 +419,34 @@ class LocalfsSourceHandle(object): self.source_state = source_state self.objname = source_state.objname self.fspath = utils.join_path(self.rootpath, self.objname) - self.isdir = self.info_is_dir() self.stage_filename = None self.staged_path = None self.heartbeat = settings.heartbeat - if not self.isdir: + if self.needs_staging(): self.lock_file(self.fspath) + @transaction() + def update_state(self, state): + db = self.get_db() + db.put_state(state) + + def check_update_source_state(self): + live_info = local_path_changes( + self.staged_path, self.source_state) + if live_info is not None: + logger.warning("Actual info differs in %s for object: '%s'; " + "updating..." % (self.SIGNATURE, self.objname)) + new_state = self.source_state.set(info=live_info) + self.update_state(new_state) + self.source_state = new_state + def get_synced_state(self): return self.source_state + def needs_staging(self): + info = self.source_state.info + return info and info[LOCALFS_TYPE] == common.T_FILE + def info_is_dir(self): try: return self.source_state.info[LOCALFS_TYPE] == common.T_DIR diff --git a/agkyra/agkyra/syncer/pithos_client.py b/agkyra/agkyra/syncer/pithos_client.py index 14b54b6a81e5223badbc14e643d76fc685f3244f..14818def4521c7aa2a6c300758fec0ebb1ac901c 100644 --- a/agkyra/agkyra/syncer/pithos_client.py +++ b/agkyra/agkyra/syncer/pithos_client.py @@ -130,7 +130,7 @@ class PithosSourceHandle(object): else common.T_FILE) actual_info = {"pithos_etag": actual_etag, "pithos_type": actual_type} - self.source_state = self.source_state.set(info=actual_info) + self.check_update_source_state(actual_info) if actual_info == {}: logger.info("Downloading object: '%s', object is gone." % self.objname) @@ -142,6 +142,19 @@ class PithosSourceHandle(object): os.mkdir(fetched_fspath) return fetched_fspath + @transaction() + def update_state(self, state): + db = self.get_db() + db.put_state(state) + + def check_update_source_state(self, actual_info): + if actual_info != self.source_state.info: + logger.warning("Actual info differs in %s for object: '%s'; " + "updating..." % (self.SIGNATURE, self.objname)) + new_state = self.source_state.set(info=actual_info) + self.update_state(new_state) + self.source_state = new_state + def get_synced_state(self): return self.source_state