diff --git a/agkyra/agkyra/syncer/localfs_client.py b/agkyra/agkyra/syncer/localfs_client.py index 2ad1f4fd67afc7ca7b54f292c27d6f6241594655..5696244f600b1444235465dd388576bd5fa9dd07 100644 --- a/agkyra/agkyra/syncer/localfs_client.py +++ b/agkyra/agkyra/syncer/localfs_client.py @@ -398,27 +398,9 @@ class LocalfsSourceHandle(object): self.stage_filename = None self.staged_path = None self.heartbeat = settings.heartbeat - self.check_log() if not self.isdir: self.lock_file(self.fspath) - def check_log(self): - with self.heartbeat.lock() as hb: - prev_log = hb.get(self.objname) - logger.info("object: %s heartbeat: %s" % - (self.objname, prev_log)) - if prev_log is not None: - actionstate, ts = prev_log - if actionstate != self.SIGNATURE or \ - utils.younger_than(ts, 10): - raise common.HandledError( - "Action mismatch in %s: %s %s" % - (self.SIGNATURE, self.objname, prev_log)) - logger.warning("Ignoring previous run in %s: %s %s" % - (self.SIGNATURE, self.objname, prev_log)) - hb.set(self.objname, (self.SIGNATURE, utils.time_stamp())) - print "LOG", self.heartbeat._LOG - def get_synced_state(self): return self.source_state @@ -444,12 +426,6 @@ class LocalfsSourceHandle(object): def unstage_file(self): self.do_unstage() self.unregister_stage_name(self.stage_filename) - self.clear_log() - - def clear_log(self): - with self.heartbeat.lock() as hb: - hb.delete(self.objname) - logger.info("DELETED %s" % self.objname) def do_unstage(self): if self.stage_filename is None: diff --git a/agkyra/agkyra/syncer/pithos_client.py b/agkyra/agkyra/syncer/pithos_client.py index 8f5bd8b83decba6d1f79b05c1049d8bbb0993af1..da312eff02d7d034a584cf7912d1a41db7cc0247 100644 --- a/agkyra/agkyra/syncer/pithos_client.py +++ b/agkyra/agkyra/syncer/pithos_client.py @@ -21,9 +21,9 @@ def heartbeat_event(settings, heartbeat, objname): def set_log(): with heartbeat.lock() as hb: client, prev_tstamp = hb.get(objname) - tpl = (client, utils.time_stamp()) - hb.set(objname, tpl) - logger.debug("HEARTBEAT '%s' %s %s" % ((objname,) + tpl)) + tstamp = utils.time_stamp() + hb.set(objname, tstamp) + logger.debug("HEARTBEAT '%s' %s" % (objname, tstamp)) def go(): interval = 0.2 @@ -78,23 +78,6 @@ class PithosSourceHandle(object): self.source_state = source_state self.objname = source_state.objname self.heartbeat = settings.heartbeat - self.check_log() - - def check_log(self): - with self.heartbeat.lock() as hb: - prev_log = hb.get(self.objname) - logger.info("object: %s heartbeat: %s" % - (self.objname, prev_log)) - if prev_log is not None: - actionstate, ts = prev_log - if actionstate != self.SIGNATURE or \ - utils.younger_than(ts, self.settings.action_max_wait): - raise common.HandledError( - "Action mismatch in %s: %s %s" % - (self.SIGNATURE, self.objname, prev_log)) - logger.warning("Ignoring previous run in %s: %s %s" % - (self.SIGNATURE, self.objname, prev_log)) - hb.set(self.objname, (self.SIGNATURE, utils.time_stamp())) @transaction() def register_fetch_name(self, filename): @@ -146,12 +129,7 @@ class PithosSourceHandle(object): return self.source_state def unstage_file(self): - self.clear_log() - - def clear_log(self): - with self.heartbeat.lock() as hb: - hb.delete(self.objname) - logger.info("DELETED %s" % self.objname) + pass STAGED_FOR_DELETION_SUFFIX = ".pithos_staged_for_deletion" exclude_staged_regex = ".*" + STAGED_FOR_DELETION_SUFFIX + "$" diff --git a/agkyra/agkyra/syncer/syncer.py b/agkyra/agkyra/syncer/syncer.py index faccab0670bba4742a25f91844100b3991150fc4..82efedf5d44d5dc9d6b2681b4d3429ba44991717 100644 --- a/agkyra/agkyra/syncer/syncer.py +++ b/agkyra/agkyra/syncer/syncer.py @@ -47,6 +47,7 @@ class FileSyncer(object): self.sync_threads = [] self.failed_serials = common.LockedDict() self.messager = settings.messager + self.heartbeat = self.settings.heartbeat def thread_is_active(self, t): return t and t.is_alive() @@ -114,6 +115,14 @@ class FileSyncer(object): client = self.clients[archive] db_state = db.get_state(archive, objname) ref_state = db.get_state(self.SYNC, objname) + with self.heartbeat.lock() as hb: + beat = hb.get(objname) + if beat is not None: + if utils.younger_than( + beat, self.settings.action_max_wait): + logger.warning("Object '%s' already handled; " + "Probe aborted." % objname) + return if db_state.serial != ref_state.serial: logger.warning("Serial mismatch in probing archive: %s, " "object: '%s'" % (archive, objname)) @@ -164,6 +173,13 @@ class FileSyncer(object): @transaction() def _decide_file_sync(self, objname, master, slave): + states = self._decide_file_sync(objname, master, slave) + if states is not None: + with self.heartbeat.lock() as hb: + hb.set(objname, utils.time_stamp()) + return states + + def _do_decide_file_sync(self, objname, master, slave): db = self.get_db() logger.info("Deciding object: '%s'" % objname) master_state = db.get_state(master, objname) @@ -175,6 +191,19 @@ class FileSyncer(object): sync_serial = sync_state.serial decision_serial = decision_state.serial + with self.heartbeat.lock() as hb: + prev_log = hb.get(objname) + logger.info("object: %s heartbeat: %s" % + (objname, prev_log)) + if prev_log is not None: + if utils.younger_than( + prev_log, self.settings.action_max_wait): + logger.warning("Object '%s' already handled; aborting." % + objname) + return None + logger.warning("Ignoring previous run: %s %s" % + (objname, prev_log)) + if decision_serial != sync_serial: failed_sync = self.failed_serials.get((decision_serial, objname)) if failed_sync is None: @@ -252,6 +281,8 @@ class FileSyncer(object): logger.warning( "Marking failed serial %s for archive: %s, object: '%s'" % (serial, state.archive, objname)) + with self.heartbeat.lock() as hb: + hb.delete(objname) self.failed_serials.put((serial, objname), state) def update_state(self, old_state, new_state): @@ -265,6 +296,8 @@ class FileSyncer(object): serial = synced_source_state.serial objname = synced_source_state.objname target = synced_target_state.archive + with self.heartbeat.lock() as hb: + hb.delete(objname) msg = messaging.AckSyncMessage( archive=target, objname=objname, serial=serial, logger=logger)