Commit 9ae26f2c authored by Giorgos Korfiatis
Browse files

make probe_candidates retryable

parent ede6421e
......@@ -492,6 +492,16 @@ class LocalfsFileClient(FileClient):
self.get_db = settings.get_db
self.probe_candidates = utils.ThreadSafeDict()
def remove_candidates(self, objnames, ident):
    """Remove cached probe candidates that are tagged with `ident`.

    For every name in `objnames`, the entry in `self.probe_candidates` is
    removed only when its "ident" value equals `ident`. Names that are not
    cached are ignored, and entries carrying a different "ident" are left
    untouched in the cache.

    :param objnames: iterable of object names to consider.
    :param ident: identifier of the probe run whose entries may be dropped.
    """
    with self.probe_candidates.lock() as d:
        # Snapshot the names first: `objnames` may be a live view of the
        # very dict being modified below.
        for objname in list(objnames):
            try:
                cached = d[objname]
            except KeyError:
                # Not cached; nothing to remove.
                continue
            # Look before removing, so an entry that does not match is never
            # taken out of the dict, not even transiently.
            if cached.get("ident") == ident:
                d.pop(objname)
def list_candidate_files(self, forced=False):
with self.probe_candidates.lock() as d:
if forced:
......@@ -499,6 +509,9 @@ class LocalfsFileClient(FileClient):
d.update(candidates)
return d.keys()
def none_info(self):
    """Return a new placeholder candidate entry.

    The entry is a dict with the keys "ident" and "info", both set to None.
    A new dict is built on every call, so callers never share one entry.
    """
    return {"ident": None, "info": None}
def walk_filesystem(self):
db = self.get_db()
candidates = {}
......@@ -506,16 +519,18 @@ class LocalfsFileClient(FileClient):
rel_dirpath = os.path.relpath(dirpath, start=self.ROOTPATH)
logger.debug("'%s' '%s'" % (dirpath, rel_dirpath))
if rel_dirpath != '.':
candidates[utils.to_standard_sep(rel_dirpath)] = None
objname = utils.to_standard_sep(rel_dirpath)
candidates[objname] = self.none_info()
for filename in files:
if rel_dirpath == '.':
prefix = ""
else:
prefix = utils.to_standard_sep(rel_dirpath)
objname = utils.join_objname(prefix, filename)
candidates[objname] = None
candidates[objname] = self.none_info()
db_cands = dict((name, None) for name in db.list_files(self.SIGNATURE))
db_cands = dict((name, self.none_info())
for name in db.list_files(self.SIGNATURE))
candidates.update(db_cands)
logger.info("Candidates: %s" % candidates)
return candidates
......@@ -532,9 +547,15 @@ class LocalfsFileClient(FileClient):
final_part = parts[-1]
return exclude_pattern.match(final_part)
def probe_file(self, objname, old_state, ref_state):
def probe_file(self, objname, old_state, ref_state, ident):
with self.probe_candidates.lock() as d:
cached_info = d.pop(objname, None)
try:
cached = d[objname]
cached_info = cached["info"]
cached["ident"] = ident
except KeyError:
cached_info = None
if self.exclude_file(objname):
logger.warning("Ignoring probe archive: %s, object: %s" %
(old_state.archive, objname))
......@@ -558,7 +579,7 @@ class LocalfsFileClient(FileClient):
rel_path = os.path.relpath(path, start=self.ROOTPATH)
objname = utils.to_standard_sep(rel_path)
with self.probe_candidates.lock() as d:
d[objname] = None
d[objname] = self.none_info()
class EventHandler(FileSystemEventHandler):
def on_created(this, event):
......
......@@ -267,6 +267,16 @@ class PithosFileClient(FileClient):
self.last_modification = "0000-00-00"
self.probe_candidates = utils.ThreadSafeDict()
def remove_candidates(self, objnames, ident):
    """Remove cached probe candidates that are tagged with `ident`.

    For every name in `objnames`, the entry in `self.probe_candidates` is
    removed only when its "ident" value equals `ident`. Names that are not
    cached are ignored, and entries carrying a different "ident" are left
    untouched in the cache.

    :param objnames: iterable of object names to consider.
    :param ident: identifier of the probe run whose entries may be dropped.
    """
    with self.probe_candidates.lock() as d:
        # Snapshot the names first: `objnames` may be a live view of the
        # very dict being modified below.
        for objname in list(objnames):
            try:
                cached = d[objname]
            except KeyError:
                # Not cached; nothing to remove.
                continue
            # Look before removing, so an entry that does not match is never
            # taken out of the dict, not even transiently.
            if cached.get("ident") == ident:
                d.pop(objname)
