Commit d52d06d3 authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

decide in batch and queue syncs

parent b109c272
......@@ -287,6 +287,7 @@ class AgkyraTest(unittest.TestCase):
# sync
self.s.decide_file_sync(f1)
self.s.launch_syncs()
dstate = self.db.get_state(self.s.DECISION, f1)
self.assertEqual(dstate.serial, 0)
self.assert_message(messaging.SyncMessage)
......@@ -323,6 +324,7 @@ class AgkyraTest(unittest.TestCase):
self.s.probe_file(self.s.SLAVE, fil)
self.assert_message(messaging.UpdateMessage)
self.s.decide_file_sync(fil)
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.CollisionMessage)
self.assert_message(messaging.SyncErrorMessage)
......@@ -336,6 +338,7 @@ class AgkyraTest(unittest.TestCase):
self.assert_message(messaging.UpdateMessage)
self.s.start_notifiers()
self.s.decide_file_sync(fil)
self.s.launch_syncs()
self.assert_message(messaging.FailedSyncIgnoreDecisionMessage)
self.assert_message(messaging.SyncMessage)
m = self.assert_message(messaging.ConflictStashMessage)
......@@ -373,6 +376,7 @@ class AgkyraTest(unittest.TestCase):
open(f_path, 'a').close()
self.s.probe_file(self.s.SLAVE, fil)
self.s.decide_file_sync(fil)
self.s.launch_syncs()
self.assert_message(messaging.UpdateMessage)
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.AckSyncMessage)
......@@ -386,6 +390,7 @@ class AgkyraTest(unittest.TestCase):
self.assertEqual(state.serial, 0)
self.assertEqual(state.info, {"localfs_type": "unhandled"})
self.s.decide_file_sync(ln)
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.AckSyncMessage)
state = self.db.get_state(self.s.MASTER, ln)
......@@ -402,6 +407,7 @@ class AgkyraTest(unittest.TestCase):
state = self.db.get_state(self.s.MASTER, ln)
self.assertEqual(state.info["pithos_etag"], etag)
self.s.decide_file_sync(ln)
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
m = self.assert_message(messaging.ConflictStashMessage)
stashed_ln = m.stash_name
......@@ -426,6 +432,7 @@ class AgkyraTest(unittest.TestCase):
open(f_path, 'a').close()
self.s.probe_file(self.s.SLAVE, fil)
self.s.decide_file_sync(fil)
self.s.launch_syncs()
self.assert_message(messaging.UpdateMessage)
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.AckSyncMessage)
......@@ -442,6 +449,7 @@ class AgkyraTest(unittest.TestCase):
# fails because file in place of dir
self.s.decide_file_sync(inner_fil)
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.SyncErrorMessage)
......@@ -452,15 +460,18 @@ class AgkyraTest(unittest.TestCase):
self.assert_message(messaging.UpdateMessage)
# also fails because file in place of dir
self.s.decide_file_sync(inner_dir)
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.SyncErrorMessage)
# but if we fist sync the dir, it's ok
self.s.decide_file_sync(fil)
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.AckSyncMessage)
self.assertTrue(os.path.isdir(f_path))
self.s.decide_file_sync(inner_fil)
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.AckSyncMessage)
......@@ -471,6 +482,7 @@ class AgkyraTest(unittest.TestCase):
self.s.probe_file(self.s.SLAVE, fil)
self.assert_message(messaging.UpdateMessage)
self.s.decide_file_sync(fil)
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.s.probe_file(self.s.SLAVE, fil)
self.assert_message(messaging.HeartbeatNoProbeMessage)
......@@ -486,6 +498,7 @@ class AgkyraTest(unittest.TestCase):
"agkyra.syncer.database.DB.begin") as dbmock:
dbmock.side_effect = sqlite3.OperationalError("locked")
self.s.decide_file_sync(fil)
self.s.launch_syncs()
def test_007_multiprobe(self):
fil = "φ007"
......@@ -514,6 +527,7 @@ class AgkyraTest(unittest.TestCase):
self.assert_message(messaging.UpdateMessage)
# this will also make the dir
self.s.decide_file_sync(inner_fil)
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.AckSyncMessage)
self.assertTrue(os.path.isdir(d_path))
......@@ -523,6 +537,7 @@ class AgkyraTest(unittest.TestCase):
slave_serial = m.serial
self.assertEqual(slave_serial, 1)
self.s.decide_file_sync(d)
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.AckSyncMessage)
state = self.db.get_state(self.s.SLAVE, d)
......@@ -536,6 +551,7 @@ class AgkyraTest(unittest.TestCase):
self.assert_message(messaging.UpdateMessage)
self.s.decide_file_sync(d)
self.s.decide_file_sync(inner_fil)
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.AckSyncMessage)
......@@ -557,9 +573,11 @@ class AgkyraTest(unittest.TestCase):
self.assert_message(messaging.UpdateMessage)
self.assert_message(messaging.UpdateMessage)
self.s.decide_file_sync(d)
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.AckSyncMessage)
self.s.decide_file_sync(innerd)
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.AckSyncMessage)
self.assertTrue(os.path.isdir(d_path))
......@@ -574,14 +592,17 @@ class AgkyraTest(unittest.TestCase):
# will fail because local dir is non-empty
self.s.decide_file_sync(d)
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.SyncErrorMessage)
# but this is ok
self.s.decide_file_sync(innerd)
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.AckSyncMessage)
self.s.decide_file_sync(d)
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.AckSyncMessage)
......@@ -601,6 +622,7 @@ class AgkyraTest(unittest.TestCase):
f.write(f_content)
self.s.decide_file_sync(fil)
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.LiveInfoUpdateMessage)
self.assert_message(messaging.AckSyncMessage)
......@@ -626,6 +648,7 @@ class AgkyraTest(unittest.TestCase):
new_etag = r1['etag']
self.s.decide_file_sync(fil)
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.LiveInfoUpdateMessage)
self.assert_message(messaging.AckSyncMessage)
......@@ -676,6 +699,7 @@ class AgkyraTest(unittest.TestCase):
r = self.pithos.upload_from_string(fil, "new")
self.s.decide_file_sync(fil)
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.CollisionMessage)
self.assert_message(messaging.SyncErrorMessage)
......@@ -688,6 +712,7 @@ class AgkyraTest(unittest.TestCase):
r = self.pithos.upload_from_string(d, "new")
self.s.decide_file_sync(d)
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.CollisionMessage)
self.assert_message(messaging.SyncErrorMessage)
......@@ -698,6 +723,7 @@ class AgkyraTest(unittest.TestCase):
self.s.probe_file(self.s.SLAVE, d_synced)
self.assert_message(messaging.UpdateMessage)
self.s.decide_file_sync(d_synced)
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.AckSyncMessage)
......@@ -707,6 +733,7 @@ class AgkyraTest(unittest.TestCase):
r = self.pithos.upload_from_string(d_synced, "new")
self.s.decide_file_sync(d_synced)
self.s.launch_syncs()
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.CollisionMessage)
self.assert_message(messaging.SyncErrorMessage)
......
......@@ -45,6 +45,7 @@ DEFAULT_ACTION_MAX_WAIT = 10
DEFAULT_PITHOS_LIST_INTERVAL = 5
DEFAULT_CONNECTION_RETRY_LIMIT = 3
INSTANCES_NAME = 'instances'
DEFAULT_MAX_ALIVE_SYNC_THREADS = 25
thread_local_data = threading.local()
......@@ -190,7 +191,8 @@ class SyncerSettings():
self.connection_retry_limit = kwargs.get(
"connection_retry_limit", DEFAULT_CONNECTION_RETRY_LIMIT)
self.endpoint.CONNECTION_RETRY_LIMIT = self.connection_retry_limit
self.max_alive_sync_threads = kwargs.get(
"max_alive_sync_threads", DEFAULT_MAX_ALIVE_SYNC_THREADS)
self.messager = Messager()
def create_local_dirs(self):
......
......@@ -17,6 +17,7 @@ import time
import threading
import logging
from collections import defaultdict
import Queue
from agkyra.syncer import common
from agkyra.syncer.setup import SyncerSettings
......@@ -47,6 +48,7 @@ class FileSyncer(object):
self.decide_thread = None
self.sync_threads = []
self.failed_serials = utils.ThreadSafeDict()
self.sync_queue = Queue.Queue()
self.messager = settings.messager
self.heartbeat = self.settings.heartbeat
......@@ -189,32 +191,43 @@ class FileSyncer(object):
ident = utils.time_stamp()
return self._do_decide_file_sync(db, objname, master, slave, ident, True)
def decide_file_sync(self, objname, master=None, slave=None):
def decide_file_syncs(self, objnames, master=None, slave=None):
if master is None:
master = self.MASTER
if slave is None:
slave = self.SLAVE
ident = utils.time_stamp()
syncs = []
try:
with TransactedConnection(self.syncer_dbtuple) as db:
states = self._decide_file_sync(db, objname, master, slave, ident)
for objname in objnames:
states = self._decide_file_sync(
db, objname, master, slave, ident)
if states is not None:
syncs.append(states)
except common.DatabaseError:
self.clean_heartbeat(objname, ident)
return
if states is None:
self.clean_heartbeat(objnames, ident)
return
self.sync_file(*states)
self.enqueue_syncs(syncs)
def clean_heartbeat(self, objname, ident=None):
def decide_file_sync(self, objname, master=None, slave=None):
if master is None:
master = self.MASTER
if slave is None:
slave = self.SLAVE
self.decide_file_syncs([objname], master, slave)
def clean_heartbeat(self, objnames, ident=None):
with self.heartbeat.lock() as hb:
beat = hb.pop(self.reg_name(objname), None)
if beat is None:
return
if ident and ident != beat["ident"]:
hb[self.reg_name(objname)] = beat
else:
logger.debug("cleaning heartbeat %s, object '%s'"
% (beat, objname))
for objname in objnames:
beat = hb.pop(self.reg_name(objname), None)
if beat is None:
return
if ident and ident != beat["ident"]:
hb[self.reg_name(objname)] = beat
else:
logger.debug("cleaning heartbeat %s, object '%s'"
% (beat, objname))
def _decide_file_sync(self, db, objname, master, slave, ident):
if not self.settings._sync_is_enabled(db):
......@@ -308,6 +321,26 @@ class FileSyncer(object):
serial=source_state.serial, info=source_state.info)
db.put_state(new_decision_state)
def enqueue_syncs(self, syncs):
for sync in syncs:
self.sync_queue.put(sync)
def launch_syncs(self):
with self.heartbeat.lock() as hb:
alive_threads = len([v for v in hb.values()
if v["thread"] is not None
and v["thread"].is_alive()])
max_alive_threads = self.settings.max_alive_sync_threads
new_threads = max_alive_threads - alive_threads
if new_threads > 0:
logger.info("Can start max %s syncs" % new_threads)
for i in range(new_threads):
try:
tpl = self.sync_queue.get(block=False)
self.sync_file(*tpl)
except Queue.Empty:
break
def sync_file(self, source_state, target_state, sync_state):
msg = messaging.SyncMessage(
objname=source_state.objname,
......@@ -352,7 +385,7 @@ class FileSyncer(object):
(serial, state.archive, objname))
with self.failed_serials.lock() as d:
d[(serial, objname)] = state
self.clean_heartbeat(objname)
self.clean_heartbeat([objname])
def update_state(self, db, old_state, new_state):
db.put_state(new_state)
......@@ -365,7 +398,7 @@ class FileSyncer(object):
serial = synced_source_state.serial
objname = synced_source_state.objname
target = synced_target_state.archive
self.clean_heartbeat(objname)
self.clean_heartbeat([objname])
msg = messaging.AckSyncMessage(
archive=target, objname=objname, serial=serial,
logger=logger)
......@@ -439,8 +472,9 @@ class FileSyncer(object):
def decide_archive(self, archive=None):
try:
archives = [archive] if archive is not None else None
for objname in self.list_deciding(archives):
self.decide_file_sync(objname)
objnames = self.list_deciding(archives)
self.decide_file_syncs(objnames)
self.launch_syncs()
except common.DatabaseError:
pass
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment