diff --git a/agkyra/test.py b/agkyra/test.py index c83e3e4a8d04c005b2f5e18d9d3368e5a03d4ee4..cd7f4e89d1e110b0e3978f3699cf5a92f6e32735 100644 --- a/agkyra/test.py +++ b/agkyra/test.py @@ -19,9 +19,19 @@ from agkyra.syncer.pithos_client import PithosFileClient from agkyra.syncer.syncer import FileSyncer from agkyra.syncer import messaging, utils import random +import os +import time from agkyra.config import AgkyraConfig, CONFIG_PATH +import logging +logger = logging.getLogger('agkyra') +handler = logging.StreamHandler() +formatter = logging.Formatter("%(levelname)s:%(asctime)s:%(message)s") +handler.setFormatter(formatter) +logger.addHandler(handler) +logger.setLevel(logging.INFO) + cnf = AgkyraConfig() cloud_conf = cnf.get('cloud', 'test') if cloud_conf is None: @@ -51,92 +61,139 @@ s = FileSyncer(settings, master, slave) pithos = master.endpoint pithos.create_container(ID) +# initial upload to pithos f1 = "f1" -content1 = "content1" +f1_content1 = "content1" r1 = pithos.upload_from_string( - f1, content1) + f1, f1_content1) etag1 = r1['etag'] - +# check pithos state pithos_cands = master.get_pithos_candidates() -info = pithos_cands[f1] -assert etag1 == info["pithos_etag"] +info1 = pithos_cands[f1] +assert etag1 == info1["pithos_etag"] db = s.get_db() state = db.get_state(master.SIGNATURE, f1) assert state.serial == -1 assert state.info == {} +# probe pithos s.probe_file(master.SIGNATURE, f1) m = s.get_next_message(block=True) assert isinstance(m, messaging.UpdateMessage) state = db.get_state(master.SIGNATURE, f1) assert state.serial == 0 -assert state.info == info +assert state.info == info1 deciding = s.list_deciding() assert deciding == set([f1]) +# get local state state = db.get_state(slave.SIGNATURE, f1) assert state.serial == -1 assert state.info == {} +# sync s.decide_file_sync(f1) m = s.get_next_message(block=True) assert isinstance(m, messaging.SyncMessage) +# check local synced file m = s.get_next_message(block=True) assert isinstance(m, messaging.AckSyncMessage) state = db.get_state(slave.SIGNATURE, f1) assert state.serial == 0 info = state.info -assert info['localfs_size'] == len(content1) +assert info['localfs_size'] == len(f1_content1) -local_path = LOCAL_ROOT_PATH + '/' + f1 -assert utils.hash_file(local_path) == utils.hash_string(content1) +f1_path = os.path.join(LOCAL_ROOT_PATH, f1) +assert utils.hash_file(f1_path) == utils.hash_string(f1_content1) -def write_local(): - content2 = "content2" - with open(local_path, "w") as f: - f.write(content2) +# update local file +f1_content2 = "content22222" +with open(f1_path, "w") as f: + f.write(f1_content2) -def write_upstream(): - content3 = "content3" - r3 = pithos.upload_from_string( - f1, content3) - etag3 = r1['etag'] +# update upstream +f1_content3 = "content33" +r3 = pithos.upload_from_string( + f1, f1_content3) +etag3 = r1['etag'] +# cause a conflict +assert s.get_next_message() is None +s.probe_file(master.SIGNATURE, f1) +s.probe_file(slave.SIGNATURE, f1) +s.decide_file_sync(f1) -def func(): - write_upstream() - write_local() - assert s.get_next_message() is None - s.initiate_probe() - s.start_decide() +m = s.get_next_message(block=True) +print m +assert isinstance(m, messaging.UpdateMessage) +assert m.archive == master.SIGNATURE - m = s.get_next_message(block=True) - print m - assert isinstance(m, messaging.UpdateMessage) - assert m.archive == master.SIGNATURE +m = s.get_next_message(block=True) +print m +assert isinstance(m, messaging.UpdateMessage) +assert m.archive == slave.SIGNATURE - m = s.get_next_message(block=True) - print m - assert isinstance(m, messaging.UpdateMessage) - assert m.archive == slave.SIGNATURE +m = s.get_next_message(block=True) +print m +assert isinstance(m, messaging.SyncMessage) + +m = s.get_next_message(block=True) +print m +assert isinstance(m, messaging.ConflictStashMessage) + +m = s.get_next_message(block=True) +print m +assert isinstance(m, messaging.AckSyncMessage) - m = s.get_next_message(block=True) - print m - assert isinstance(m, messaging.SyncMessage) +assert s.get_next_message() is None - m = s.get_next_message(block=True) - print m - assert isinstance(m, messaging.ConflictStashMessage) +# notifiers instead of probing +s.start_notifiers() - m = s.get_next_message(block=True) - print m - assert isinstance(m, messaging.AckSyncMessage) +# make local dir with files +d1 = "d1" +d1_path = os.path.join(LOCAL_ROOT_PATH, d1) +logger.info('making dir %s' % d1) +os.mkdir(d1_path) +f2 = "d1/f2" +f2_path = os.path.join(LOCAL_ROOT_PATH, f2) +f2_content = "f2" +logger.info('making file %s' % f2) +with open(f2_path, "w") as f: + f.write(f2_content) +print 'Sleeping to wait for filesystem events...' +time.sleep(2) +s.decide_all_archives() + +m = s.get_next_message(block=True) +print m +assert isinstance(m, messaging.UpdateMessage) + +m = s.get_next_message(block=True) +print m +assert isinstance(m, messaging.UpdateMessage) + +m = s.get_next_message(block=True) +print m +assert isinstance(m, messaging.SyncMessage) + +m = s.get_next_message(block=True) +print m +assert isinstance(m, messaging.SyncMessage) + +m = s.get_next_message(block=True) +print m +assert isinstance(m, messaging.AckSyncMessage) + +m = s.get_next_message(block=True) +print m +assert isinstance(m, messaging.AckSyncMessage) -func() +assert s.get_next_message() is None