diff --git a/test.py b/test.py index a526737ea35d42549e3c9503b7ad3769c1c1959e..4ba6b3e313af95c8832e5d3480eb0e53747ee9a5 100644 --- a/test.py +++ b/test.py @@ -17,13 +17,17 @@ from agkyra.syncer.setup import SyncerSettings from agkyra.syncer.localfs_client import LocalfsFileClient, LocalfsTargetHandle from agkyra.syncer.pithos_client import PithosFileClient from agkyra.syncer.syncer import FileSyncer +import agkyra.syncer.syncer from agkyra.syncer import messaging, utils, common import random import os import time import shutil import unittest +import mock +import sqlite3 +from functools import wraps from agkyra.config import AgkyraConfig, CONFIG_PATH from kamaki.clients import ClientError @@ -35,12 +39,30 @@ handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) +TMP = "/tmp" + def hash_file(fil): with open(fil) as f: return utils.hash_string(f.read()) +def mock_transaction(max_wait=60, init_wait=0.4, exp_backoff=1.1): + def wrap(func): + @wraps(func) + def inner(*args, **kwargs): + print "IN MOCK" + obj = args[0] + db = obj.get_db() + attempt = 0 + current_max_wait = init_wait + db.begin() + r = func(*args, **kwargs) + raise common.DatabaseError() + return inner + return wrap + + class AgkyraTest(unittest.TestCase): @classmethod @@ -56,7 +78,7 @@ class AgkyraTest(unittest.TestCase): cls.ID = "AGKYRATEST" + str(random.random()).split('.')[1] - cls.LOCAL_ROOT_PATH = "/tmp/" + cls.ID + cls.LOCAL_ROOT_PATH = utils.join_path("/tmp", cls.ID) cls.settings = SyncerSettings( auth_url=AUTHENTICATION_URL, @@ -78,6 +100,20 @@ class AgkyraTest(unittest.TestCase): self.assertIsInstance(m, mtype) return m + def assert_messages(self, mtypes_dict): + while mtypes_dict: + m = self.s.get_next_message(block=True) + print m + mtype = m.__class__ + num = mtypes_dict.get(mtype, 0) + if not num: + raise AssertionError("Got unexpected message %s" % m) + new_num = num -1 + if new_num: + mtypes_dict[mtype] = new_num + else: + mtypes_dict.pop(mtype) + def assert_no_message(self): self.assertIsNone(self.s.get_next_message()) @@ -89,7 +125,115 @@ class AgkyraTest(unittest.TestCase): def get_path(self, f): return os.path.join(self.LOCAL_ROOT_PATH, f) - def test_001_main(self): + def test_0001_listing_local(self): + def real(candidates): + return [c for c in candidates + if not c.startswith(self.settings.cache_name)] + + candidates = self.slave.list_candidate_files() + self.assertEqual(candidates, []) + candidates = self.slave.list_candidate_files(forced=True) + self.assertEqual(real(candidates), []) + + fil = "f0001" + f_path = self.get_path(fil) + open(f_path, "a").close() + d = "d0001" + d_path = self.get_path(d) + os.mkdir(d_path) + + candidates = self.slave.list_candidate_files(forced=True) + self.assertEqual(sorted(real(candidates)), sorted([fil, d])) + self.s.probe_archive(self.s.SLAVE) + self.assert_messages( + {messaging.UpdateMessage: 2, + messaging.IgnoreProbeMessage: 4}) + + with self.slave.probe_candidates.lock() as dct: + self.assertNotIn(fil, dct) + self.assertNotIn(d, dct) + + self.s.decide_archive(self.s.SLAVE) + self.assert_messages({ + messaging.SyncMessage: 2, + messaging.AckSyncMessage: 2}) + + os.unlink(f_path) + + with mock.patch( + "agkyra.syncer.localfs_client.LocalfsFileClient.list_files") as mk: + mk.return_value = [] + candidates = self.slave.list_candidate_files(forced=True) + self.assertEqual(real(candidates), [d]) + + candidates = self.slave.list_candidate_files(forced=True) + self.assertEqual(sorted(real(candidates)), sorted([fil, d])) + + candidates = self.slave.list_candidate_files() + self.assertEqual(sorted(real(candidates)), sorted([fil, d])) + + self.slave.remove_candidates(candidates, None) + candidates = self.slave.list_candidate_files() + self.assertEqual(candidates, []) + + def test_0002_notifier_local(self): + f_out = "f0002out" + f_cache = "f0002cache" + f_upd = "f0002upd" + f_ren = "f0002ren" + dbefore = "d0002before" + f_out_path = self.get_path(f_out) + f_cache_path = self.get_path(f_cache) + f_upd_path = self.get_path(f_upd) + f_ren_path = self.get_path(f_ren) + dbefore_path = self.get_path(dbefore) + open(f_out_path, "a").close() + open(f_cache_path, "a").close() + open(f_upd_path, "a").close() + open(f_ren_path, "a").close() + os.mkdir(dbefore_path) + + notifier = self.slave.notifier() + candidates = self.slave.list_candidate_files() + self.assertEqual(candidates, []) + + fafter = "f0002after" + fafter_path = self.get_path(fafter) + dafter = "d0002after" + dafter_path = self.get_path(dafter) + open(fafter_path, "a").close() + os.mkdir(dafter_path) + + time.sleep(1) + candidates = self.slave.list_candidate_files() + self.assertEqual(sorted(candidates), sorted([fafter, dafter])) + + os.rename(f_cache_path, + utils.join_path(self.settings.cache_path, f_cache)) + os.rename(f_out_path, + utils.join_path(TMP, f_out)) + with open(f_upd_path, "a") as f: + f.write("upd") + + f_in = "f0002in" + f_in_path = self.get_path(f_in) + f_in_orig_path = utils.join_path(TMP, f_in) + open(f_in_orig_path, "a").close() + os.rename(f_in_orig_path, f_in_path) + + f_ren_new = "f0002ren_new" + f_ren_new_path = self.get_path(f_ren_new) + os.rename(f_ren_path, f_ren_new_path) + + time.sleep(1) + candidates = self.slave.list_candidate_files() + self.assertEqual(sorted(candidates), + sorted([fafter, dafter, + f_in, f_out, f_upd, + f_ren, f_ren_new])) + notifier.stop() + + def test_001_probe_and_sync(self): # initial upload to pithos f1 = "f001" f1_content1 = "content1" @@ -160,13 +304,13 @@ class AgkyraTest(unittest.TestCase): # this will fail because serial is marked as failed self.s.decide_file_sync(fil) - time.sleep(2) - self.assert_no_message() + self.assert_message(messaging.FailedSyncIgnoreDecisionMessage) # now probe upstream too and retry self.s.probe_file(self.s.MASTER, fil) self.assert_message(messaging.UpdateMessage) self.s.decide_file_sync(fil) + self.assert_message(messaging.FailedSyncIgnoreDecisionMessage) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.ConflictStashMessage) self.assert_message(messaging.AckSyncMessage) @@ -187,10 +331,9 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.UpdateMessage) self.s.decide_archive(self.s.SLAVE) - self.assert_message(messaging.SyncMessage) - self.assert_message(messaging.SyncMessage) - self.assert_message(messaging.AckSyncMessage) - self.assert_message(messaging.AckSyncMessage) + self.assert_messages({ + messaging.SyncMessage: 2, + messaging.AckSyncMessage: 2}) def test_004_link(self): # Check sym links @@ -308,6 +451,20 @@ class AgkyraTest(unittest.TestCase): self.s.probe_file(self.s.SLAVE, fil) self.assert_message(messaging.UpdateMessage) + with mock.patch( + "agkyra.syncer.database.SqliteFileStateDB.commit") as dbmock: + dbmock.side_effect = [sqlite3.OperationalError("locked"), + common.DatabaseError()] + self.s.decide_file_sync(fil) + self.assert_message(messaging.HeartbeatReplayDecideMessage) + self.s.decide_file_sync(fil) + self.assert_message(messaging.HeartbeatNoDecideMessage) + print "SLEEPING 11" + time.sleep(11) + self.s.decide_file_sync(fil) + self.assert_message(messaging.SyncMessage) + self.assert_message(messaging.AckSyncMessage) + def test_007_multiprobe(self): fil = "f007" f_path = self.get_path(fil) @@ -530,5 +687,6 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.CollisionMessage) self.assert_message(messaging.SyncErrorMessage) + if __name__ == '__main__': unittest.main()