def list_candidate_files(self, forced=False):
with self.probe_candidates.lock() as d:
if forced:
......@@ -285,7 +295,10 @@ class PithosFileClient(FileClient):
upstream_all = {}
for obj in objects:
name = obj["name"]
upstream_all[name] = self.get_object_live_info(obj)
upstream_all[name] = {
"ident": None,
"info": self.get_object_live_info(obj)
}
obj_last_modified = obj["last_modified"]
if obj_last_modified > self.last_modification:
self.last_modification = obj_last_modified
......@@ -303,7 +316,8 @@ class PithosFileClient(FileClient):
non_deleted_in_db = set(db.list_non_deleted_files(self.SIGNATURE))
newly_deleted_names = non_deleted_in_db.difference(upstream_all_names)
logger.debug("newly_deleted %s" % newly_deleted_names)
newly_deleted = dict((name, {}) for name in newly_deleted_names)
newly_deleted = dict((name, {"ident": None, "info": {}})
for name in newly_deleted_names)
candidates.update(newly_deleted)
logger.info("Candidates since %s: %s" %
......@@ -340,10 +354,15 @@ class PithosFileClient(FileClient):
PITHOS_TYPE: p_type,
}
def probe_file(self, objname, old_state, ref_state):
def probe_file(self, objname, old_state, ref_state, ident):
info = old_state.info
with self.probe_candidates.lock() as d:
cached_info = d.pop(objname, None)
try:
cached = d[objname]
cached_info = cached["info"]
cached["ident"] = ident
except KeyError:
cached_info = None
if exclude_pattern.match(objname):
logger.warning("Ignoring probe archive: %s, object: '%s'" %
(old_state.archive, objname))
......
......@@ -95,11 +95,18 @@ class FileSyncer(object):
def get_next_message(self, block=False):
    """Return the next message obtained from `self.messager`.

    :param block: forwarded unchanged as the `block` keyword argument of
        `self.messager.get`.
    """
    return self.messager.get(block=block)
def probe_file(self, archive, objname):
    """Probe one object of `archive`, then clear its cached candidate.

    A fresh `ident` from `utils.time_stamp()` tags this probe run. The
    object is probed through `_probe_files`, which carries the
    `@transaction()` decorator itself, so this method is not decorated.
    After `_probe_files` returns, the client is asked to remove the
    candidate, passing the same `ident`.

    :param archive: key into `self.clients` selecting the client to probe.
    :param objname: name of the object to probe.
    """
    ident = utils.time_stamp()
    self._probe_files(archive, [objname], ident)
    client = self.clients[archive]
    client.remove_candidates([objname], ident)
def _probe_file(self, archive, objname):
@transaction()
def _probe_files(self, archive, objnames, ident):
    """Probe each object in `objnames` under one `@transaction()` call.

    :param archive: archive name passed on to `_do_probe_file`.
    :param objnames: iterable of object names to probe, in order.
    :param ident: identifier of this probe run, passed on unchanged.
    """
    for objname in objnames:
        self._do_probe_file(archive, objname, ident)
def _do_probe_file(self, archive, objname, ident):
logger.info("Probing archive: %s, object: '%s'" % (archive, objname))
db = self.get_db()
client = self.clients[archive]
......@@ -117,7 +124,7 @@ class FileSyncer(object):
logger.warning("Serial mismatch in probing archive: %s, "
"object: '%s'" % (archive, objname))
return
live_state = client.probe_file(objname, db_state, ref_state)
live_state = client.probe_file(objname, db_state, ref_state, ident)
if live_state is not None:
self.update_file_state(live_state)
......@@ -348,12 +355,12 @@ class FileSyncer(object):
return set(db.list_deciding(archives=archives,
sync=self.SYNC))
def probe_archive(self, archive, forced=False):
    """Probe all candidate files of `archive`, then clear the handled ones.

    A fresh `ident` from `utils.time_stamp()` tags this probe run. The
    candidates are probed through `_probe_files`, which carries the
    `@transaction()` decorator itself, so this method is not decorated.
    After `_probe_files` returns, the client is asked to remove the
    candidates, passing the same `ident`.

    :param archive: key into `self.clients` selecting the client to probe.
    :param forced: forwarded to the client's `list_candidate_files`.
    """
    ident = utils.time_stamp()
    client = self.clients[archive]
    # Snapshot the names: the client may hand back a live view of its
    # candidate dict, which is modified again by `remove_candidates`.
    candidates = list(client.list_candidate_files(forced=forced))
    self._probe_files(archive, candidates, ident)
    client.remove_candidates(candidates, ident)
def decide_archive(self, archive):
for objname in self.list_deciding([archive]):
......
......@@ -70,7 +70,7 @@ etag1 = r1['etag']
# check pithos state
pithos_cands = master.get_pithos_candidates()
info1 = pithos_cands[f1]
info1 = pithos_cands[f1]["info"]
assert etag1 == info1["pithos_etag"]
db = s.get_db()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment