Skip to content
Snippets Groups Projects
Commit 9111f177 authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

test wip

parent 8c482bca
No related branches found
No related tags found
No related merge requests found
...@@ -22,6 +22,7 @@ import random ...@@ -22,6 +22,7 @@ import random
import os import os
import time import time
import shutil import shutil
import unittest
from agkyra.config import AgkyraConfig, CONFIG_PATH from agkyra.config import AgkyraConfig, CONFIG_PATH
from kamaki.clients import ClientError from kamaki.clients import ClientError
...@@ -34,253 +35,273 @@ handler.setFormatter(formatter) ...@@ -34,253 +35,273 @@ handler.setFormatter(formatter)
logger.addHandler(handler) logger.addHandler(handler)
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
cnf = AgkyraConfig()
cloud_conf = cnf.get('cloud', 'test')
if cloud_conf is None:
print "Define a 'test' cloud in %s" % CONFIG_PATH
exit()
AUTHENTICATION_URL = cloud_conf['url']
TOKEN = cloud_conf['token']
ID = "AGKYRATEST" + str(random.random()).split('.')[1]
LOCAL_ROOT_PATH = "/tmp/" + ID
settings = SyncerSettings(
auth_url=AUTHENTICATION_URL,
auth_token=TOKEN,
container=ID,
local_root_path=LOCAL_ROOT_PATH,
ignore_ssl=True)
master = PithosFileClient(settings)
slave = LocalfsFileClient(settings)
s = FileSyncer(settings, master, slave)
pithos = master.endpoint
pithos.create_container(ID)
# initial upload to pithos
f1 = "f1"
f1_content1 = "content1"
r1 = pithos.upload_from_string(
f1, f1_content1)
etag1 = r1['etag']
# check pithos state
pithos_cands = master.get_pithos_candidates()
info1 = pithos_cands[f1]["info"]
assert etag1 == info1["pithos_etag"]
db = s.get_db()
state = db.get_state(master.SIGNATURE, f1)
assert state.serial == -1
assert state.info == {}
def assert_message(mtype):
m = s.get_next_message(block=True)
print m
assert isinstance(m, mtype)
return m
# probe pithos
s.probe_file(master.SIGNATURE, f1)
assert_message(messaging.UpdateMessage)
state = db.get_state(master.SIGNATURE, f1)
assert state.serial == 0
assert state.info == info1
deciding = s.list_deciding()
assert deciding == set([f1])
# get local state
state = db.get_state(slave.SIGNATURE, f1)
assert state.serial == -1
assert state.info == {}
# sync
s.decide_file_sync(f1)
assert_message(messaging.SyncMessage)
# check local synced file
assert_message(messaging.AckSyncMessage)
state = db.get_state(slave.SIGNATURE, f1)
assert state.serial == 0
info = state.info
assert info['localfs_size'] == len(f1_content1)
f1_path = os.path.join(LOCAL_ROOT_PATH, f1)
def hash_file(fil): def hash_file(fil):
with open(fil) as f: with open(fil) as f:
return utils.hash_string(f.read()) return utils.hash_string(f.read())
assert hash_file(f1_path) == utils.hash_string(f1_content1)
class AgkyraTest(unittest.TestCase):
# update local file
f1_content2 = "content22222" @classmethod
with open(f1_path, "w") as f: def setUpClass(cls):
f.write(f1_content2) cnf = AgkyraConfig()
cloud_conf = cnf.get('cloud', 'test')
# update upstream if cloud_conf is None:
f1_content3 = "content33" print "Define a 'test' cloud in %s" % CONFIG_PATH
r3 = pithos.upload_from_string( exit()
f1, f1_content3)
etag3 = r1['etag'] AUTHENTICATION_URL = cloud_conf['url']
TOKEN = cloud_conf['token']
# cause a conflict
assert s.get_next_message() is None cls.ID = "AGKYRATEST" + str(random.random()).split('.')[1]
# first try to upload ignoring upstream changes
s.probe_file(slave.SIGNATURE, f1) cls.LOCAL_ROOT_PATH = "/tmp/" + cls.ID
s.decide_file_sync(f1)
cls.settings = SyncerSettings(
m = assert_message(messaging.UpdateMessage) auth_url=AUTHENTICATION_URL,
assert m.archive == slave.SIGNATURE auth_token=TOKEN,
container=cls.ID,
assert_message(messaging.SyncMessage) local_root_path=cls.LOCAL_ROOT_PATH,
assert_message(messaging.CollisionMessage) ignore_ssl=True)
assert_message(messaging.SyncErrorMessage)
cls.master = PithosFileClient(cls.settings)
# this will fail because serial is marked as failed cls.slave = LocalfsFileClient(cls.settings)
s.decide_file_sync(f1) cls.s = FileSyncer(cls.settings, cls.master, cls.slave)
time.sleep(2) cls.pithos = cls.master.endpoint
assert s.get_next_message() is None cls.pithos.create_container(cls.ID)
cls.db = cls.s.get_db()
# now probe upstream too and retry
s.probe_file(master.SIGNATURE, f1) def assert_message(self, mtype):
s.decide_file_sync(f1) m = self.s.get_next_message(block=True)
print m
m = assert_message(messaging.UpdateMessage) self.assertIsInstance(m, mtype)
assert m.archive == master.SIGNATURE return m
assert_message(messaging.SyncMessage) def assert_no_message(self):
assert_message(messaging.ConflictStashMessage) self.assertIsNone(self.s.get_next_message())
assert_message(messaging.AckSyncMessage)
@classmethod
assert s.get_next_message() is None def tearDownClass(cls):
cls.pithos.del_container(delimiter='/')
# notifiers instead of probing cls.pithos.purge_container()
s.start_notifiers()
def get_path(self, f):
# make local dir with files return os.path.join(self.LOCAL_ROOT_PATH, f)
d1 = "d1"
d1_path = os.path.join(LOCAL_ROOT_PATH, d1) def test_001_main(self):
logger.info('making dir %s' % d1) # initial upload to pithos
os.mkdir(d1_path) f1 = "f001"
f2 = "d1/f2" f1_content1 = "content1"
f2_path = os.path.join(LOCAL_ROOT_PATH, f2) r1 = self.pithos.upload_from_string(
f2_content = "f2" f1, f1_content1)
logger.info('making file %s' % f2) etag1 = r1['etag']
with open(f2_path, "w") as f:
f.write(f2_content) state = self.db.get_state(self.s.MASTER, f1)
self.assertEqual(state.serial, -1)
print 'Sleeping to wait for filesystem events...' self.assertEqual(state.info, {})
time.sleep(2)
s.decide_all_archives() # probe pithos
self.s.probe_file(self.s.MASTER, f1)
assert_message(messaging.UpdateMessage) m = self.assert_message(messaging.UpdateMessage)
assert_message(messaging.UpdateMessage) self.assertEqual(m.archive, self.s.MASTER)
assert_message(messaging.SyncMessage) self.assertEqual(m.serial, -1)
assert_message(messaging.SyncMessage)
assert_message(messaging.AckSyncMessage) state = self.db.get_state(self.s.MASTER, f1)
assert_message(messaging.AckSyncMessage) self.assertEqual(state.serial, 0)
self.assertEqual(state.info["pithos_etag"], etag1)
assert s.get_next_message() is None
s.stop_notifiers() # get local state
state = self.db.get_state(self.s.SLAVE, f1)
# Check sym links self.assertEqual(state.serial, -1)
ln1 = "f1.link" assert state.info == {}
ln1_path = os.path.join(LOCAL_ROOT_PATH, ln1)
os.symlink(f1_path, ln1_path) # sync
s.probe_file(s.SLAVE, ln1) self.s.decide_file_sync(f1)
state = db.get_state(slave.SIGNATURE, ln1) dstate = self.db.get_state(self.s.DECISION, f1)
assert state.serial == 0 self.assertEqual(dstate.serial, 0)
assert state.info == {"localfs_type": "unhandled"} self.assert_message(messaging.SyncMessage)
assert_message(messaging.UpdateMessage) # check local synced file
self.assert_message(messaging.AckSyncMessage)
s.decide_file_sync(ln1) state = self.db.get_state(self.s.SLAVE, f1)
assert state.serial == 0
assert_message(messaging.SyncMessage) info = state.info
assert_message(messaging.AckSyncMessage) assert info['localfs_size'] == len(f1_content1)
f1_path = self.get_path(f1)
# Put file upstream to cause conflict self.assertEqual(hash_file(f1_path), utils.hash_string(f1_content1))
upstream_ln1_content = "regular"
r1 = pithos.upload_from_string( dstate = self.db.get_state(self.s.DECISION, f1)
ln1, upstream_ln1_content) sstate = self.db.get_state(self.s.SYNC, f1)
s.probe_file(s.MASTER, ln1) self.assertEqual(dstate.info, sstate.info)
s.probe_file(s.SLAVE, ln1) self.assertEqual(sstate.serial, 0)
assert_message(messaging.UpdateMessage) def test_002_conflict(self):
fil = "f002"
s.decide_file_sync(ln1) # update local file
fil_local_content = "local"
assert_message(messaging.SyncMessage) with open(self.get_path(fil), "w") as f:
m = assert_message(messaging.ConflictStashMessage) f.write(fil_local_content)
stashed_ln1 = m.stash_name
# update upstream
assert_message(messaging.AckSyncMessage) fil_upstream_content = "upstream"
r = self.pithos.upload_from_string(
assert s.get_next_message() is None fil, fil_upstream_content)
etag = r['etag']
s.probe_file(s.SLAVE, stashed_ln1)
m = assert_message(messaging.UpdateMessage) # cause a conflict
assert m.objname == stashed_ln1 # first try to upload ignoring upstream changes
self.s.probe_file(self.s.SLAVE, fil)
state = db.get_state(slave.SIGNATURE, stashed_ln1) self.assert_message(messaging.UpdateMessage)
assert state.serial == 0 self.s.decide_file_sync(fil)
assert state.info == {"localfs_type": "unhandled"} self.assert_message(messaging.SyncMessage)
assert s.get_next_message() is None self.assert_message(messaging.CollisionMessage)
self.assert_message(messaging.SyncErrorMessage)
# nothing to be synced
s.decide_file_sync(f1) # this will fail because serial is marked as failed
time.sleep(2) self.s.decide_file_sync(fil)
assert s.get_next_message() is None time.sleep(2)
self.assert_no_message()
# directories
r = pithos.object_put(f1, content_type='application/directory', # now probe upstream too and retry
content_length=0) self.s.probe_file(self.s.MASTER, fil)
ff1 = "f1/ff1" self.assert_message(messaging.UpdateMessage)
ff1_content = "ff1 in dir " self.s.decide_file_sync(fil)
r1 = pithos.upload_from_string(ff1, ff1_content) self.assert_message(messaging.SyncMessage)
s.probe_file(s.MASTER, f1) self.assert_message(messaging.ConflictStashMessage)
s.probe_file(s.MASTER, ff1) self.assert_message(messaging.AckSyncMessage)
assert_message(messaging.UpdateMessage) def test_003_dirs(self):
assert_message(messaging.UpdateMessage) # make local dir with files
d = "d003"
# fails because file in place of dir d_path = self.get_path(d)
s.decide_file_sync(ff1) os.mkdir(d_path)
fil = "d003/f003"
assert_message(messaging.SyncMessage) f_path = self.get_path(fil)
assert_message(messaging.SyncErrorMessage) f_content = "f2"
with open(f_path, "w") as f:
fd1 = "f1/fd1" f.write(f_content)
r = pithos.object_put(fd1, content_type='application/directory', self.s.probe_file(self.s.SLAVE, d)
content_length=0) self.s.probe_file(self.s.SLAVE, fil)
s.probe_file(s.MASTER, fd1) self.assert_message(messaging.UpdateMessage)
assert_message(messaging.UpdateMessage) self.assert_message(messaging.UpdateMessage)
# also fails because file in place of dir self.s.decide_archive(self.s.SLAVE)
s.decide_file_sync(fd1) self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.SyncMessage)
assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage)
assert_message(messaging.SyncErrorMessage) self.assert_message(messaging.AckSyncMessage)
# fail due to active heartbeat def test_004_link(self):
s.probe_file(s.MASTER, ff1) # Check sym links
time.sleep(1) fil = "f004"
assert s.get_next_message() is None f_path = self.get_path(fil)
open(f_path, 'a').close()
s.decide_file_sync(ff1) self.s.probe_file(self.s.SLAVE, fil)
time.sleep(1) self.s.decide_file_sync(fil)
assert s.get_next_message() is None self.assert_message(messaging.UpdateMessage)
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.AckSyncMessage)
ln = "f004.link"
ln_path = self.get_path(ln)
os.symlink(f_path, ln_path)
self.s.probe_file(self.s.SLAVE, ln)
self.assert_message(messaging.UpdateMessage)
state = self.db.get_state(self.s.SLAVE, ln)
self.assertEqual(state.serial, 0)
self.assertEqual(state.info, {"localfs_type": "unhandled"})
self.s.decide_file_sync(ln)
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.AckSyncMessage)
state = self.db.get_state(self.s.MASTER, ln)
self.assertEqual(state.info, {})
# Put file upstream to cause conflict
upstream_ln_content = "regular"
r = self.pithos.upload_from_string(
ln, upstream_ln_content)
etag = r['etag']
self.s.probe_file(self.s.MASTER, ln)
self.s.probe_file(self.s.SLAVE, ln)
self.assert_message(messaging.UpdateMessage)
state = self.db.get_state(self.s.MASTER, ln)
self.assertEqual(state.info["pithos_etag"], etag)
self.s.decide_file_sync(ln)
self.assert_message(messaging.SyncMessage)
m = self.assert_message(messaging.ConflictStashMessage)
stashed_ln = m.stash_name
self.assert_message(messaging.AckSyncMessage)
self.assert_no_message()
self.s.probe_file(self.s.SLAVE, stashed_ln)
m = self.assert_message(messaging.UpdateMessage)
self.assertEqual(m.objname, stashed_ln)
state = self.db.get_state(self.s.SLAVE, stashed_ln)
self.assertEqual(state.serial, 0)
self.assertEqual(state.info, {"localfs_type": "unhandled"})
self.assert_no_message()
# no changes in linked file
self.s.probe_file(self.s.SLAVE, fil)
time.sleep(2)
self.assert_no_message()
def test_005_dirs_inhibited_by_file(self):
fil = "f005"
f_path = self.get_path(fil)
open(f_path, 'a').close()
r = self.pithos.object_put(
fil, content_type='application/directory', content_length=0)
inner_fil = "f005/in005"
inner_fil_content = "ff1 in dir "
r1 = self.pithos.upload_from_string(inner_fil, inner_fil_content)
self.s.probe_file(self.s.MASTER, fil)
self.s.probe_file(self.s.MASTER, inner_fil)
self.assert_message(messaging.UpdateMessage)
self.assert_message(messaging.UpdateMessage)
# fails because file in place of dir
self.s.decide_file_sync(inner_fil)
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.SyncErrorMessage)
inner_dir = "f005/indir005"
r = self.pithos.object_put(
inner_dir, content_type='application/directory', content_length=0)
self.s.probe_file(self.s.MASTER, inner_dir)
self.assert_message(messaging.UpdateMessage)
# also fails because file in place of dir
self.s.decide_file_sync(inner_dir)
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.SyncErrorMessage)
def test_006_heartbeat(self):
fil = "f006"
f_path = self.get_path(fil)
open(f_path, 'a').close()
self.s.probe_file(self.s.SLAVE, fil)
self.assert_message(messaging.UpdateMessage)
self.s.decide_file_sync(fil)
self.assert_message(messaging.SyncMessage)
self.s.probe_file(self.s.SLAVE, fil)
self.assert_message(messaging.HeartbeatNoProbeMessage)
self.s.decide_file_sync(fil)
self.assert_message(messaging.HeartbeatNoDecideMessage)
self.assert_message(messaging.AckSyncMessage)
def test_007_multiprobe(self):
fil = "f007"
f_path = self.get_path(fil)
open(f_path, 'a').close()
self.s.probe_file(self.s.SLAVE, fil)
self.assert_message(messaging.UpdateMessage)
with open(f_path, 'w') as f:
f.write("new")
self.s.probe_file(self.s.SLAVE, fil)
self.assert_message(messaging.AlreadyProbedMessage)
if __name__ == '__main__':
unittest.main()
print "SLEEPING 10" print "SLEEPING 10"
time.sleep(10) time.sleep(10)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment