test.py 29.6 KB
Newer Older
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
1
2
# -*- coding: utf-8 -*-

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
18
19
from __future__ import unicode_literals

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
20
from agkyra.syncer.setup import SyncerSettings
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
21
from agkyra.syncer import localfs_client
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
22
23
from agkyra.syncer.pithos_client import PithosFileClient
from agkyra.syncer.syncer import FileSyncer
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
24
import agkyra.syncer.syncer
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
25
from agkyra.syncer import messaging, utils, common
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
26
import random
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
27
28
import os
import time
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
29
import shutil
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
30
import unittest
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
31
32
import mock
import sqlite3
33
import tempfile
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
34

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
35
from functools import wraps
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
36
from agkyra.config import AgkyraConfig, CONFIG_PATH
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
37
from kamaki.clients import ClientError
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
38

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
39
40
41
42
43
44
45
46
import logging
logger = logging.getLogger('agkyra')
handler = logging.StreamHandler()
formatter = logging.Formatter("%(levelname)s:%(asctime)s:%(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
47
48
49
50
51
52
53
# kamakisend_logger = logging.getLogger('kamaki.clients.send')
# kamakisend_logger.addHandler(handler)
# kamakisend_logger.setLevel(logging.DEBUG)
# kamakirecv_logger = logging.getLogger('kamaki.clients.recv')
# kamakirecv_logger.addHandler(handler)
# kamakirecv_logger.setLevel(logging.DEBUG)

54
TMP = os.path.realpath(tempfile.gettempdir())
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
55

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
56
57
58
59
60

def hash_file(fil):
    """Return ``utils.hash_string`` applied to the whole content of `fil`."""
    with open(fil) as stream:
        content = stream.read()
    return utils.hash_string(content)

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
61

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def mock_transaction(max_wait=60, init_wait=0.4, exp_backoff=1.1):
    """Build a decorator whose wrapped method always ends in a database error.

    The decorated callable is expected to be a method: its first positional
    argument must provide ``get_db()``.  The wrapper begins a transaction on
    that database, runs the wrapped function (discarding its result) and
    then unconditionally raises ``common.DatabaseError``.

    `max_wait`, `init_wait` and `exp_backoff` are accepted but ignored by
    this mock.
    """
    def wrap(func):
        @wraps(func)
        def inner(*args, **kwargs):
            # Single-argument call form: valid on Python 2 and Python 3.
            print("IN MOCK")
            obj = args[0]
            db = obj.get_db()
            db.begin()
            func(*args, **kwargs)
            # Never reports success, whatever the wrapped function did.
            raise common.DatabaseError()
        return inner
    return wrap


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
78
79
80
81
82
83
84
85
86
87
88
89
90
class AgkyraTest(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        """Build the syncer fixture shared by all tests of this class.

        Reads the 'test' cloud from the Agkyra configuration, creates the
        settings, the Pithos and local filesystem clients and the syncer,
        creates a container named after a random per-run ID and consumes
        the two initial "sync enabled" messages.

        Raises unittest.SkipTest when no 'test' cloud is configured.
        """
        cnf = AgkyraConfig()
        cloud_conf = cnf.get('cloud', 'test')
        if cloud_conf is None:
            # A bare exit() would end the whole test run with status 0;
            # skipping reports the missing configuration instead.
            raise unittest.SkipTest(
                "Define a 'test' cloud in %s" % CONFIG_PATH)

        auth_url = cloud_conf['url']
        token = cloud_conf['token']

        # The random suffix gives each run its own container and local root.
        cls.ID = "ΑΓΚΥΡΑTEST" + str(random.random()).split('.')[1]

        cls.LOCAL_ROOT_PATH = utils.join_path(TMP, cls.ID)

        cls.settings = SyncerSettings(
            auth_url=auth_url,
            auth_token=token,
            container=cls.ID,
            local_root_path=cls.LOCAL_ROOT_PATH,
            ignore_ssl=True)

        cls.master = PithosFileClient(cls.settings)
        cls.slave = localfs_client.LocalfsFileClient(cls.settings)
        cls.s = FileSyncer(cls.settings, cls.master, cls.slave)
        cls.pithos = cls.master.endpoint
        cls.pithos.create_container(cls.ID)
        cls.db = cls.s.get_db()

        # Drain the two start-up messages so tests begin with an empty queue.
        m = cls.s.get_next_message(block=True)
        assert isinstance(m, messaging.PithosSyncEnabled)
        m = cls.s.get_next_message(block=True)
        assert isinstance(m, messaging.LocalfsSyncEnabled)
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
112
113
114
115
116
117
118

    def assert_message(self, mtype):
        m = self.s.get_next_message(block=True)
        print m
        self.assertIsInstance(m, mtype)
        return m

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
119
120
121
122
123
124
125
126
127
128
129
130
131
132
    def assert_messages(self, mtypes_dict):
        while mtypes_dict:
            m = self.s.get_next_message(block=True)
            print m
            mtype = m.__class__
            num = mtypes_dict.get(mtype, 0)
            if not num:
                raise AssertionError("Got unexpected message %s" % m)
            new_num = num -1
            if new_num:
                mtypes_dict[mtype] = new_num
            else:
                mtypes_dict.pop(mtype)

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
133
134
135
136
137
138
139
140
141
142
143
    def assert_no_message(self):
        self.assertIsNone(self.s.get_next_message())

    @classmethod
    def tearDownClass(cls):
        """Clean up the remote container used by the tests.

        Calls ``del_container(delimiter='/')`` and then ``purge_container()``
        on the Pithos client -- presumably emptying the container before
        removing it (verify against the kamaki client documentation).
        """
        client = cls.pithos
        client.del_container(delimiter='/')
        client.purge_container()

    def get_path(self, f):
        return os.path.join(self.LOCAL_ROOT_PATH, f)

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
144
145
146
147
148
149
150
151
152
153
    def test_0001_listing_local(self):
        # Exercises the local client's candidate listing: empty at first,
        # after creating a file and a directory, after a probe/decide cycle,
        # with list_files patched to return nothing, and finally after the
        # candidates have been removed.
        def real(names):
            # Keep only names that do not start with the cache name.
            kept = []
            for name in names:
                if not name.startswith(self.settings.cache_name):
                    kept.append(name)
            return kept

        local = self.slave
        self.assertEqual(local.list_candidate_files(), [])
        self.assertEqual(real(local.list_candidate_files(forced=True)), [])

        fil = "φ0001"
        f_path = self.get_path(fil)
        with open(f_path, "a"):
            pass
        d = "δ0001"
        d_path = self.get_path(d)
        os.mkdir(d_path)
        both = sorted([fil, d])

        listed = local.list_candidate_files(forced=True)
        self.assertEqual(sorted(real(listed)), both)
        self.s.probe_archive(self.s.SLAVE)
        self.assert_messages({
            messaging.UpdateMessage: 2,
            messaging.IgnoreProbeMessage: 4,
        })

        # Once probed, neither name is among the probe candidates.
        with local.probe_candidates.lock() as dct:
            for name in (fil, d):
                self.assertNotIn(name, dct)

        self.s.decide_archive(self.s.SLAVE)
        self.assert_messages({
            messaging.SyncMessage: 2,
            messaging.AckSyncMessage: 2,
        })

        os.unlink(f_path)

        # With list_files patched to return nothing, only the directory is
        # reported by this forced listing.
        patched = mock.patch(
            "agkyra.syncer.localfs_client.LocalfsFileClient.list_files")
        with patched as mk:
            mk.return_value = []
            listed = local.list_candidate_files(forced=True)
            self.assertEqual(real(listed), [d])

        listed = local.list_candidate_files(forced=True)
        self.assertEqual(sorted(real(listed)), both)

        listed = local.list_candidate_files()
        self.assertEqual(sorted(real(listed)), both)

        local.remove_candidates(listed, None)
        self.assertEqual(local.list_candidate_files(), [])

    def test_0002_notifier_local(self):
        # Checks which names the local notifier records as candidates when
        # files are created, updated, renamed and moved into or out of the
        # local root and the cache directory.
        f_out = "φ0002out"
        f_cache = "φ0002cache"
        f_upd = "φ0002upd"
        f_ren = "φ0002ren"
        f_cached = "φ0002cached"
        dbefore = "δ0002before"
        f_out_path = self.get_path(f_out)
        f_cache_path = self.get_path(f_cache)
        f_upd_path = self.get_path(f_upd)
        f_ren_path = self.get_path(f_ren)
        f_cached_path = os.path.join(
            self.settings.cache_path, f_cached)
        dbefore_path = self.get_path(dbefore)
        # Everything below is created before the notifier starts.
        open(f_out_path, "a").close()
        open(f_cache_path, "a").close()
        open(f_upd_path, "a").close()
        open(f_ren_path, "a").close()
        open(f_cached_path, "a").close()
        os.mkdir(dbefore_path)

        notifier = self.slave.notifier()
        # Stop the notifier even when an assertion fails, so that it does
        # not keep running into the following tests.
        try:
            candidates = self.slave.list_candidate_files()
            self.assertEqual(candidates, [])

            fafter = "φ0002after"
            fafter_path = self.get_path(fafter)
            dafter = "δ0002after"
            dafter_path = self.get_path(dafter)
            open(fafter_path, "a").close()
            os.mkdir(dafter_path)

            # Give the notifier time to pick up the events.
            time.sleep(1)
            candidates = self.slave.list_candidate_files()
            self.assertEqual(sorted(candidates), sorted([fafter, dafter]))

            # Move one file into the cache and one out of the local root,
            # and append to a third.
            os.rename(f_cache_path,
                      utils.join_path(self.settings.cache_path, f_cache))
            os.rename(f_out_path,
                      utils.join_path(TMP, f_out))
            with open(f_upd_path, "a") as f:
                f.write("upd")

            # Move a file from outside into the local root.
            f_in = "φ0002in"
            f_in_path = self.get_path(f_in)
            f_in_orig_path = utils.join_path(TMP, f_in)
            open(f_in_orig_path, "a").close()
            os.rename(f_in_orig_path, f_in_path)

            # Rename inside the local root.
            f_ren_new = "φ0002ren_new"
            f_ren_new_path = self.get_path(f_ren_new)
            os.rename(f_ren_path, f_ren_new_path)

            # Move a file from the cache into the local root.
            f_cached_out_path = self.get_path(f_cached)
            os.rename(f_cached_path, f_cached_out_path)

            time.sleep(1)
            candidates = self.slave.list_candidate_files()
            self.assertEqual(sorted(candidates),
                             sorted([fafter, dafter,
                                     f_in, f_out, f_upd,
                                     f_ren, f_ren_new,
                                     f_cache, f_cached]))
        finally:
            notifier.stop()

    def test_001_probe_and_sync(self):
        # Uploads an object to Pithos, probes it, syncs it to the local
        # filesystem and checks the recorded states at every step.

        # initial upload to pithos
        f1 = "φ001"
        f1_content1 = "content1"
        r1 = self.pithos.upload_from_string(
            f1, f1_content1)
        etag1 = r1['etag']

        state = self.db.get_state(self.s.MASTER, f1)
        self.assertEqual(state.serial, -1)
        self.assertEqual(state.info, {})

        # probe pithos
        self.s.probe_file(self.s.MASTER, f1)
        m = self.assert_message(messaging.UpdateMessage)
        self.assertEqual(m.archive, self.s.MASTER)
        self.assertEqual(m.serial, 0)

        state = self.db.get_state(self.s.MASTER, f1)
        self.assertEqual(state.serial, 0)
        self.assertEqual(state.info["pithos_etag"], etag1)

        # get local state
        state = self.db.get_state(self.s.SLAVE, f1)
        self.assertEqual(state.serial, -1)
        self.assertEqual(state.info, {})

        # sync
        self.s.decide_file_sync(f1)
        dstate = self.db.get_state(self.s.DECISION, f1)
        self.assertEqual(dstate.serial, 0)
        self.assert_message(messaging.SyncMessage)

        # check local synced file
        self.assert_message(messaging.AckSyncMessage)
        state = self.db.get_state(self.s.SLAVE, f1)
        self.assertEqual(state.serial, 0)
        info = state.info
        self.assertEqual(info['localfs_size'], len(f1_content1))
        f1_path = self.get_path(f1)
        self.assertEqual(hash_file(f1_path), utils.hash_string(f1_content1))

        # decision and sync records must agree
        dstate = self.db.get_state(self.s.DECISION, f1)
        sstate = self.db.get_state(self.s.SYNC, f1)
        self.assertEqual(dstate.info, sstate.info)
        self.assertEqual(sstate.serial, 0)

    def test_002_conflict(self):
        # Creates the same name locally and upstream with different
        # contents and checks the message sequence up to the point where
        # the local file is stashed.
        fil = "φ002"
        # local file
        fil_local_content = "local"
        with open(self.get_path(fil), "w") as f:
            f.write(fil_local_content)

        # upstream
        fil_upstream_content = "upstream"
        self.pithos.upload_from_string(
            fil, fil_upstream_content)

        # cause a conflict
        # first try to upload ignoring upstream changes
        self.s.probe_file(self.s.SLAVE, fil)
        self.assert_message(messaging.UpdateMessage)
        self.s.decide_file_sync(fil)
        self.assert_message(messaging.SyncMessage)
        self.assert_message(messaging.CollisionMessage)
        self.assert_message(messaging.SyncErrorMessage)

        # this will fail because serial is marked as failed
        self.s.decide_file_sync(fil)
        self.assert_message(messaging.FailedSyncIgnoreDecisionMessage)

        # now probe upstream too and retry
        self.s.probe_file(self.s.MASTER, fil)
        self.assert_message(messaging.UpdateMessage)
        self.s.start_notifiers()
        # Stop the notifiers even when an assertion fails, so they do not
        # keep running into the following tests.
        try:
            self.s.decide_file_sync(fil)
            self.assert_message(messaging.FailedSyncIgnoreDecisionMessage)
            self.assert_message(messaging.SyncMessage)
            m = self.assert_message(messaging.ConflictStashMessage)
            stash_name = m.stash_name
            self.assert_message(messaging.AckSyncMessage)
            # Give the notifiers time to pick up the stashed file.
            time.sleep(1)
        finally:
            self.s.stop_notifiers()
        local_cands = self.slave.list_candidate_files()
        self.assertIn(stash_name, local_cands)
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
347
348
349

    def test_003_dirs(self):
        # Creates a local directory containing one file, probes both and
        # lets the syncer decide for the whole local archive.
        dirname = "δ003"
        os.mkdir(self.get_path(dirname))
        fil = "δ003/φ003"
        f_content = "f2"
        with open(self.get_path(fil), "w") as out:
            out.write(f_content)

        # Probe both first; the two update messages are checked afterwards.
        for name in (dirname, fil):
            self.s.probe_file(self.s.SLAVE, name)
        for _ in range(2):
            self.assert_message(messaging.UpdateMessage)

        self.s.decide_archive(self.s.SLAVE)
        expected = {
            messaging.SyncMessage: 2,
            messaging.AckSyncMessage: 2,
        }
        self.assert_messages(expected)
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
367
368
369

    def test_004_link(self):
        # Checks how symbolic links are handled: the link is recorded with
        # type "unhandled", an upstream object with the same name stashes
        # it, and the link target itself reports no changes.
        unhandled = {"localfs_type": "unhandled"}

        fil = "φ004"
        f_path = self.get_path(fil)
        with open(f_path, 'a'):
            pass
        self.s.probe_file(self.s.SLAVE, fil)
        self.s.decide_file_sync(fil)
        for mtype in (messaging.UpdateMessage,
                      messaging.SyncMessage,
                      messaging.AckSyncMessage):
            self.assert_message(mtype)

        # Link to the file that was just synced.
        ln = "φ004.link"
        os.symlink(f_path, self.get_path(ln))
        self.s.probe_file(self.s.SLAVE, ln)
        self.assert_message(messaging.UpdateMessage)
        local_state = self.db.get_state(self.s.SLAVE, ln)
        self.assertEqual(local_state.serial, 0)
        self.assertEqual(local_state.info, unhandled)
        self.s.decide_file_sync(ln)
        for mtype in (messaging.SyncMessage, messaging.AckSyncMessage):
            self.assert_message(mtype)
        upstream_state = self.db.get_state(self.s.MASTER, ln)
        self.assertEqual(upstream_state.info, {})

        # Put file upstream to cause conflict
        upstream_ln_content = "regular"
        response = self.pithos.upload_from_string(ln, upstream_ln_content)
        etag = response['etag']
        self.s.probe_file(self.s.MASTER, ln)
        self.s.probe_file(self.s.SLAVE, ln)
        self.assert_message(messaging.UpdateMessage)
        upstream_state = self.db.get_state(self.s.MASTER, ln)
        self.assertEqual(upstream_state.info["pithos_etag"], etag)
        self.s.decide_file_sync(ln)
        self.assert_message(messaging.SyncMessage)
        stash_msg = self.assert_message(messaging.ConflictStashMessage)
        stashed_ln = stash_msg.stash_name
        self.assert_message(messaging.AckSyncMessage)
        self.assert_no_message()

        # The stashed link is probed under its new name.
        self.s.probe_file(self.s.SLAVE, stashed_ln)
        update_msg = self.assert_message(messaging.UpdateMessage)
        self.assertEqual(update_msg.objname, stashed_ln)
        stashed_state = self.db.get_state(self.s.SLAVE, stashed_ln)
        self.assertEqual(stashed_state.serial, 0)
        self.assertEqual(stashed_state.info, unhandled)
        self.assert_no_message()

        # no changes in linked file
        self.s.probe_file(self.s.SLAVE, fil)
        time.sleep(2)
        self.assert_no_message()

    def test_005_dirs_inhibited_by_file(self):
        # A name that is a regular file locally becomes a directory
        # upstream; objects inside it cannot be synced until the directory
        # itself has been synced.
        fil = "φ005"
        f_path = self.get_path(fil)
        open(f_path, 'a').close()
        self.s.probe_file(self.s.SLAVE, fil)
        self.s.decide_file_sync(fil)
        self.assert_message(messaging.UpdateMessage)
        self.assert_message(messaging.SyncMessage)
        self.assert_message(messaging.AckSyncMessage)

        # Upstream, put a directory object under the same name and upload
        # a file inside it.
        self.pithos.object_put(
            fil, content_type='application/directory', content_length=0)
        inner_fil = "φ005/in005"
        inner_fil_content = "ff1 in dir "
        self.pithos.upload_from_string(inner_fil, inner_fil_content)
        self.s.probe_file(self.s.MASTER, fil)
        self.s.probe_file(self.s.MASTER, inner_fil)
        self.assert_message(messaging.UpdateMessage)
        self.assert_message(messaging.UpdateMessage)

        # fails because file in place of dir
        self.s.decide_file_sync(inner_fil)
        self.assert_message(messaging.SyncMessage)
        self.assert_message(messaging.SyncErrorMessage)

        inner_dir = "φ005/indir005"
        self.pithos.object_put(
            inner_dir, content_type='application/directory', content_length=0)
        self.s.probe_file(self.s.MASTER, inner_dir)
        self.assert_message(messaging.UpdateMessage)
        # also fails because file in place of dir
        self.s.decide_file_sync(inner_dir)
        self.assert_message(messaging.SyncMessage)
        self.assert_message(messaging.SyncErrorMessage)

        # but if we first sync the dir, it's ok
        self.s.decide_file_sync(fil)
        self.assert_message(messaging.SyncMessage)
        self.assert_message(messaging.AckSyncMessage)
        self.assertTrue(os.path.isdir(f_path))
        self.s.decide_file_sync(inner_fil)
        self.assert_message(messaging.SyncMessage)
        self.assert_message(messaging.AckSyncMessage)

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
466
    def test_006_heartbeat(self):
        # Checks the heartbeat messages emitted when a file is probed or
        # decided again while a sync of it is in progress, and after a
        # decision whose database commit fails.
        fil = "φ006"
        f_path = self.get_path(fil)
        open(f_path, 'a').close()
        self.s.probe_file(self.s.SLAVE, fil)
        self.assert_message(messaging.UpdateMessage)
        self.s.decide_file_sync(fil)
        self.assert_message(messaging.SyncMessage)
        # Probing and deciding again before the ack arrives is refused.
        self.s.probe_file(self.s.SLAVE, fil)
        self.assert_message(messaging.HeartbeatNoProbeMessage)
        self.s.decide_file_sync(fil)
        self.assert_message(messaging.HeartbeatNoDecideMessage)
        self.assert_message(messaging.AckSyncMessage)
        with open(f_path, 'w') as f:
            f.write("new")
        self.s.probe_file(self.s.SLAVE, fil)
        self.assert_message(messaging.UpdateMessage)

        # Make the first commit raise a "locked" sqlite error and the
        # second a DatabaseError.
        with mock.patch(
                "agkyra.syncer.database.SqliteFileStateDB.commit") as dbmock:
            dbmock.side_effect = [sqlite3.OperationalError("locked"),
                                  common.DatabaseError()]
            self.s.decide_file_sync(fil)
        self.assert_message(messaging.HeartbeatReplayDecideMessage)
        self.s.decide_file_sync(fil)
        self.assert_message(messaging.HeartbeatNoDecideMessage)
        # Single-argument call form: valid on Python 2 and Python 3.
        print("SLEEPING 11")
        # NOTE(review): presumably long enough for the heartbeat to expire;
        # confirm against the syncer settings.
        time.sleep(11)
        self.s.decide_file_sync(fil)
        self.assert_message(messaging.SyncMessage)
        self.assert_message(messaging.AckSyncMessage)

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
498
    def test_007_multiprobe(self):
        # Probes a local file, rewrites it and probes it again without a
        # decision in between; the second probe must report that the file
        # was already probed.
        fil = "φ007"
        path = self.get_path(fil)
        with open(path, 'a'):
            pass
        self.s.probe_file(self.s.SLAVE, fil)
        self.assert_message(messaging.UpdateMessage)

        with open(path, 'w') as out:
            out.write("new")
        self.s.probe_file(self.s.SLAVE, fil)
        self.assert_message(messaging.AlreadyProbedMessage)

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
509
    def test_008_dir_contents(self):
        # Syncs an upstream directory with a file inside it, then removes
        # the directory locally and checks that it is gone upstream too.
        d = "δ008"
        d_path = self.get_path(d)
        self.pithos.object_put(
            d, content_type='application/directory', content_length=0)
        inner_fil = "δ008/inφ008"
        inner_fil_content = "fil in dir "
        self.pithos.upload_from_string(inner_fil, inner_fil_content)
        self.s.probe_file(self.s.MASTER, d)
        m = self.assert_message(messaging.UpdateMessage)
        master_serial = m.serial
        self.assertEqual(master_serial, 0)
        self.s.probe_file(self.s.MASTER, inner_fil)
        self.assert_message(messaging.UpdateMessage)
        # this will also make the dir
        self.s.decide_file_sync(inner_fil)
        self.assert_message(messaging.SyncMessage)
        self.assert_message(messaging.AckSyncMessage)
        self.assertTrue(os.path.isdir(d_path))
        # sync the dir too
        self.s.probe_file(self.s.SLAVE, d)
        m = self.assert_message(messaging.UpdateMessage)
        self.assertEqual(m.serial, 1)
        self.s.decide_file_sync(d)
        self.assert_message(messaging.SyncMessage)
        self.assert_message(messaging.AckSyncMessage)
        state = self.db.get_state(self.s.SLAVE, d)
        self.assertEqual(state.serial, master_serial)

        # locally remove the dir and sync
        shutil.rmtree(d_path)
        self.s.probe_file(self.s.SLAVE, d)
        self.s.probe_file(self.s.SLAVE, inner_fil)
        self.assert_message(messaging.UpdateMessage)
        self.assert_message(messaging.UpdateMessage)
        self.s.decide_file_sync(d)
        self.s.decide_file_sync(inner_fil)
        self.assert_message(messaging.SyncMessage)
        self.assert_message(messaging.SyncMessage)
        self.assert_message(messaging.AckSyncMessage)
        self.assert_message(messaging.AckSyncMessage)
        # the directory object must no longer exist upstream
        with self.assertRaises(ClientError) as cm:
            self.pithos.get_object_info(d)
        self.assertEqual(cm.exception.status, 404)

    def test_009_dir_delete_upstream(self):
        # Syncs a nested upstream directory, deletes both directories
        # upstream and checks that the deletion only succeeds locally
        # from the inside out.
        d = "δ009"
        d_path = self.get_path(d)
        self.pithos.object_put(
            d, content_type='application/directory', content_length=0)
        innerd = "δ009/innerδ009"
        self.pithos.object_put(
            innerd, content_type='application/directory', content_length=0)
        self.s.probe_file(self.s.MASTER, d)
        self.s.probe_file(self.s.MASTER, innerd)
        self.assert_message(messaging.UpdateMessage)
        self.assert_message(messaging.UpdateMessage)
        self.s.decide_file_sync(d)
        self.assert_message(messaging.SyncMessage)
        self.assert_message(messaging.AckSyncMessage)
        self.s.decide_file_sync(innerd)
        self.assert_message(messaging.SyncMessage)
        self.assert_message(messaging.AckSyncMessage)
        self.assertTrue(os.path.isdir(d_path))

        # delete upstream
        self.pithos.del_object(d)
        self.pithos.del_object(innerd)
        self.s.probe_file(self.s.MASTER, d)
        self.s.probe_file(self.s.MASTER, innerd)
        self.assert_message(messaging.UpdateMessage)
        self.assert_message(messaging.UpdateMessage)

        # will fail because local dir is non-empty
        self.s.decide_file_sync(d)
        self.assert_message(messaging.SyncMessage)
        self.assert_message(messaging.SyncErrorMessage)

        # but this is ok
        self.s.decide_file_sync(innerd)
        self.assert_message(messaging.SyncMessage)
        self.assert_message(messaging.AckSyncMessage)
        self.s.decide_file_sync(d)
        self.assert_message(messaging.SyncMessage)
        self.assert_message(messaging.AckSyncMessage)

    def test_010_live_update_local(self):
        # Rewrites a local file after it has been probed but before the
        # sync is decided, and checks that the recorded local info is
        # updated during the sync.
        fil = "φ010"
        path = self.get_path(fil)
        with open(path, "w") as out:
            out.write("f to be changed")

        self.s.probe_file(self.s.SLAVE, fil)
        self.assert_message(messaging.UpdateMessage)
        probed_info = self.db.get_state(self.s.SLAVE, fil).info

        # Change the content behind the syncer's back.
        f_content = "changed"
        with open(path, "w") as out:
            out.write(f_content)

        self.s.decide_file_sync(fil)
        for mtype in (messaging.SyncMessage,
                      messaging.LiveInfoUpdateMessage,
                      messaging.AckSyncMessage):
            self.assert_message(mtype)

        synced_info = self.db.get_state(self.s.SLAVE, fil).info
        self.assertNotEqual(probed_info, synced_info)
        self.assertEqual(synced_info["localfs_size"], len(f_content))

    def test_011_live_update_upstream(self):
        # Re-uploads an object after it has been probed but before the
        # sync is decided, and checks that the recorded upstream etag is
        # the one of the second upload.
        fil = "φ011"
        r = self.pithos.upload_from_string(fil, "f upstream")
        etag = r['etag']

        self.s.probe_file(self.s.MASTER, fil)
        self.assert_message(messaging.UpdateMessage)
        state = self.db.get_state(self.s.MASTER, fil)
        f_info = state.info
        self.assertEqual(f_info["pithos_etag"], etag)

        # Change the object upstream behind the syncer's back.
        r1 = self.pithos.upload_from_string(fil, "new")
        new_etag = r1['etag']

        self.s.decide_file_sync(fil)
        self.assert_message(messaging.SyncMessage)
        self.assert_message(messaging.LiveInfoUpdateMessage)
        self.assert_message(messaging.AckSyncMessage)
        state = self.db.get_state(self.s.MASTER, fil)
        new_info = state.info
        self.assertEqual(new_info["pithos_etag"], new_etag)

    def test_012_cachename(self):
        # Exercises LocalfsTargetHandle.move_file()/hide_file() and the
        # cachename bookkeeping in the database for the hidden copy of a
        # local file.
        fil = "φ012"
        f_path = self.get_path(fil)
        with open(f_path, "w") as f:
            f.write("content")

        state = self.db.get_state(self.s.SLAVE, fil)
        handle = localfs_client.LocalfsTargetHandle(self.s.settings, state)
        hidden_filename = utils.join_path(
            handle.cache_hide_name, utils.hash_string(handle.objname))
        hidden_path = handle.get_path_in_cache(hidden_filename)
        self.assertFalse(os.path.isfile(hidden_path))

        # First move: the hidden file appears and its cachename is
        # registered.
        self.assertIsNone(self.db.get_cachename(hidden_filename))
        handle.move_file()

        self.assertTrue(os.path.isfile(hidden_path))
        self.assertIsNotNone(self.db.get_cachename(hidden_filename))
        # Repeating the move must leave the hidden file in place.
        handle.move_file()
        self.assertTrue(os.path.isfile(hidden_path))

        # Put the file back by hand; the cachename is still registered
        # and another move must hide the file again.
        shutil.move(hidden_path, f_path)
        self.assertIsNotNone(self.db.get_cachename(hidden_filename))
        handle.move_file()
        self.assertTrue(os.path.isfile(hidden_path))

        # open file to cause busy error; the context manager keeps it
        # open for the duration of the check and closes it afterwards
        # instead of leaking the handle.
        with open(hidden_path, "r"):
            with self.assertRaises(common.BusyError):
                handle.hide_file()

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
675
    def test_013_collisions(self):
        # In each scenario a local change is probed and then an object
        # with the same name is uploaded upstream without being probed.
        # The sync is expected to report a CollisionMessage followed by
        # a SyncErrorMessage.

        # Scenario 1: regular local file.
        fil = "φ013"
        f_path = self.get_path(fil)
        with open(f_path, "w") as f:
            f.write("content")
        self.s.probe_file(self.s.SLAVE, fil)
        self.assert_message(messaging.UpdateMessage)

        self.pithos.upload_from_string(fil, "new")
        self.s.decide_file_sync(fil)
        self.assert_message(messaging.SyncMessage)
        self.assert_message(messaging.CollisionMessage)
        self.assert_message(messaging.SyncErrorMessage)

        # Scenario 2: local directory.
        d = "δ013"
        d_path = self.get_path(d)
        os.mkdir(d_path)
        self.s.probe_file(self.s.SLAVE, d)
        self.assert_message(messaging.UpdateMessage)

        self.pithos.upload_from_string(d, "new")
        self.s.decide_file_sync(d)
        self.assert_message(messaging.SyncMessage)
        self.assert_message(messaging.CollisionMessage)
        self.assert_message(messaging.SyncErrorMessage)

        # Scenario 3: a directory that was synced successfully first,
        # then removed locally (and probed) before the upstream upload.
        d_synced = "δ013_s"
        d_synced_path = self.get_path(d_synced)
        os.mkdir(d_synced_path)
        self.s.probe_file(self.s.SLAVE, d_synced)
        self.assert_message(messaging.UpdateMessage)
        self.s.decide_file_sync(d_synced)
        self.assert_message(messaging.SyncMessage)
        self.assert_message(messaging.AckSyncMessage)

        os.rmdir(d_synced_path)
        self.s.probe_file(self.s.SLAVE, d_synced)
        self.assert_message(messaging.UpdateMessage)

        self.pithos.upload_from_string(d_synced, "new")
        self.s.decide_file_sync(d_synced)
        self.assert_message(messaging.SyncMessage)
        self.assert_message(messaging.CollisionMessage)
        self.assert_message(messaging.SyncErrorMessage)
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
719

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
720
    def test_014_staging(self):
        # Exercises LocalfsSourceHandle staging (stage/unstage/copy and
        # check_staged) against regular files, open files, symlinks,
        # missing files and directories.
        fil = "φ014"
        d = "δ014"
        fln = "φ014.link"
        f_path = self.get_path(fil)
        with open(f_path, "w") as f:
            f.write("content")
        fln_path = self.get_path(fln)
        os.symlink(f_path, fln_path)
        d_path = self.get_path(d)
        os.mkdir(d_path)

        # A probed regular file is staged on handle creation; the staged
        # copy equals the original and disappears on unstage.
        self.s.probe_file(self.s.SLAVE, fil)
        self.assert_message(messaging.UpdateMessage)
        state = self.db.get_state(self.s.SLAVE, fil)
        handle = localfs_client.LocalfsSourceHandle(self.s.settings, state)
        staged_path = handle.staged_path
        self.assertTrue(localfs_client.files_equal(f_path, staged_path))
        handle.unstage_file()
        self.assertFalse(os.path.exists(staged_path))

        # Rewrite the file without re-probing: creating a handle from the
        # stale state is expected to emit a LiveInfoUpdateMessage and to
        # stage the new content.
        with open(f_path, "w") as f:
            f.write("content new")
        handle = localfs_client.LocalfsSourceHandle(self.s.settings, state)
        self.assert_message(messaging.LiveInfoUpdateMessage)
        self.assertTrue(localfs_client.files_equal(f_path, staged_path))
        handle.unstage_file()

        # While the file is held open, staging must fail as busy. The
        # file is closed explicitly afterwards because the next steps
        # create handles for the same path and must not see it open.
        with open(f_path, "r"):
            with self.assertRaises(common.OpenBusyError):
                localfs_client.LocalfsSourceHandle(self.s.settings, state)

        # Replace the regular file with a symlink: nothing is staged.
        ftmp_path = self.get_path("φ014tmp")
        with open(ftmp_path, "w") as f:
            f.write("tmp")
        os.unlink(f_path)
        os.symlink(ftmp_path, f_path)
        state = self.db.get_state(self.s.SLAVE, fil)
        handle = localfs_client.LocalfsSourceHandle(self.s.settings, state)
        self.assert_message(messaging.LiveInfoUpdateMessage)
        self.assertIsNone(handle.staged_path)

        # A path probed as a symlink is not staged either.
        self.s.probe_file(self.s.SLAVE, fln)
        self.assert_message(messaging.UpdateMessage)
        state = self.db.get_state(self.s.SLAVE, fln)
        handle = localfs_client.LocalfsSourceHandle(self.s.settings, state)
        self.assertIsNone(handle.staged_path)

        # Turn the symlink into a regular file; with the state that was
        # probed for the symlink, handle creation still stages nothing.
        os.unlink(fln_path)
        with open(fln_path, "w") as f:
            f.write("reg file")

        handle = localfs_client.LocalfsSourceHandle(self.s.settings, state)
        self.assertIsNone(handle.staged_path)

        # try to stage now
        handle.stage_file()
        self.assert_message(messaging.LiveInfoUpdateMessage)
        self.assertTrue(localfs_client.files_equal(
            fln_path, handle.staged_path))
        handle.unstage_file()

        # A file that does not exist when probed is not staged.
        fmissing = "fmissing014"
        fmissing_path = self.get_path(fmissing)
        self.s.probe_file(self.s.SLAVE, fmissing)
        state = self.db.get_state(self.s.SLAVE, fmissing)
        handle = localfs_client.LocalfsSourceHandle(self.s.settings, state)
        self.assertIsNone(handle.staged_path)

        with open(fmissing_path, "w") as f:
            f.write("ref file")

        # Once the file exists, an explicit copy produces a staged path
        # that passes the check against freshly read live info.
        handle.copy_file()
        self.assertIsNotNone(handle.staged_path)
        live_info = localfs_client.get_live_info(
            self.s.settings, handle.fspath)
        handle.check_staged(live_info)

        # Changing the file makes the previously read info stale.
        with open(fmissing_path, "w") as f:
            f.write("ref file2")
        with self.assertRaises(common.ChangedBusyError):
            handle.check_staged(live_info)

        # turn it into a dir
        os.unlink(fmissing_path)
        os.mkdir(fmissing_path)
        handle.copy_file()
        with self.assertRaises(common.NotStableBusyError):
            handle.check_staged(live_info)

        # info of dir
        live_info = localfs_client.get_live_info(
            self.s.settings, handle.fspath)
        handle.check_staged(live_info)

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
815

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
816
817
# Run the whole suite when this module is executed as a script.
if __name__ == "__main__":
    unittest.main()