Commit ede6421e authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

probe all in single transaction BUT check probe_candidates on retry

parent eebea533
...@@ -532,7 +532,7 @@ class LocalfsFileClient(FileClient): ...@@ -532,7 +532,7 @@ class LocalfsFileClient(FileClient):
final_part = parts[-1] final_part = parts[-1]
return exclude_pattern.match(final_part) return exclude_pattern.match(final_part)
def start_probing_file(self, objname, old_state, ref_state, callback=None): def probe_file(self, objname, old_state, ref_state):
with self.probe_candidates.lock() as d: with self.probe_candidates.lock() as d:
cached_info = d.pop(objname, None) cached_info = d.pop(objname, None)
if self.exclude_file(objname): if self.exclude_file(objname):
...@@ -545,8 +545,7 @@ class LocalfsFileClient(FileClient): ...@@ -545,8 +545,7 @@ class LocalfsFileClient(FileClient):
if live_info is None: if live_info is None:
return return
live_state = old_state.set(info=live_info) live_state = old_state.set(info=live_info)
if callback is not None: return live_state
callback(live_state)
def stage_file(self, source_state): def stage_file(self, source_state):
return LocalfsSourceHandle(self.settings, source_state) return LocalfsSourceHandle(self.settings, source_state)
......
...@@ -340,7 +340,7 @@ class PithosFileClient(FileClient): ...@@ -340,7 +340,7 @@ class PithosFileClient(FileClient):
PITHOS_TYPE: p_type, PITHOS_TYPE: p_type,
} }
def start_probing_file(self, objname, old_state, ref_state, callback=None): def probe_file(self, objname, old_state, ref_state):
info = old_state.info info = old_state.info
with self.probe_candidates.lock() as d: with self.probe_candidates.lock() as d:
cached_info = d.pop(objname, None) cached_info = d.pop(objname, None)
...@@ -354,9 +354,8 @@ class PithosFileClient(FileClient): ...@@ -354,9 +354,8 @@ class PithosFileClient(FileClient):
else: else:
live_info = cached_info live_info = cached_info
if info != live_info: if info != live_info:
if callback is not None: live_state = old_state.set(info=live_info)
live_state = old_state.set(info=live_info) return live_state
callback(live_state)
def stage_file(self, source_state): def stage_file(self, source_state):
return PithosSourceHandle(self.settings, source_state) return PithosSourceHandle(self.settings, source_state)
......
...@@ -97,6 +97,9 @@ class FileSyncer(object): ...@@ -97,6 +97,9 @@ class FileSyncer(object):
@transaction() @transaction()
def probe_file(self, archive, objname): def probe_file(self, archive, objname):
return self._probe_file(archive, objname)
def _probe_file(self, archive, objname):
logger.info("Probing archive: %s, object: '%s'" % (archive, objname)) logger.info("Probing archive: %s, object: '%s'" % (archive, objname))
db = self.get_db() db = self.get_db()
client = self.clients[archive] client = self.clients[archive]
...@@ -114,10 +117,10 @@ class FileSyncer(object): ...@@ -114,10 +117,10 @@ class FileSyncer(object):
logger.warning("Serial mismatch in probing archive: %s, " logger.warning("Serial mismatch in probing archive: %s, "
"object: '%s'" % (archive, objname)) "object: '%s'" % (archive, objname))
return return
client.start_probing_file(objname, db_state, ref_state, live_state = client.probe_file(objname, db_state, ref_state)
callback=self.update_file_state) if live_state is not None:
self.update_file_state(live_state)
@transaction()
def update_file_state(self, live_state): def update_file_state(self, live_state):
db = self.get_db() db = self.get_db()
archive = live_state.archive archive = live_state.archive
...@@ -345,11 +348,12 @@ class FileSyncer(object): ...@@ -345,11 +348,12 @@ class FileSyncer(object):
return set(db.list_deciding(archives=archives, return set(db.list_deciding(archives=archives,
sync=self.SYNC)) sync=self.SYNC))
@transaction()
def probe_archive(self, archive, forced=False): def probe_archive(self, archive, forced=False):
client = self.clients[archive] client = self.clients[archive]
candidates = client.list_candidate_files(forced=forced) candidates = client.list_candidate_files(forced=forced)
for objname in candidates: for objname in candidates:
self.probe_file(archive, objname) self._probe_file(archive, objname)
def decide_archive(self, archive): def decide_archive(self, archive):
for objname in self.list_deciding([archive]): for objname in self.list_deciding([archive]):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment