diff --git a/agkyra/agkyra/syncer/pithos_client.py b/agkyra/agkyra/syncer/pithos_client.py index 297d360d9ebd466ca495cba34896d466c7948cb8..676eaad49bb70c20aa7619a062ca72e659ccf227 100644 --- a/agkyra/agkyra/syncer/pithos_client.py +++ b/agkyra/agkyra/syncer/pithos_client.py @@ -35,9 +35,12 @@ def heartbeat_event(settings, heartbeat, objname): def set_log(): with heartbeat.lock() as hb: - tstamp = utils.time_stamp() - hb.set(objname, tstamp) - logger.debug("HEARTBEAT '%s' %s" % (objname, tstamp)) + beat = hb.get(objname) + assert beat is not None + new_beat = {"ident": beat["ident"], + "tstamp": utils.time_stamp()} + hb.set(objname, new_beat) + logger.debug("HEARTBEAT '%s' %s" % (objname, new_beat)) def go(): interval = 0.2 diff --git a/agkyra/agkyra/syncer/syncer.py b/agkyra/agkyra/syncer/syncer.py index eb3f3211d8791267851f2b0d2fb5c76eae14ef17..99fb3b3dfead91c7e3eda0fba54f5507fe835ac7 100644 --- a/agkyra/agkyra/syncer/syncer.py +++ b/agkyra/agkyra/syncer/syncer.py @@ -123,7 +123,7 @@ class FileSyncer(object): beat = hb.get(objname) if beat is not None: if utils.younger_than( - beat, self.settings.action_max_wait): + beat["tstamp"], self.settings.action_max_wait): logger.warning("Object '%s' already handled; " "Probe aborted." % objname) return @@ -170,20 +170,22 @@ class FileSyncer(object): master = self.MASTER if slave is None: slave = self.SLAVE - states = self._decide_file_sync(objname, master, slave) + ident = utils.time_stamp() + states = self._decide_file_sync(objname, master, slave, ident) if states is None: return self.sync_file(*states) @transaction() - def _decide_file_sync(self, objname, master, slave): - states = self._do_decide_file_sync(objname, master, slave) + def _decide_file_sync(self, objname, master, slave, ident): + states = self._do_decide_file_sync(objname, master, slave, ident) if states is not None: with self.heartbeat.lock() as hb: - hb.set(objname, utils.time_stamp()) + beat = {"ident": ident, "tstamp": utils.time_stamp()} + hb.set(objname, beat) return states - def _do_decide_file_sync(self, objname, master, slave): + def _do_decide_file_sync(self, objname, master, slave, ident): db = self.get_db() logger.info("Deciding object: '%s'" % objname) master_state = db.get_state(master, objname) @@ -196,17 +198,21 @@ class FileSyncer(object): decision_serial = decision_state.serial with self.heartbeat.lock() as hb: - prev_log = hb.get(objname) + beat = hb.get(objname) logger.info("object: %s heartbeat: %s" % - (objname, prev_log)) - if prev_log is not None: - if utils.younger_than( - prev_log, self.settings.action_max_wait): - logger.warning("Object '%s' already handled; aborting." % - objname) - return None - logger.warning("Ignoring previous run: %s %s" % - (objname, prev_log)) + (objname, beat)) + if beat is not None: + if beat["ident"] == ident: + logger.info("Found heartbeat with current ident %s" + % ident) + else: + if utils.younger_than( + beat["tstamp"], self.settings.action_max_wait): + logger.warning("Object '%s' already handled; aborting." + % objname) + return None + logger.warning("Ignoring previous run: %s %s" % + (objname, beat)) if decision_serial != sync_serial: failed_sync = self.failed_serials.get((decision_serial, objname))