diff --git a/agkyra/scripts/test.py b/agkyra/scripts/test.py index 2e82f38c13e1a92073c53814c460defb96c02ca4..9f7377bf5c4516305ad2536064ebeeeb59d13268 100644 --- a/agkyra/scripts/test.py +++ b/agkyra/scripts/test.py @@ -287,6 +287,7 @@ class AgkyraTest(unittest.TestCase): # sync self.s.decide_file_sync(f1) + self.s.launch_syncs() dstate = self.db.get_state(self.s.DECISION, f1) self.assertEqual(dstate.serial, 0) self.assert_message(messaging.SyncMessage) @@ -323,6 +324,7 @@ class AgkyraTest(unittest.TestCase): self.s.probe_file(self.s.SLAVE, fil) self.assert_message(messaging.UpdateMessage) self.s.decide_file_sync(fil) + self.s.launch_syncs() self.assert_message(messaging.SyncMessage) self.assert_message(messaging.CollisionMessage) self.assert_message(messaging.SyncErrorMessage) @@ -336,6 +338,7 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.UpdateMessage) self.s.start_notifiers() self.s.decide_file_sync(fil) + self.s.launch_syncs() self.assert_message(messaging.FailedSyncIgnoreDecisionMessage) self.assert_message(messaging.SyncMessage) m = self.assert_message(messaging.ConflictStashMessage) @@ -373,6 +376,7 @@ class AgkyraTest(unittest.TestCase): open(f_path, 'a').close() self.s.probe_file(self.s.SLAVE, fil) self.s.decide_file_sync(fil) + self.s.launch_syncs() self.assert_message(messaging.UpdateMessage) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) @@ -386,6 +390,7 @@ class AgkyraTest(unittest.TestCase): self.assertEqual(state.serial, 0) self.assertEqual(state.info, {"localfs_type": "unhandled"}) self.s.decide_file_sync(ln) + self.s.launch_syncs() self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) state = self.db.get_state(self.s.MASTER, ln) @@ -402,6 +407,7 @@ class AgkyraTest(unittest.TestCase): state = self.db.get_state(self.s.MASTER, ln) self.assertEqual(state.info["pithos_etag"], etag) self.s.decide_file_sync(ln) + self.s.launch_syncs() 
self.assert_message(messaging.SyncMessage) m = self.assert_message(messaging.ConflictStashMessage) stashed_ln = m.stash_name @@ -426,6 +432,7 @@ class AgkyraTest(unittest.TestCase): open(f_path, 'a').close() self.s.probe_file(self.s.SLAVE, fil) self.s.decide_file_sync(fil) + self.s.launch_syncs() self.assert_message(messaging.UpdateMessage) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) @@ -442,6 +449,7 @@ class AgkyraTest(unittest.TestCase): # fails because file in place of dir self.s.decide_file_sync(inner_fil) + self.s.launch_syncs() self.assert_message(messaging.SyncMessage) self.assert_message(messaging.SyncErrorMessage) @@ -452,15 +460,18 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.UpdateMessage) # also fails because file in place of dir self.s.decide_file_sync(inner_dir) + self.s.launch_syncs() self.assert_message(messaging.SyncMessage) self.assert_message(messaging.SyncErrorMessage) # but if we fist sync the dir, it's ok self.s.decide_file_sync(fil) + self.s.launch_syncs() self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) self.assertTrue(os.path.isdir(f_path)) self.s.decide_file_sync(inner_fil) + self.s.launch_syncs() self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) @@ -471,6 +482,7 @@ class AgkyraTest(unittest.TestCase): self.s.probe_file(self.s.SLAVE, fil) self.assert_message(messaging.UpdateMessage) self.s.decide_file_sync(fil) + self.s.launch_syncs() self.assert_message(messaging.SyncMessage) self.s.probe_file(self.s.SLAVE, fil) self.assert_message(messaging.HeartbeatNoProbeMessage) @@ -486,6 +498,7 @@ class AgkyraTest(unittest.TestCase): "agkyra.syncer.database.DB.begin") as dbmock: dbmock.side_effect = sqlite3.OperationalError("locked") self.s.decide_file_sync(fil) + self.s.launch_syncs() def test_007_multiprobe(self): fil = "Ο007" @@ -514,6 +527,7 @@ class AgkyraTest(unittest.TestCase): 
self.assert_message(messaging.UpdateMessage) # this will also make the dir self.s.decide_file_sync(inner_fil) + self.s.launch_syncs() self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) self.assertTrue(os.path.isdir(d_path)) @@ -523,6 +537,7 @@ class AgkyraTest(unittest.TestCase): slave_serial = m.serial self.assertEqual(slave_serial, 1) self.s.decide_file_sync(d) + self.s.launch_syncs() self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) state = self.db.get_state(self.s.SLAVE, d) @@ -536,6 +551,7 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.UpdateMessage) self.s.decide_file_sync(d) self.s.decide_file_sync(inner_fil) + self.s.launch_syncs() self.assert_message(messaging.SyncMessage) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) @@ -557,9 +573,11 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.UpdateMessage) self.assert_message(messaging.UpdateMessage) self.s.decide_file_sync(d) + self.s.launch_syncs() self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) self.s.decide_file_sync(innerd) + self.s.launch_syncs() self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) self.assertTrue(os.path.isdir(d_path)) @@ -574,14 +592,17 @@ class AgkyraTest(unittest.TestCase): # will fail because local dir is non-empty self.s.decide_file_sync(d) + self.s.launch_syncs() self.assert_message(messaging.SyncMessage) self.assert_message(messaging.SyncErrorMessage) # but this is ok self.s.decide_file_sync(innerd) + self.s.launch_syncs() self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) self.s.decide_file_sync(d) + self.s.launch_syncs() self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) @@ -601,6 +622,7 @@ class AgkyraTest(unittest.TestCase): f.write(f_content) 
self.s.decide_file_sync(fil) + self.s.launch_syncs() self.assert_message(messaging.SyncMessage) self.assert_message(messaging.LiveInfoUpdateMessage) self.assert_message(messaging.AckSyncMessage) @@ -626,6 +648,7 @@ class AgkyraTest(unittest.TestCase): new_etag = r1['etag'] self.s.decide_file_sync(fil) + self.s.launch_syncs() self.assert_message(messaging.SyncMessage) self.assert_message(messaging.LiveInfoUpdateMessage) self.assert_message(messaging.AckSyncMessage) @@ -676,6 +699,7 @@ class AgkyraTest(unittest.TestCase): r = self.pithos.upload_from_string(fil, "new") self.s.decide_file_sync(fil) + self.s.launch_syncs() self.assert_message(messaging.SyncMessage) self.assert_message(messaging.CollisionMessage) self.assert_message(messaging.SyncErrorMessage) @@ -688,6 +712,7 @@ class AgkyraTest(unittest.TestCase): r = self.pithos.upload_from_string(d, "new") self.s.decide_file_sync(d) + self.s.launch_syncs() self.assert_message(messaging.SyncMessage) self.assert_message(messaging.CollisionMessage) self.assert_message(messaging.SyncErrorMessage) @@ -698,6 +723,7 @@ class AgkyraTest(unittest.TestCase): self.s.probe_file(self.s.SLAVE, d_synced) self.assert_message(messaging.UpdateMessage) self.s.decide_file_sync(d_synced) + self.s.launch_syncs() self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) @@ -707,6 +733,7 @@ class AgkyraTest(unittest.TestCase): r = self.pithos.upload_from_string(d_synced, "new") self.s.decide_file_sync(d_synced) + self.s.launch_syncs() self.assert_message(messaging.SyncMessage) self.assert_message(messaging.CollisionMessage) self.assert_message(messaging.SyncErrorMessage) diff --git a/agkyra/syncer/setup.py b/agkyra/syncer/setup.py index 043cbff8b9ebc0246fd42e0c252904106f2d70de..e99061a733b569b9adb7a821bfe5e0222bbc7ab8 100644 --- a/agkyra/syncer/setup.py +++ b/agkyra/syncer/setup.py @@ -45,6 +45,7 @@ DEFAULT_ACTION_MAX_WAIT = 10 DEFAULT_PITHOS_LIST_INTERVAL = 5 DEFAULT_CONNECTION_RETRY_LIMIT = 3 INSTANCES_NAME 
= 'instances'
+DEFAULT_MAX_ALIVE_SYNC_THREADS = 25
 
 thread_local_data = threading.local()
 
