Commit 5d08a159 authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

Precompute info for candidate files

parent fa8322ce
......@@ -480,33 +480,38 @@ class LocalfsFileClient(FileClient):
def list_candidate_files(self):
db = self.get_db()
candidates = []
candidates = {}
for dirpath, dirnames, files in os.walk(self.ROOTPATH):
rel_dirpath = os.path.relpath(dirpath, start=self.ROOTPATH)
logger.debug("'%s' '%s'" % (dirpath, rel_dirpath))
# if self.exclude_dir_exp.match(dirpath):
# continue
if rel_dirpath != '.':
candidates.append(rel_dirpath)
candidates[rel_dirpath] = None
for filename in files:
# if self.exclude_files_exp.match(filename) or \
# self.exclude_dir_exp.match(filename):
# continue
local_filename = utils.join_path(rel_dirpath, filename)
candidates.append(local_filename)
candidates[local_filename] = None
db_names = set(db.list_files(self.NAME))
return db_names.union(candidates)
db_cands = dict((name, None) for name in db.list_files(self.NAME))
candidates.update(db_cands)
logger.info("Candidates: %s" % candidates)
return candidates
def _local_path_changes(self, path, state):
local_path = utils.join_path(self.ROOTPATH, path)
return local_path_changes(local_path, state)
def start_probing_path(self, path, old_state, ref_state, callback=None):
def start_probing_path(self, path, old_state, ref_state,
assumed_info=None,
callback=None):
if old_state.serial != ref_state.serial:
logger.warning("Serial mismatch in probing path '%s'" % path)
return
live_info = self._local_path_changes(path, old_state)
live_info = (self._local_path_changes(path, old_state)
if assumed_info is None else assumed_info)
if live_info is None:
return
live_state = old_state.set(info=live_info)
......
......@@ -246,18 +246,24 @@ class PithosFileClient(FileClient):
db = self.get_db()
objects = self.endpoint.list_objects()
self.objects = objects
upstream_all_names = set(obj["name"] for obj in objects)
non_deleted_in_db = set(db.list_non_deleted_files(self.NAME))
newly_deleted = non_deleted_in_db.difference(upstream_all_names)
logger.debug("newly_deleted %s" % newly_deleted)
upstream_all = dict(
(obj["name"], self.get_object_live_info(obj))
for obj in objects)
upstream_all_names = set(upstream_all.keys())
if last_modified is not None:
upstream_modified_names = set(
obj["name"] for obj in objects
if obj["last_modified"] > last_modified)
upstream_names = upstream_modified_names
upstream_modified_names = dict(
(k, v) for (k, v) in upstream_all.iteritems()
if v["last_modified"] > last_modified)
candidates = upstream_modified_names
else:
upstream_names = upstream_all_names
candidates = upstream_names.union(newly_deleted)
candidates = upstream_all
non_deleted_in_db = set(db.list_non_deleted_files(self.NAME))
newly_deleted_names = non_deleted_in_db.difference(upstream_all_names)
logger.debug("newly_deleted %s" % newly_deleted_names)
newly_deleted = dict((name, {}) for name in newly_deleted_names)
candidates.update(newly_deleted)
logger.info("Candidates: %s" % candidates)
return candidates
......@@ -271,9 +277,8 @@ class PithosFileClient(FileClient):
last_modified = last_tstamp.isoformat()
candidates = self.list_candidate_files(
last_modified=last_modified)
if callback is not None:
for candidate in candidates:
callback(self.NAME, candidate)
for (path, info) in candidates:
callback(self.NAME, path, assumed_info=info)
time.sleep(interval)
poll = PollPithos()
......@@ -301,19 +306,26 @@ class PithosFileClient(FileClient):
if obj is None:
return {}
p_type = common.T_DIR if object_isdir(obj) else common.T_FILE
obj_hash = obj["x-object-hash"]
obj_hash = obj.get("x-object-hash")
if obj_hash is None:
obj_hash = obj.get("x_object_hash")
return {PITHOS_ETAG: obj_hash,
PITHOS_TYPE: p_type,
}
def start_probing_path(self, path, old_state, ref_state, callback=None):
def start_probing_path(self, path, old_state, ref_state,
assumed_info=None,
callback=None):
if exclude_pattern.match(path):
logger.warning("Ignoring probe archive: %s, path: '%s'" %
(old_state.archive, path))
return
info = old_state.info
obj = self.get_object(path)
live_info = self.get_object_live_info(obj)
if assumed_info is None:
obj = self.get_object(path)
live_info = self.get_object_live_info(obj)
else:
live_info = assumed_info
if info != live_info:
if callback is not None:
live_state = old_state.set(info=live_info)
......
......@@ -77,7 +77,7 @@ class FileSyncer(object):
return exclude_pattern.match(final_part)
@transaction()
def probe_path(self, archive, path):
def probe_path(self, archive, path, assumed_info=None):
if self.exclude_path(path):
logger.warning("Ignoring probe archive: %s, path: %s" %
(archive, path))
......@@ -92,6 +92,7 @@ class FileSyncer(object):
% (archive, path))
return
client.start_probing_path(path, db_state, ref_state,
assumed_info=assumed_info,
callback=self.update_path)
@transaction()
......@@ -280,8 +281,9 @@ class FileSyncer(object):
def probe_archive(self, archive):
client = self.clients[archive]
for path in client.list_candidate_files():
self.probe_path(archive, path)
candidates = client.list_candidate_files()
for (path, info) in candidates.iteritems():
self.probe_path(archive, path, assumed_info=info)
def decide_archive(self, archive):
for path in self.list_deciding([archive]):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment