Commit a19e0e9a authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

Use heartbeat to block probe/decide

parent da54d3bb
......@@ -398,27 +398,9 @@ class LocalfsSourceHandle(object):
self.stage_filename = None
self.staged_path = None
self.heartbeat = settings.heartbeat
self.check_log()
if not self.isdir:
self.lock_file(self.fspath)
def check_log(self):
with self.heartbeat.lock() as hb:
prev_log = hb.get(self.objname)
logger.info("object: %s heartbeat: %s" %
(self.objname, prev_log))
if prev_log is not None:
actionstate, ts = prev_log
if actionstate != self.SIGNATURE or \
utils.younger_than(ts, 10):
raise common.HandledError(
"Action mismatch in %s: %s %s" %
(self.SIGNATURE, self.objname, prev_log))
logger.warning("Ignoring previous run in %s: %s %s" %
(self.SIGNATURE, self.objname, prev_log))
hb.set(self.objname, (self.SIGNATURE, utils.time_stamp()))
print "LOG", self.heartbeat._LOG
def get_synced_state(self):
return self.source_state
......@@ -444,12 +426,6 @@ class LocalfsSourceHandle(object):
def unstage_file(self):
self.do_unstage()
self.unregister_stage_name(self.stage_filename)
self.clear_log()
def clear_log(self):
with self.heartbeat.lock() as hb:
hb.delete(self.objname)
logger.info("DELETED %s" % self.objname)
def do_unstage(self):
if self.stage_filename is None:
......
......@@ -21,9 +21,9 @@ def heartbeat_event(settings, heartbeat, objname):
def set_log():
with heartbeat.lock() as hb:
client, prev_tstamp = hb.get(objname)
tpl = (client, utils.time_stamp())
hb.set(objname, tpl)
logger.debug("HEARTBEAT '%s' %s %s" % ((objname,) + tpl))
tstamp = utils.time_stamp()
hb.set(objname, tstamp)
logger.debug("HEARTBEAT '%s' %s" % (objname, tstamp))
def go():
interval = 0.2
......@@ -78,23 +78,6 @@ class PithosSourceHandle(object):
self.source_state = source_state
self.objname = source_state.objname
self.heartbeat = settings.heartbeat
self.check_log()
def check_log(self):
with self.heartbeat.lock() as hb:
prev_log = hb.get(self.objname)
logger.info("object: %s heartbeat: %s" %
(self.objname, prev_log))
if prev_log is not None:
actionstate, ts = prev_log
if actionstate != self.SIGNATURE or \
utils.younger_than(ts, self.settings.action_max_wait):
raise common.HandledError(
"Action mismatch in %s: %s %s" %
(self.SIGNATURE, self.objname, prev_log))
logger.warning("Ignoring previous run in %s: %s %s" %
(self.SIGNATURE, self.objname, prev_log))
hb.set(self.objname, (self.SIGNATURE, utils.time_stamp()))
@transaction()
def register_fetch_name(self, filename):
......@@ -146,12 +129,7 @@ class PithosSourceHandle(object):
return self.source_state
def unstage_file(self):
self.clear_log()
def clear_log(self):
with self.heartbeat.lock() as hb:
hb.delete(self.objname)
logger.info("DELETED %s" % self.objname)
pass
STAGED_FOR_DELETION_SUFFIX = ".pithos_staged_for_deletion"
exclude_staged_regex = ".*" + STAGED_FOR_DELETION_SUFFIX + "$"
......
......@@ -47,6 +47,7 @@ class FileSyncer(object):
self.sync_threads = []
self.failed_serials = common.LockedDict()
self.messager = settings.messager
self.heartbeat = self.settings.heartbeat
def thread_is_active(self, t):
return t and t.is_alive()
......@@ -114,6 +115,14 @@ class FileSyncer(object):
client = self.clients[archive]
db_state = db.get_state(archive, objname)
ref_state = db.get_state(self.SYNC, objname)
with self.heartbeat.lock() as hb:
beat = hb.get(objname)
if beat is not None:
if utils.younger_than(
beat, self.settings.action_max_wait):
logger.warning("Object '%s' already handled; "
"Probe aborted." % objname)
return
if db_state.serial != ref_state.serial:
logger.warning("Serial mismatch in probing archive: %s, "
"object: '%s'" % (archive, objname))
......@@ -164,6 +173,13 @@ class FileSyncer(object):
@transaction()
def _decide_file_sync(self, objname, master, slave):
states = self._decide_file_sync(objname, master, slave)
if states is not None:
with self.heartbeat.lock() as hb:
hb.set(objname, utils.time_stamp())
return states
def _do_decide_file_sync(self, objname, master, slave):
db = self.get_db()
logger.info("Deciding object: '%s'" % objname)
master_state = db.get_state(master, objname)
......@@ -175,6 +191,19 @@ class FileSyncer(object):
sync_serial = sync_state.serial
decision_serial = decision_state.serial
with self.heartbeat.lock() as hb:
prev_log = hb.get(objname)
logger.info("object: %s heartbeat: %s" %
(objname, prev_log))
if prev_log is not None:
if utils.younger_than(
prev_log, self.settings.action_max_wait):
logger.warning("Object '%s' already handled; aborting." %
objname)
return None
logger.warning("Ignoring previous run: %s %s" %
(objname, prev_log))
if decision_serial != sync_serial:
failed_sync = self.failed_serials.get((decision_serial, objname))
if failed_sync is None:
......@@ -252,6 +281,8 @@ class FileSyncer(object):
logger.warning(
"Marking failed serial %s for archive: %s, object: '%s'" %
(serial, state.archive, objname))
with self.heartbeat.lock() as hb:
hb.delete(objname)
self.failed_serials.put((serial, objname), state)
def update_state(self, old_state, new_state):
......@@ -265,6 +296,8 @@ class FileSyncer(object):
serial = synced_source_state.serial
objname = synced_source_state.objname
target = synced_target_state.archive
with self.heartbeat.lock() as hb:
hb.delete(objname)
msg = messaging.AckSyncMessage(
archive=target, objname=objname, serial=serial,
logger=logger)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment