diff --git a/agkyra/agkyra/syncer/localfs_client.py b/agkyra/agkyra/syncer/localfs_client.py index 731ae7437a5eb311f9936493fada6d987afd9ce4..8b62ae8e1a25a30594055a3a89bb2246d60af422 100644 --- a/agkyra/agkyra/syncer/localfs_client.py +++ b/agkyra/agkyra/syncer/localfs_client.py @@ -492,6 +492,16 @@ class LocalfsFileClient(FileClient): self.get_db = settings.get_db self.probe_candidates = utils.ThreadSafeDict() + def remove_candidates(self, objnames, ident): + with self.probe_candidates.lock() as d: + for objname in objnames: + try: + cached = d.pop(objname) + if cached["ident"] != ident: + d[objname] = cached + except KeyError: + pass + def list_candidate_files(self, forced=False): with self.probe_candidates.lock() as d: if forced: @@ -499,6 +509,9 @@ class LocalfsFileClient(FileClient): d.update(candidates) return d.keys() + def none_info(self): + return {"ident": None, "info": None} + def walk_filesystem(self): db = self.get_db() candidates = {} @@ -506,16 +519,18 @@ class LocalfsFileClient(FileClient): rel_dirpath = os.path.relpath(dirpath, start=self.ROOTPATH) logger.debug("'%s' '%s'" % (dirpath, rel_dirpath)) if rel_dirpath != '.': - candidates[utils.to_standard_sep(rel_dirpath)] = None + objname = utils.to_standard_sep(rel_dirpath) + candidates[objname] = self.none_info() for filename in files: if rel_dirpath == '.': prefix = "" else: prefix = utils.to_standard_sep(rel_dirpath) objname = utils.join_objname(prefix, filename) - candidates[objname] = None + candidates[objname] = self.none_info() - db_cands = dict((name, None) for name in db.list_files(self.SIGNATURE)) + db_cands = dict((name, self.none_info()) + for name in db.list_files(self.SIGNATURE)) candidates.update(db_cands) logger.info("Candidates: %s" % candidates) return candidates @@ -532,9 +547,15 @@ class LocalfsFileClient(FileClient): final_part = parts[-1] return exclude_pattern.match(final_part) - def probe_file(self, objname, old_state, ref_state): + def probe_file(self, objname, old_state, ref_state, ident): with self.probe_candidates.lock() as d: - cached_info = d.pop(objname, None) + try: + cached = d[objname] + cached_info = cached["info"] + cached["ident"] = ident + except KeyError: + cached_info = None + if self.exclude_file(objname): logger.warning("Ignoring probe archive: %s, object: %s" % (old_state.archive, objname)) @@ -558,7 +579,7 @@ class LocalfsFileClient(FileClient): rel_path = os.path.relpath(path, start=self.ROOTPATH) objname = utils.to_standard_sep(rel_path) with self.probe_candidates.lock() as d: - d[objname] = None + d[objname] = self.none_info() class EventHandler(FileSystemEventHandler): def on_created(this, event): diff --git a/agkyra/agkyra/syncer/pithos_client.py b/agkyra/agkyra/syncer/pithos_client.py index 0080bb5d182d62243f47fe5c860ff3783dacc951..97b1c2d120132b007a5bcd3aa3c3eea6833e6bb3 100644 --- a/agkyra/agkyra/syncer/pithos_client.py +++ b/agkyra/agkyra/syncer/pithos_client.py @@ -267,6 +267,16 @@ class PithosFileClient(FileClient): self.last_modification = "0000-00-00" self.probe_candidates = utils.ThreadSafeDict() + def remove_candidates(self, objnames, ident): + with self.probe_candidates.lock() as d: + for objname in objnames: + try: + cached = d.pop(objname) + if cached["ident"] != ident: + d[objname] = cached + except KeyError: + pass + def list_candidate_files(self, forced=False): with self.probe_candidates.lock() as d: if forced: @@ -285,7 +295,10 @@ class PithosFileClient(FileClient): upstream_all = {} for obj in objects: name = obj["name"] - upstream_all[name] = self.get_object_live_info(obj) + upstream_all[name] = { + "ident": None, + "info": self.get_object_live_info(obj) + } obj_last_modified = obj["last_modified"] if obj_last_modified > self.last_modification: self.last_modification = obj_last_modified @@ -303,7 +316,8 @@ class PithosFileClient(FileClient): non_deleted_in_db = set(db.list_non_deleted_files(self.SIGNATURE)) newly_deleted_names = non_deleted_in_db.difference(upstream_all_names) logger.debug("newly_deleted %s" % newly_deleted_names) - newly_deleted = dict((name, {}) for name in newly_deleted_names) + newly_deleted = dict((name, {"ident": None, "info": {}}) + for name in newly_deleted_names) candidates.update(newly_deleted) logger.info("Candidates since %s: %s" % @@ -340,10 +354,15 @@ class PithosFileClient(FileClient): PITHOS_TYPE: p_type, } - def probe_file(self, objname, old_state, ref_state): + def probe_file(self, objname, old_state, ref_state, ident): info = old_state.info with self.probe_candidates.lock() as d: - cached_info = d.pop(objname, None) + try: + cached = d[objname] + cached_info = cached["info"] + cached["ident"] = ident + except KeyError: + cached_info = None if exclude_pattern.match(objname): logger.warning("Ignoring probe archive: %s, object: '%s'" % (old_state.archive, objname)) diff --git a/agkyra/agkyra/syncer/syncer.py b/agkyra/agkyra/syncer/syncer.py index 0d8999cf6fc420241cabc0af6080b3a13b7cc796..5c1f7c666ac26e9d8139e403654bc50659c274ca 100644 --- a/agkyra/agkyra/syncer/syncer.py +++ b/agkyra/agkyra/syncer/syncer.py @@ -95,11 +95,18 @@ class FileSyncer(object): def get_next_message(self, block=False): return self.messager.get(block=block) - @transaction() def probe_file(self, archive, objname): - return self._probe_file(archive, objname) + ident = utils.time_stamp() + self._probe_files(archive, [objname], ident) + client = self.clients[archive] + client.remove_candidates([objname], ident) - def _probe_file(self, archive, objname): + @transaction() + def _probe_files(self, archive, objnames, ident): + for objname in objnames: + self._do_probe_file(archive, objname, ident) + + def _do_probe_file(self, archive, objname, ident): logger.info("Probing archive: %s, object: '%s'" % (archive, objname)) db = self.get_db() client = self.clients[archive] @@ -117,7 +124,7 @@ class FileSyncer(object): logger.warning("Serial mismatch in probing archive: %s, " "object: '%s'" % (archive, objname)) return - live_state = client.probe_file(objname, db_state, ref_state) + live_state = client.probe_file(objname, db_state, ref_state, ident) if live_state is not None: self.update_file_state(live_state) @@ -348,12 +355,12 @@ class FileSyncer(object): return set(db.list_deciding(archives=archives, sync=self.SYNC)) - @transaction() def probe_archive(self, archive, forced=False): + ident = utils.time_stamp() client = self.clients[archive] candidates = client.list_candidate_files(forced=forced) - for objname in candidates: - self._probe_file(archive, objname) + self._probe_files(archive, candidates, ident) + client.remove_candidates(candidates, ident) def decide_archive(self, archive): for objname in self.list_deciding([archive]): diff --git a/agkyra/test.py b/agkyra/test.py index 9398b1d596dbd19dd078bb9a2fee3fff4cd18a9d..f913f201ee8952309a6b0f5faa98075a5b82fd8f 100644 --- a/agkyra/test.py +++ b/agkyra/test.py @@ -70,7 +70,7 @@ etag1 = r1['etag'] # check pithos state pithos_cands = master.get_pithos_candidates() -info1 = pithos_cands[f1] +info1 = pithos_cands[f1]["info"] assert etag1 == info1["pithos_etag"] db = s.get_db()