diff --git a/agkyra/agkyra/syncer/localfs_client.py b/agkyra/agkyra/syncer/localfs_client.py index 352d4340e185ea672080e02264ee279b17f010fd..f5f95ff6b1ee51523ab1dbf2d1df2f051c927159 100644 --- a/agkyra/agkyra/syncer/localfs_client.py +++ b/agkyra/agkyra/syncer/localfs_client.py @@ -480,33 +480,38 @@ class LocalfsFileClient(FileClient): def list_candidate_files(self): db = self.get_db() - candidates = [] + candidates = {} for dirpath, dirnames, files in os.walk(self.ROOTPATH): rel_dirpath = os.path.relpath(dirpath, start=self.ROOTPATH) logger.debug("'%s' '%s'" % (dirpath, rel_dirpath)) # if self.exclude_dir_exp.match(dirpath): # continue if rel_dirpath != '.': - candidates.append(rel_dirpath) + candidates[rel_dirpath] = None for filename in files: # if self.exclude_files_exp.match(filename) or \ # self.exclude_dir_exp.match(filename): # continue local_filename = utils.join_path(rel_dirpath, filename) - candidates.append(local_filename) + candidates[local_filename] = None - db_names = set(db.list_files(self.NAME)) - return db_names.union(candidates) + db_cands = dict((name, None) for name in db.list_files(self.NAME)) + candidates.update(db_cands) + logger.info("Candidates: %s" % candidates) + return candidates def _local_path_changes(self, path, state): local_path = utils.join_path(self.ROOTPATH, path) return local_path_changes(local_path, state) - def start_probing_path(self, path, old_state, ref_state, callback=None): + def start_probing_path(self, path, old_state, ref_state, + assumed_info=None, + callback=None): if old_state.serial != ref_state.serial: logger.warning("Serial mismatch in probing path '%s'" % path) return - live_info = self._local_path_changes(path, old_state) + live_info = (self._local_path_changes(path, old_state) + if assumed_info is None else assumed_info) if live_info is None: return live_state = old_state.set(info=live_info) diff --git a/agkyra/agkyra/syncer/pithos_client.py b/agkyra/agkyra/syncer/pithos_client.py index 4692ba5182394c86c361235cd1dc298da3a9759f..effc9390ab98e4dfd834391be6f40ff0159d9961 100644 --- a/agkyra/agkyra/syncer/pithos_client.py +++ b/agkyra/agkyra/syncer/pithos_client.py @@ -246,18 +246,24 @@ class PithosFileClient(FileClient): db = self.get_db() objects = self.endpoint.list_objects() self.objects = objects - upstream_all_names = set(obj["name"] for obj in objects) - non_deleted_in_db = set(db.list_non_deleted_files(self.NAME)) - newly_deleted = non_deleted_in_db.difference(upstream_all_names) - logger.debug("newly_deleted %s" % newly_deleted) + upstream_all = dict( + (obj["name"], self.get_object_live_info(obj)) + for obj in objects) + upstream_all_names = set(upstream_all.keys()) if last_modified is not None: - upstream_modified_names = set( - obj["name"] for obj in objects - if obj["last_modified"] > last_modified) - upstream_names = upstream_modified_names + upstream_modified_names = dict( + (k, v) for (k, v) in upstream_all.iteritems() + if v["last_modified"] > last_modified) + candidates = upstream_modified_names else: - upstream_names = upstream_all_names - candidates = upstream_names.union(newly_deleted) + candidates = upstream_all + + non_deleted_in_db = set(db.list_non_deleted_files(self.NAME)) + newly_deleted_names = non_deleted_in_db.difference(upstream_all_names) + logger.debug("newly_deleted %s" % newly_deleted_names) + newly_deleted = dict((name, {}) for name in newly_deleted_names) + + candidates.update(newly_deleted) logger.info("Candidates: %s" % candidates) return candidates @@ -271,9 +277,8 @@ class PithosFileClient(FileClient): last_modified = last_tstamp.isoformat() candidates = self.list_candidate_files( last_modified=last_modified) - if callback is not None: - for candidate in candidates: - callback(self.NAME, candidate) + for (path, info) in candidates: + callback(self.NAME, path, assumed_info=info) time.sleep(interval) poll = PollPithos() @@ -301,19 +306,26 @@ class PithosFileClient(FileClient): if obj is None: return {} p_type = common.T_DIR if object_isdir(obj) else common.T_FILE - obj_hash = obj["x-object-hash"] + obj_hash = obj.get("x-object-hash") + if obj_hash is None: + obj_hash = obj.get("x_object_hash") return {PITHOS_ETAG: obj_hash, PITHOS_TYPE: p_type, } - def start_probing_path(self, path, old_state, ref_state, callback=None): + def start_probing_path(self, path, old_state, ref_state, + assumed_info=None, + callback=None): if exclude_pattern.match(path): logger.warning("Ignoring probe archive: %s, path: '%s'" % (old_state.archive, path)) return info = old_state.info - obj = self.get_object(path) - live_info = self.get_object_live_info(obj) + if assumed_info is None: + obj = self.get_object(path) + live_info = self.get_object_live_info(obj) + else: + live_info = assumed_info if info != live_info: if callback is not None: live_state = old_state.set(info=live_info) diff --git a/agkyra/agkyra/syncer/syncer.py b/agkyra/agkyra/syncer/syncer.py index b375472a59be10dd220df21091f9d19c7c825eca..bda45a6f1806125d5110c7d5c19468f9004d1be8 100644 --- a/agkyra/agkyra/syncer/syncer.py +++ b/agkyra/agkyra/syncer/syncer.py @@ -77,7 +77,7 @@ class FileSyncer(object): return exclude_pattern.match(final_part) @transaction() - def probe_path(self, archive, path): + def probe_path(self, archive, path, assumed_info=None): if self.exclude_path(path): logger.warning("Ignoring probe archive: %s, path: %s" % (archive, path)) @@ -92,6 +92,7 @@ class FileSyncer(object): % (archive, path)) return client.start_probing_path(path, db_state, ref_state, + assumed_info=assumed_info, callback=self.update_path) @transaction() @@ -280,8 +281,9 @@ class FileSyncer(object): def probe_archive(self, archive): client = self.clients[archive] - for path in client.list_candidate_files(): - self.probe_path(archive, path) + candidates = client.list_candidate_files() + for (path, info) in candidates.iteritems(): + self.probe_path(archive, path, assumed_info=info) def decide_archive(self, archive): for path in self.list_deciding([archive]):