@@ -190,7 +191,8 @@ class SyncerSettings():
         self.connection_retry_limit = kwargs.get(
             "connection_retry_limit", DEFAULT_CONNECTION_RETRY_LIMIT)
         self.endpoint.CONNECTION_RETRY_LIMIT = self.connection_retry_limit
-
+        self.max_alive_sync_threads = kwargs.get(
+            "max_alive_sync_threads", DEFAULT_MAX_ALIVE_SYNC_THREADS)
         self.messager = Messager()
 
     def create_local_dirs(self):
diff --git a/agkyra/syncer/syncer.py b/agkyra/syncer/syncer.py
index 12a7fe9df5904ad12cefacbd64643dfdabc1a780..d2a936fef136cf730f1c6c3dab3a9838ffa91317 100644
--- a/agkyra/syncer/syncer.py
+++ b/agkyra/syncer/syncer.py
@@ -17,6 +17,7 @@ import time
 import threading
 import logging
 from collections import defaultdict
+import Queue
 
 from agkyra.syncer import common
 from agkyra.syncer.setup import SyncerSettings
@@ -47,6 +48,7 @@ class FileSyncer(object):
         self.decide_thread = None
         self.sync_threads = []
         self.failed_serials = utils.ThreadSafeDict()
+        self.sync_queue = Queue.Queue()
 
         self.messager = settings.messager
         self.heartbeat = self.settings.heartbeat
