Commit 6f7facbb authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

some more tests

parent 40f4397d
......@@ -17,13 +17,17 @@ from agkyra.syncer.setup import SyncerSettings
from agkyra.syncer.localfs_client import LocalfsFileClient, LocalfsTargetHandle
from agkyra.syncer.pithos_client import PithosFileClient
from agkyra.syncer.syncer import FileSyncer
import agkyra.syncer.syncer
from agkyra.syncer import messaging, utils, common
import random
import os
import time
import shutil
import unittest
import mock
import sqlite3
from functools import wraps
from agkyra.config import AgkyraConfig, CONFIG_PATH
from kamaki.clients import ClientError
......@@ -35,12 +39,30 @@ handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
TMP = "/tmp"
def hash_file(fil):
with open(fil) as f:
return utils.hash_string(f.read())
def mock_transaction(max_wait=60, init_wait=0.4, exp_backoff=1.1):
def wrap(func):
@wraps(func)
def inner(*args, **kwargs):
print "IN MOCK"
obj = args[0]
db = obj.get_db()
attempt = 0
current_max_wait = init_wait
db.begin()
r = func(*args, **kwargs)
raise common.DatabaseError()
return inner
return wrap
class AgkyraTest(unittest.TestCase):
@classmethod
......@@ -56,7 +78,7 @@ class AgkyraTest(unittest.TestCase):
cls.ID = "AGKYRATEST" + str(random.random()).split('.')[1]
cls.LOCAL_ROOT_PATH = "/tmp/" + cls.ID
cls.LOCAL_ROOT_PATH = utils.join_path("/tmp", cls.ID)
cls.settings = SyncerSettings(
auth_url=AUTHENTICATION_URL,
......@@ -78,6 +100,20 @@ class AgkyraTest(unittest.TestCase):
self.assertIsInstance(m, mtype)
return m
def assert_messages(self, mtypes_dict):
while mtypes_dict:
m = self.s.get_next_message(block=True)
print m
mtype = m.__class__
num = mtypes_dict.get(mtype, 0)
if not num:
raise AssertionError("Got unexpected message %s" % m)
new_num = num -1
if new_num:
mtypes_dict[mtype] = new_num
else:
mtypes_dict.pop(mtype)
def assert_no_message(self):
self.assertIsNone(self.s.get_next_message())
......@@ -89,7 +125,115 @@ class AgkyraTest(unittest.TestCase):
def get_path(self, f):
return os.path.join(self.LOCAL_ROOT_PATH, f)
def test_001_main(self):
def test_0001_listing_local(self):
def real(candidates):
return [c for c in candidates
if not c.startswith(self.settings.cache_name)]
candidates = self.slave.list_candidate_files()
self.assertEqual(candidates, [])
candidates = self.slave.list_candidate_files(forced=True)
self.assertEqual(real(candidates), [])
fil = "f0001"
f_path = self.get_path(fil)
open(f_path, "a").close()
d = "d0001"
d_path = self.get_path(d)
os.mkdir(d_path)
candidates = self.slave.list_candidate_files(forced=True)
self.assertEqual(sorted(real(candidates)), sorted([fil, d]))
self.s.probe_archive(self.s.SLAVE)
self.assert_messages(
{messaging.UpdateMessage: 2,
messaging.IgnoreProbeMessage: 4})
with self.slave.probe_candidates.lock() as dct:
self.assertNotIn(fil, dct)
self.assertNotIn(d, dct)
self.s.decide_archive(self.s.SLAVE)
self.assert_messages({
messaging.SyncMessage: 2,
messaging.AckSyncMessage: 2})
os.unlink(f_path)
with mock.patch(
"agkyra.syncer.localfs_client.LocalfsFileClient.list_files") as mk:
mk.return_value = []
candidates = self.slave.list_candidate_files(forced=True)
self.assertEqual(real(candidates), [d])
candidates = self.slave.list_candidate_files(forced=True)
self.assertEqual(sorted(real(candidates)), sorted([fil, d]))
candidates = self.slave.list_candidate_files()
self.assertEqual(sorted(real(candidates)), sorted([fil, d]))
self.slave.remove_candidates(candidates, None)
candidates = self.slave.list_candidate_files()
self.assertEqual(candidates, [])
def test_0002_notifier_local(self):
f_out = "f0002out"
f_cache = "f0002cache"
f_upd = "f0002upd"
f_ren = "f0002ren"
dbefore = "d0002before"
f_out_path = self.get_path(f_out)
f_cache_path = self.get_path(f_cache)
f_upd_path = self.get_path(f_upd)
f_ren_path = self.get_path(f_ren)
dbefore_path = self.get_path(dbefore)
open(f_out_path, "a").close()
open(f_cache_path, "a").close()
open(f_upd_path, "a").close()
open(f_ren_path, "a").close()
os.mkdir(dbefore_path)
notifier = self.slave.notifier()
candidates = self.slave.list_candidate_files()
self.assertEqual(candidates, [])
fafter = "f0002after"
fafter_path = self.get_path(fafter)
dafter = "d0002after"
dafter_path = self.get_path(dafter)
open(fafter_path, "a").close()
os.mkdir(dafter_path)
time.sleep(1)
candidates = self.slave.list_candidate_files()
self.assertEqual(sorted(candidates), sorted([fafter, dafter]))
os.rename(f_cache_path,
utils.join_path(self.settings.cache_path, f_cache))
os.rename(f_out_path,
utils.join_path(TMP, f_out))
with open(f_upd_path, "a") as f:
f.write("upd")
f_in = "f0002in"
f_in_path = self.get_path(f_in)
f_in_orig_path = utils.join_path(TMP, f_in)
open(f_in_orig_path, "a").close()
os.rename(f_in_orig_path, f_in_path)
f_ren_new = "f0002ren_new"
f_ren_new_path = self.get_path(f_ren_new)
os.rename(f_ren_path, f_ren_new_path)
time.sleep(1)
candidates = self.slave.list_candidate_files()
self.assertEqual(sorted(candidates),
sorted([fafter, dafter,
f_in, f_out, f_upd,
f_ren, f_ren_new]))
notifier.stop()
def test_001_probe_and_sync(self):
# initial upload to pithos
f1 = "f001"
f1_content1 = "content1"
......@@ -160,13 +304,13 @@ class AgkyraTest(unittest.TestCase):
# this will fail because serial is marked as failed
self.s.decide_file_sync(fil)
time.sleep(2)
self.assert_no_message()
self.assert_message(messaging.FailedSyncIgnoreDecisionMessage)
# now probe upstream too and retry
self.s.probe_file(self.s.MASTER, fil)
self.assert_message(messaging.UpdateMessage)
self.s.decide_file_sync(fil)
self.assert_message(messaging.FailedSyncIgnoreDecisionMessage)
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.ConflictStashMessage)
self.assert_message(messaging.AckSyncMessage)
......@@ -187,10 +331,9 @@ class AgkyraTest(unittest.TestCase):
self.assert_message(messaging.UpdateMessage)
self.s.decide_archive(self.s.SLAVE)
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.AckSyncMessage)
self.assert_message(messaging.AckSyncMessage)
self.assert_messages({
messaging.SyncMessage: 2,
messaging.AckSyncMessage: 2})
def test_004_link(self):
# Check sym links
......@@ -308,6 +451,20 @@ class AgkyraTest(unittest.TestCase):
self.s.probe_file(self.s.SLAVE, fil)
self.assert_message(messaging.UpdateMessage)
with mock.patch(
"agkyra.syncer.database.SqliteFileStateDB.commit") as dbmock:
dbmock.side_effect = [sqlite3.OperationalError("locked"),
common.DatabaseError()]
self.s.decide_file_sync(fil)
self.assert_message(messaging.HeartbeatReplayDecideMessage)
self.s.decide_file_sync(fil)
self.assert_message(messaging.HeartbeatNoDecideMessage)
print "SLEEPING 11"
time.sleep(11)
self.s.decide_file_sync(fil)
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.AckSyncMessage)
def test_007_multiprobe(self):
fil = "f007"
f_path = self.get_path(fil)
......@@ -530,5 +687,6 @@ class AgkyraTest(unittest.TestCase):
self.assert_message(messaging.CollisionMessage)
self.assert_message(messaging.SyncErrorMessage)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment