test.py 2.98 KB
Newer Older
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
from agkyra.syncer.setup import SyncerSettings
from agkyra.syncer.localfs_client import LocalfsFileClient
from agkyra.syncer.pithos_client import PithosFileClient
from agkyra.syncer.syncer import FileSyncer
from agkyra.syncer import messaging, utils
import random

from agkyra.config import AgkyraConfig, CONFIG_PATH

cnf = AgkyraConfig()
cloud_conf = cnf.get('cloud', 'test')
if cloud_conf is None:
    print "Define a 'test' cloud in %s" % CONFIG_PATH
    exit()

AUTHENTICATION_URL = cloud_conf['url']
TOKEN = cloud_conf['token']

ID = "AGKYRATEST" + str(random.random()).split('.')[1]

LOCAL_ROOT_PATH = "/tmp/" + ID


settings = SyncerSettings(
    instance=ID,
    auth_url=AUTHENTICATION_URL,
    auth_token=TOKEN,
    container=ID,
    local_root_path=LOCAL_ROOT_PATH,
    ignore_ssl=True)

master = PithosFileClient(settings)
slave = LocalfsFileClient(settings)
s = FileSyncer(settings, master, slave)

pithos = master.endpoint
pithos.create_container(ID)

f1 = "f1"
content1 = "content1"
r1 = pithos.upload_from_string(
    f1, content1)
etag1 = r1['etag']


pithos_cands = master.get_pithos_candidates()
info = pithos_cands[f1]
assert etag1 == info["pithos_etag"]

db = s.get_db()
state = db.get_state(master.SIGNATURE, f1)
assert state.serial == -1
assert state.info == {}

s.probe_file(master.SIGNATURE, f1)
m = s.get_next_message(block=True)
assert isinstance(m, messaging.UpdateMessage)

state = db.get_state(master.SIGNATURE, f1)
assert state.serial == 0
assert state.info == info

deciding = s.list_deciding()
assert deciding == set([f1])

state = db.get_state(slave.SIGNATURE, f1)
assert state.serial == -1
assert state.info == {}

s.decide_file_sync(f1)
m = s.get_next_message(block=True)
assert isinstance(m, messaging.SyncMessage)

m = s.get_next_message(block=True)
assert isinstance(m, messaging.AckSyncMessage)
state = db.get_state(slave.SIGNATURE, f1)
assert state.serial == 0
info = state.info
assert info['localfs_size'] == len(content1)

local_path = LOCAL_ROOT_PATH + '/' + f1
assert utils.hash_file(local_path) == utils.hash_string(content1)

def write_local():
    content2 = "content2"
    with open(local_path, "w") as f:
        f.write(content2)


def write_upstream():
    content3 = "content3"
    r3 = pithos.upload_from_string(
        f1, content3)
    etag3 = r1['etag']


def func():
    write_upstream()
    write_local()
    assert s.get_next_message() is None
    s.initiate_probe()
    s.start_decide()

    m = s.get_next_message(block=True)
    print m
    assert isinstance(m, messaging.UpdateMessage)
    assert m.archive == master.SIGNATURE

    m = s.get_next_message(block=True)
    print m
    assert isinstance(m, messaging.UpdateMessage)
    assert m.archive == slave.SIGNATURE

    m = s.get_next_message(block=True)
    print m
    assert isinstance(m, messaging.SyncMessage)

    m = s.get_next_message(block=True)
    print m
    assert isinstance(m, messaging.ConflictStashMessage)

    m = s.get_next_message(block=True)
    print m
    assert isinstance(m, messaging.AckSyncMessage)


func()