@@ -189,32 +191,62 @@ class FileSyncer(object):
         ident = utils.time_stamp()
         return self._do_decide_file_sync(db, objname, master, slave, ident, True)
 
-    def decide_file_sync(self, objname, master=None, slave=None):
+    def decide_file_syncs(self, objnames, master=None, slave=None):
+        """Decide on a batch of objects and queue the resulting syncs.
+
+        All decisions are taken inside a single TransactedConnection
+        block.  Nothing is started here; decided syncs are only put on
+        the sync queue, from which launch_syncs() starts them.
+
+        :param objnames: iterable of object names to decide on
+        :param master: defaults to self.MASTER when None
+        :param slave: defaults to self.SLAVE when None
+        """
         if master is None:
             master = self.MASTER
         if slave is None:
             slave = self.SLAVE
+        # Materialize: objnames is walked again on the error path below,
+        # which would silently do nothing for a one-shot iterator.
+        objnames = list(objnames)
         ident = utils.time_stamp()
+        syncs = []
         try:
             with TransactedConnection(self.syncer_dbtuple) as db:
-                states = self._decide_file_sync(db, objname, master, slave, ident)
+                for objname in objnames:
+                    states = self._decide_file_sync(
+                        db, objname, master, slave, ident)
+                    if states is not None:
+                        syncs.append(states)
         except common.DatabaseError:
-            self.clean_heartbeat(objname, ident)
-            return
-        if states is None:
+            self.clean_heartbeat(objnames, ident)
             return
-        self.sync_file(*states)
+        self.enqueue_syncs(syncs)
 
-    def clean_heartbeat(self, objname, ident=None):
+    def decide_file_sync(self, objname, master=None, slave=None):
+        """Decide on a single object; see decide_file_syncs()."""
+        # master/slave defaults are resolved by decide_file_syncs()
+        self.decide_file_syncs([objname], master, slave)
+
+    def clean_heartbeat(self, objnames, ident=None):
+        """Drop the heartbeat entries of the given objects.
+
+        :param objnames: iterable of object names
+        :param ident: if given, only entries whose "ident" matches are
+            dropped; entries with another ident are put back untouched
+        """
         with self.heartbeat.lock() as hb:
-            beat = hb.pop(self.reg_name(objname), None)
-            if beat is None:
-                return
-            if ident and ident != beat["ident"]:
-                hb[self.reg_name(objname)] = beat
-            else:
-                logger.debug("cleaning heartbeat %s, object '%s'"
-                             % (beat, objname))
+            for objname in objnames:
+                beat = hb.pop(self.reg_name(objname), None)
+                if beat is None:
+                    # no entry for this object; keep going, the rest of
+                    # the batch still has to be cleaned
+                    continue
+                if ident and ident != beat["ident"]:
+                    hb[self.reg_name(objname)] = beat
+                else:
+                    logger.debug("cleaning heartbeat %s, object '%s'"
+                                 % (beat, objname))
 
     def _decide_file_sync(self, db, objname, master, slave, ident):
         if not self.settings._sync_is_enabled(db):
@@ -308,6 +340,41 @@ class FileSyncer(object):
             serial=source_state.serial, info=source_state.info)
         db.put_state(new_decision_state)
 
+    def enqueue_syncs(self, syncs):
+        """Put each decided sync on the queue consumed by launch_syncs().
+
+        :param syncs: iterable of argument tuples for sync_file()
+        """
+        for sync in syncs:
+            self.sync_queue.put(sync)
+
+    def launch_syncs(self):
+        """Start queued syncs, up to the configured alive-thread limit.
+
+        Heartbeat entries whose "thread" is alive are counted against
+        settings.max_alive_sync_threads; at most the remaining number of
+        syncs is taken off the queue and passed to sync_file().
+        """
+        # NOTE(review): lock held only while counting -- confirm this
+        # matches the intended scope of the with-block
+        with self.heartbeat.lock() as hb:
+            alive_threads = len([v for v in hb.values()
+                                 if v["thread"] is not None
+                                 and v["thread"].is_alive()])
+        max_alive_threads = self.settings.max_alive_sync_threads
+        new_threads = max_alive_threads - alive_threads
+        if new_threads > 0:
+            logger.info("Can start max %s syncs" % new_threads)
+            for _ in range(new_threads):
+                try:
+                    tpl = self.sync_queue.get(block=False)
+                except Queue.Empty:
+                    # queue drained before the limit was reached
+                    break
+                # kept outside the try so that only an empty queue, and
+                # not an error raised by sync_file(), ends the loop
+                self.sync_file(*tpl)
+
     def sync_file(self, source_state, target_state, sync_state):
         msg = messaging.SyncMessage(
             objname=source_state.objname,
@@ -352,7 +419,7 @@ class FileSyncer(object):
                     (serial, state.archive, objname))
         with self.failed_serials.lock() as d:
             d[(serial, objname)] = state
-        self.clean_heartbeat(objname)
+        self.clean_heartbeat([objname])
 
     def update_state(self, db, old_state, new_state):
         db.put_state(new_state)
@@ -365,7 +432,7 @@ class FileSyncer(object):
         serial = synced_source_state.serial
         objname = synced_source_state.objname
         target = synced_target_state.archive
-        self.clean_heartbeat(objname)
+        self.clean_heartbeat([objname])
         msg = messaging.AckSyncMessage(
             archive=target, objname=objname, serial=serial,
             logger=logger)
@@ -439,8 +506,9 @@ class FileSyncer(object):
     def decide_archive(self, archive=None):
         try:
             archives = [archive] if archive is not None else None
-            for objname in self.list_deciding(archives):
-                self.decide_file_sync(objname)
+            objnames = self.list_deciding(archives)
+            self.decide_file_syncs(objnames)
+            self.launch_syncs()
         except common.DatabaseError:
             pass
 