diff --git a/test.py b/test.py index 62bc4eaa48e5ef9613bdb106e5123555be862804..a6e9efa09c168f651ba7cf426525d116b98e4a6c 100644 --- a/test.py +++ b/test.py @@ -22,6 +22,7 @@ import random import os import time import shutil +import unittest from agkyra.config import AgkyraConfig, CONFIG_PATH from kamaki.clients import ClientError @@ -34,253 +35,273 @@ handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) -cnf = AgkyraConfig() -cloud_conf = cnf.get('cloud', 'test') -if cloud_conf is None: - print "Define a 'test' cloud in %s" % CONFIG_PATH - exit() - -AUTHENTICATION_URL = cloud_conf['url'] -TOKEN = cloud_conf['token'] - -ID = "AGKYRATEST" + str(random.random()).split('.')[1] - -LOCAL_ROOT_PATH = "/tmp/" + ID - - -settings = SyncerSettings( - auth_url=AUTHENTICATION_URL, - auth_token=TOKEN, - container=ID, - local_root_path=LOCAL_ROOT_PATH, - ignore_ssl=True) - -master = PithosFileClient(settings) -slave = LocalfsFileClient(settings) -s = FileSyncer(settings, master, slave) - -pithos = master.endpoint -pithos.create_container(ID) - -# initial upload to pithos -f1 = "f1" -f1_content1 = "content1" -r1 = pithos.upload_from_string( - f1, f1_content1) -etag1 = r1['etag'] - -# check pithos state -pithos_cands = master.get_pithos_candidates() -info1 = pithos_cands[f1]["info"] -assert etag1 == info1["pithos_etag"] - -db = s.get_db() -state = db.get_state(master.SIGNATURE, f1) -assert state.serial == -1 -assert state.info == {} - - -def assert_message(mtype): - m = s.get_next_message(block=True) - print m - assert isinstance(m, mtype) - return m - -# probe pithos -s.probe_file(master.SIGNATURE, f1) -assert_message(messaging.UpdateMessage) - -state = db.get_state(master.SIGNATURE, f1) -assert state.serial == 0 -assert state.info == info1 - -deciding = s.list_deciding() -assert deciding == set([f1]) - -# get local state -state = db.get_state(slave.SIGNATURE, f1) -assert state.serial == -1 -assert state.info == {} - -# sync -s.decide_file_sync(f1) -assert_message(messaging.SyncMessage) - -# check local synced file -assert_message(messaging.AckSyncMessage) -state = db.get_state(slave.SIGNATURE, f1) -assert state.serial == 0 -info = state.info -assert info['localfs_size'] == len(f1_content1) - -f1_path = os.path.join(LOCAL_ROOT_PATH, f1) - def hash_file(fil): with open(fil) as f: return utils.hash_string(f.read()) -assert hash_file(f1_path) == utils.hash_string(f1_content1) - -# update local file -f1_content2 = "content22222" -with open(f1_path, "w") as f: - f.write(f1_content2) - -# update upstream -f1_content3 = "content33" -r3 = pithos.upload_from_string( - f1, f1_content3) -etag3 = r1['etag'] - -# cause a conflict -assert s.get_next_message() is None -# first try to upload ignoring upstream changes -s.probe_file(slave.SIGNATURE, f1) -s.decide_file_sync(f1) - -m = assert_message(messaging.UpdateMessage) -assert m.archive == slave.SIGNATURE - -assert_message(messaging.SyncMessage) -assert_message(messaging.CollisionMessage) -assert_message(messaging.SyncErrorMessage) - -# this will fail because serial is marked as failed -s.decide_file_sync(f1) -time.sleep(2) -assert s.get_next_message() is None - -# now probe upstream too and retry -s.probe_file(master.SIGNATURE, f1) -s.decide_file_sync(f1) - -m = assert_message(messaging.UpdateMessage) -assert m.archive == master.SIGNATURE - -assert_message(messaging.SyncMessage) -assert_message(messaging.ConflictStashMessage) -assert_message(messaging.AckSyncMessage) - -assert s.get_next_message() is None - -# notifiers instead of probing -s.start_notifiers() - -# make local dir with files -d1 = "d1" -d1_path = os.path.join(LOCAL_ROOT_PATH, d1) -logger.info('making dir %s' % d1) -os.mkdir(d1_path) -f2 = "d1/f2" -f2_path = os.path.join(LOCAL_ROOT_PATH, f2) -f2_content = "f2" -logger.info('making file %s' % f2) -with open(f2_path, "w") as f: - f.write(f2_content) - -print 'Sleeping to wait for filesystem events...' -time.sleep(2) -s.decide_all_archives() - -assert_message(messaging.UpdateMessage) -assert_message(messaging.UpdateMessage) -assert_message(messaging.SyncMessage) -assert_message(messaging.SyncMessage) -assert_message(messaging.AckSyncMessage) -assert_message(messaging.AckSyncMessage) - -assert s.get_next_message() is None -s.stop_notifiers() - -# Check sym links -ln1 = "f1.link" -ln1_path = os.path.join(LOCAL_ROOT_PATH, ln1) -os.symlink(f1_path, ln1_path) -s.probe_file(s.SLAVE, ln1) -state = db.get_state(slave.SIGNATURE, ln1) -assert state.serial == 0 -assert state.info == {"localfs_type": "unhandled"} - -assert_message(messaging.UpdateMessage) - -s.decide_file_sync(ln1) - -assert_message(messaging.SyncMessage) -assert_message(messaging.AckSyncMessage) - -# Put file upstream to cause conflict -upstream_ln1_content = "regular" -r1 = pithos.upload_from_string( - ln1, upstream_ln1_content) -s.probe_file(s.MASTER, ln1) -s.probe_file(s.SLAVE, ln1) - -assert_message(messaging.UpdateMessage) - -s.decide_file_sync(ln1) - -assert_message(messaging.SyncMessage) -m = assert_message(messaging.ConflictStashMessage) -stashed_ln1 = m.stash_name - -assert_message(messaging.AckSyncMessage) - -assert s.get_next_message() is None - -s.probe_file(s.SLAVE, stashed_ln1) -m = assert_message(messaging.UpdateMessage) -assert m.objname == stashed_ln1 - -state = db.get_state(slave.SIGNATURE, stashed_ln1) -assert state.serial == 0 -assert state.info == {"localfs_type": "unhandled"} -assert s.get_next_message() is None - -# nothing to be synced -s.decide_file_sync(f1) -time.sleep(2) -assert s.get_next_message() is None - -# directories -r = pithos.object_put(f1, content_type='application/directory', - content_length=0) -ff1 = "f1/ff1" -ff1_content = "ff1 in dir " -r1 = pithos.upload_from_string(ff1, ff1_content) -s.probe_file(s.MASTER, f1) -s.probe_file(s.MASTER, ff1) - -assert_message(messaging.UpdateMessage) -assert_message(messaging.UpdateMessage) - -# fails because file in place of dir -s.decide_file_sync(ff1) - -assert_message(messaging.SyncMessage) -assert_message(messaging.SyncErrorMessage) - -fd1 = "f1/fd1" -r = pithos.object_put(fd1, content_type='application/directory', - content_length=0) -s.probe_file(s.MASTER, fd1) -assert_message(messaging.UpdateMessage) - -# also fails because file in place of dir -s.decide_file_sync(fd1) - -assert_message(messaging.SyncMessage) -assert_message(messaging.SyncErrorMessage) - -# fail due to active heartbeat -s.probe_file(s.MASTER, ff1) -time.sleep(1) -assert s.get_next_message() is None - -s.decide_file_sync(ff1) -time.sleep(1) -assert s.get_next_message() is None +class AgkyraTest(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cnf = AgkyraConfig() + cloud_conf = cnf.get('cloud', 'test') + if cloud_conf is None: + print "Define a 'test' cloud in %s" % CONFIG_PATH + exit() + + AUTHENTICATION_URL = cloud_conf['url'] + TOKEN = cloud_conf['token'] + + cls.ID = "AGKYRATEST" + str(random.random()).split('.')[1] + + cls.LOCAL_ROOT_PATH = "/tmp/" + cls.ID + + cls.settings = SyncerSettings( + auth_url=AUTHENTICATION_URL, + auth_token=TOKEN, + container=cls.ID, + local_root_path=cls.LOCAL_ROOT_PATH, + ignore_ssl=True) + + cls.master = PithosFileClient(cls.settings) + cls.slave = LocalfsFileClient(cls.settings) + cls.s = FileSyncer(cls.settings, cls.master, cls.slave) + cls.pithos = cls.master.endpoint + cls.pithos.create_container(cls.ID) + cls.db = cls.s.get_db() + + def assert_message(self, mtype): + m = self.s.get_next_message(block=True) + print m + self.assertIsInstance(m, mtype) + return m + + def assert_no_message(self): + self.assertIsNone(self.s.get_next_message()) + + @classmethod + def tearDownClass(cls): + cls.pithos.del_container(delimiter='/') + cls.pithos.purge_container() + + def get_path(self, f): + return os.path.join(self.LOCAL_ROOT_PATH, f) + + def test_001_main(self): + # initial upload to pithos + f1 = "f001" + f1_content1 = "content1" + r1 = self.pithos.upload_from_string( + f1, f1_content1) + etag1 = r1['etag'] + + state = self.db.get_state(self.s.MASTER, f1) + self.assertEqual(state.serial, -1) + self.assertEqual(state.info, {}) + + # probe pithos + self.s.probe_file(self.s.MASTER, f1) + m = self.assert_message(messaging.UpdateMessage) + self.assertEqual(m.archive, self.s.MASTER) + self.assertEqual(m.serial, -1) + + state = self.db.get_state(self.s.MASTER, f1) + self.assertEqual(state.serial, 0) + self.assertEqual(state.info["pithos_etag"], etag1) + + # get local state + state = self.db.get_state(self.s.SLAVE, f1) + self.assertEqual(state.serial, -1) + assert state.info == {} + + # sync + self.s.decide_file_sync(f1) + dstate = self.db.get_state(self.s.DECISION, f1) + self.assertEqual(dstate.serial, 0) + self.assert_message(messaging.SyncMessage) + + # check local synced file + self.assert_message(messaging.AckSyncMessage) + state = self.db.get_state(self.s.SLAVE, f1) + assert state.serial == 0 + info = state.info + assert info['localfs_size'] == len(f1_content1) + f1_path = self.get_path(f1) + self.assertEqual(hash_file(f1_path), utils.hash_string(f1_content1)) + + dstate = self.db.get_state(self.s.DECISION, f1) + sstate = self.db.get_state(self.s.SYNC, f1) + self.assertEqual(dstate.info, sstate.info) + self.assertEqual(sstate.serial, 0) + + def test_002_conflict(self): + fil = "f002" + # update local file + fil_local_content = "local" + with open(self.get_path(fil), "w") as f: + f.write(fil_local_content) + + # update upstream + fil_upstream_content = "upstream" + r = self.pithos.upload_from_string( + fil, fil_upstream_content) + etag = r['etag'] + + # cause a conflict + # first try to upload ignoring upstream changes + self.s.probe_file(self.s.SLAVE, fil) + self.assert_message(messaging.UpdateMessage) + self.s.decide_file_sync(fil) + self.assert_message(messaging.SyncMessage) + self.assert_message(messaging.CollisionMessage) + self.assert_message(messaging.SyncErrorMessage) + + # this will fail because serial is marked as failed + self.s.decide_file_sync(fil) + time.sleep(2) + self.assert_no_message() + + # now probe upstream too and retry + self.s.probe_file(self.s.MASTER, fil) + self.assert_message(messaging.UpdateMessage) + self.s.decide_file_sync(fil) + self.assert_message(messaging.SyncMessage) + self.assert_message(messaging.ConflictStashMessage) + self.assert_message(messaging.AckSyncMessage) + + def test_003_dirs(self): + # make local dir with files + d = "d003" + d_path = self.get_path(d) + os.mkdir(d_path) + fil = "d003/f003" + f_path = self.get_path(fil) + f_content = "f2" + with open(f_path, "w") as f: + f.write(f_content) + self.s.probe_file(self.s.SLAVE, d) + self.s.probe_file(self.s.SLAVE, fil) + self.assert_message(messaging.UpdateMessage) + self.assert_message(messaging.UpdateMessage) + + self.s.decide_archive(self.s.SLAVE) + self.assert_message(messaging.SyncMessage) + self.assert_message(messaging.SyncMessage) + self.assert_message(messaging.AckSyncMessage) + self.assert_message(messaging.AckSyncMessage) + + def test_004_link(self): + # Check sym links + fil = "f004" + f_path = self.get_path(fil) + open(f_path, 'a').close() + self.s.probe_file(self.s.SLAVE, fil) + self.s.decide_file_sync(fil) + self.assert_message(messaging.UpdateMessage) + self.assert_message(messaging.SyncMessage) + self.assert_message(messaging.AckSyncMessage) + + ln = "f004.link" + ln_path = self.get_path(ln) + os.symlink(f_path, ln_path) + self.s.probe_file(self.s.SLAVE, ln) + self.assert_message(messaging.UpdateMessage) + state = self.db.get_state(self.s.SLAVE, ln) + self.assertEqual(state.serial, 0) + self.assertEqual(state.info, {"localfs_type": "unhandled"}) + self.s.decide_file_sync(ln) + self.assert_message(messaging.SyncMessage) + self.assert_message(messaging.AckSyncMessage) + state = self.db.get_state(self.s.MASTER, ln) + self.assertEqual(state.info, {}) + + # Put file upstream to cause conflict + upstream_ln_content = "regular" + r = self.pithos.upload_from_string( + ln, upstream_ln_content) + etag = r['etag'] + self.s.probe_file(self.s.MASTER, ln) + self.s.probe_file(self.s.SLAVE, ln) + self.assert_message(messaging.UpdateMessage) + state = self.db.get_state(self.s.MASTER, ln) + self.assertEqual(state.info["pithos_etag"], etag) + self.s.decide_file_sync(ln) + self.assert_message(messaging.SyncMessage) + m = self.assert_message(messaging.ConflictStashMessage) + stashed_ln = m.stash_name + self.assert_message(messaging.AckSyncMessage) + self.assert_no_message() + self.s.probe_file(self.s.SLAVE, stashed_ln) + m = self.assert_message(messaging.UpdateMessage) + self.assertEqual(m.objname, stashed_ln) + state = self.db.get_state(self.s.SLAVE, stashed_ln) + self.assertEqual(state.serial, 0) + self.assertEqual(state.info, {"localfs_type": "unhandled"}) + self.assert_no_message() + + # no changes in linked file + self.s.probe_file(self.s.SLAVE, fil) + time.sleep(2) + self.assert_no_message() + + def test_005_dirs_inhibited_by_file(self): + fil = "f005" + f_path = self.get_path(fil) + open(f_path, 'a').close() + r = self.pithos.object_put( + fil, content_type='application/directory', content_length=0) + inner_fil = "f005/in005" + inner_fil_content = "ff1 in dir " + r1 = self.pithos.upload_from_string(inner_fil, inner_fil_content) + self.s.probe_file(self.s.MASTER, fil) + self.s.probe_file(self.s.MASTER, inner_fil) + self.assert_message(messaging.UpdateMessage) + self.assert_message(messaging.UpdateMessage) + + # fails because file in place of dir + self.s.decide_file_sync(inner_fil) + self.assert_message(messaging.SyncMessage) + self.assert_message(messaging.SyncErrorMessage) + + inner_dir = "f005/indir005" + r = self.pithos.object_put( + inner_dir, content_type='application/directory', content_length=0) + self.s.probe_file(self.s.MASTER, inner_dir) + self.assert_message(messaging.UpdateMessage) + # also fails because file in place of dir + self.s.decide_file_sync(inner_dir) + self.assert_message(messaging.SyncMessage) + self.assert_message(messaging.SyncErrorMessage) + + def test_006_heartbeat(self): + fil = "f006" + f_path = self.get_path(fil) + open(f_path, 'a').close() + self.s.probe_file(self.s.SLAVE, fil) + self.assert_message(messaging.UpdateMessage) + self.s.decide_file_sync(fil) + self.assert_message(messaging.SyncMessage) + self.s.probe_file(self.s.SLAVE, fil) + self.assert_message(messaging.HeartbeatNoProbeMessage) + self.s.decide_file_sync(fil) + self.assert_message(messaging.HeartbeatNoDecideMessage) + self.assert_message(messaging.AckSyncMessage) + + def test_007_multiprobe(self): + fil = "f007" + f_path = self.get_path(fil) + open(f_path, 'a').close() + self.s.probe_file(self.s.SLAVE, fil) + self.assert_message(messaging.UpdateMessage) + with open(f_path, 'w') as f: + f.write("new") + self.s.probe_file(self.s.SLAVE, fil) + self.assert_message(messaging.AlreadyProbedMessage) + +if __name__ == '__main__': + unittest.main() print "SLEEPING 10" time.sleep(10)