# -*- coding: utf-8 -*- # Copyright (C) 2015 GRNET S.A. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from __future__ import unicode_literals from agkyra.syncer.setup import SyncerSettings from agkyra.syncer import localfs_client from agkyra.syncer.pithos_client import PithosFileClient from agkyra.syncer.syncer import FileSyncer import agkyra.syncer.syncer from agkyra.syncer import messaging, utils, common import random import os import time import shutil import unittest import mock import sqlite3 import tempfile from functools import wraps from agkyra.config import AgkyraConfig, CONFIG_PATH from kamaki.clients import ClientError import logging logger = logging.getLogger('agkyra') handler = logging.StreamHandler() formatter = logging.Formatter("%(levelname)s:%(asctime)s:%(message)s") handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) # kamakisend_logger = logging.getLogger('kamaki.clients.send') # kamakisend_logger.addHandler(handler) # kamakisend_logger.setLevel(logging.DEBUG) # kamakirecv_logger = logging.getLogger('kamaki.clients.recv') # kamakirecv_logger.addHandler(handler) # kamakirecv_logger.setLevel(logging.DEBUG) TMP = os.path.realpath(tempfile.gettempdir()) def hash_file(fil): with open(fil) as f: return utils.hash_string(f.read()) def mock_transaction(max_wait=60, init_wait=0.4, exp_backoff=1.1): def wrap(func): @wraps(func) def inner(*args, **kwargs): print "IN MOCK" obj = args[0] db = obj.get_db() attempt = 0 current_max_wait = init_wait db.begin() r = func(*args, **kwargs) raise common.DatabaseError() return inner return wrap class AgkyraTest(unittest.TestCase): @classmethod def setUpClass(cls): cnf = AgkyraConfig() cloud_conf = cnf.get('cloud', 'test') if cloud_conf is None: print "Define a 'test' cloud in %s" % CONFIG_PATH exit() AUTHENTICATION_URL = cloud_conf['url'] TOKEN = cloud_conf['token'] cls.ID = "ΑΓΚΥΡΑTEST" + str(random.random()).split('.')[1] cls.LOCAL_ROOT_PATH = utils.join_path(TMP, cls.ID) cls.settings = SyncerSettings( auth_url=AUTHENTICATION_URL, auth_token=TOKEN, container=cls.ID, local_root_path=cls.LOCAL_ROOT_PATH, ignore_ssl=True) cls.master = PithosFileClient(cls.settings) cls.slave = localfs_client.LocalfsFileClient(cls.settings) cls.s = FileSyncer(cls.settings, cls.master, cls.slave) cls.pithos = cls.master.endpoint cls.pithos.create_container(cls.ID) cls.db = cls.s.get_db() m = cls.s.get_next_message(block=True) assert isinstance(m, messaging.PithosSyncEnabled) m = cls.s.get_next_message(block=True) assert isinstance(m, messaging.LocalfsSyncEnabled) def assert_message(self, mtype): m = self.s.get_next_message(block=True) print m self.assertIsInstance(m, mtype) return m def assert_messages(self, mtypes_dict): while mtypes_dict: m = self.s.get_next_message(block=True) print m mtype = m.__class__ num = mtypes_dict.get(mtype, 0) if not num: raise AssertionError("Got unexpected message %s" % m) new_num = num -1 if new_num: mtypes_dict[mtype] = new_num else: mtypes_dict.pop(mtype) def assert_no_message(self): self.assertIsNone(self.s.get_next_message()) @classmethod def tearDownClass(cls): cls.pithos.del_container(delimiter='/') cls.pithos.purge_container() def get_path(self, f): return os.path.join(self.LOCAL_ROOT_PATH, f) def test_0001_listing_local(self): def real(candidates): return [c for c in candidates if not c.startswith(self.settings.cache_name)] candidates = self.slave.list_candidate_files() self.assertEqual(candidates, []) candidates = self.slave.list_candidate_files(forced=True) self.assertEqual(real(candidates), []) fil = "φ0001" f_path = self.get_path(fil) open(f_path, "a").close() d = "δ0001" d_path = self.get_path(d) os.mkdir(d_path) candidates = self.slave.list_candidate_files(forced=True) self.assertEqual(sorted(real(candidates)), sorted([fil, d])) self.s.probe_archive(self.s.SLAVE) self.assert_messages( {messaging.UpdateMessage: 2, messaging.IgnoreProbeMessage: 4}) with self.slave.probe_candidates.lock() as dct: self.assertNotIn(fil, dct) self.assertNotIn(d, dct) self.s.decide_archive(self.s.SLAVE) self.assert_messages({ messaging.SyncMessage: 2, messaging.AckSyncMessage: 2}) os.unlink(f_path) with mock.patch( "agkyra.syncer.localfs_client.LocalfsFileClient.list_files") as mk: mk.return_value = [] candidates = self.slave.list_candidate_files(forced=True) self.assertEqual(real(candidates), [d]) candidates = self.slave.list_candidate_files(forced=True) self.assertEqual(sorted(real(candidates)), sorted([fil, d])) candidates = self.slave.list_candidate_files() self.assertEqual(sorted(real(candidates)), sorted([fil, d])) self.slave.remove_candidates(candidates, None) candidates = self.slave.list_candidate_files() self.assertEqual(candidates, []) def test_0002_notifier_local(self): f_out = "φ0002out" f_cache = "φ0002cache" f_upd = "φ0002upd" f_ren = "φ0002ren" f_cached = "φ0002cached" dbefore = "δ0002before" f_out_path = self.get_path(f_out) f_cache_path = self.get_path(f_cache) f_upd_path = self.get_path(f_upd) f_ren_path = self.get_path(f_ren) f_cached_path = os.path.join( self.settings.cache_path, f_cached) dbefore_path = self.get_path(dbefore) open(f_out_path, "a").close() open(f_cache_path, "a").close() open(f_upd_path, "a").close() open(f_ren_path, "a").close() open(f_cached_path, "a").close() os.mkdir(dbefore_path) notifier = self.slave.notifier() candidates = self.slave.list_candidate_files() self.assertEqual(candidates, []) fafter = "φ0002after" fafter_path = self.get_path(fafter) dafter = "δ0002after" dafter_path = self.get_path(dafter) open(fafter_path, "a").close() os.mkdir(dafter_path) time.sleep(1) candidates = self.slave.list_candidate_files() self.assertEqual(sorted(candidates), sorted([fafter, dafter])) os.rename(f_cache_path, utils.join_path(self.settings.cache_path, f_cache)) os.rename(f_out_path, utils.join_path(TMP, f_out)) with open(f_upd_path, "a") as f: f.write("upd") f_in = "φ0002in" f_in_path = self.get_path(f_in) f_in_orig_path = utils.join_path(TMP, f_in) open(f_in_orig_path, "a").close() os.rename(f_in_orig_path, f_in_path) f_ren_new = "φ0002ren_new" f_ren_new_path = self.get_path(f_ren_new) os.rename(f_ren_path, f_ren_new_path) f_cached_out_path = self.get_path(f_cached) os.rename(f_cached_path, f_cached_out_path) time.sleep(1) candidates = self.slave.list_candidate_files() self.assertEqual(sorted(candidates), sorted([fafter, dafter, f_in, f_out, f_upd, f_ren, f_ren_new, f_cache, f_cached])) notifier.stop() def test_001_probe_and_sync(self): # initial upload to pithos f1 = "φ001" f1_content1 = "content1" r1 = self.pithos.upload_from_string( f1, f1_content1) etag1 = r1['etag'] state = self.db.get_state(self.s.MASTER, f1) self.assertEqual(state.serial, -1) self.assertEqual(state.info, {}) # probe pithos self.s.probe_file(self.s.MASTER, f1) m = self.assert_message(messaging.UpdateMessage) self.assertEqual(m.archive, self.s.MASTER) self.assertEqual(m.serial, 0) state = self.db.get_state(self.s.MASTER, f1) self.assertEqual(state.serial, 0) self.assertEqual(state.info["pithos_etag"], etag1) # get local state state = self.db.get_state(self.s.SLAVE, f1) self.assertEqual(state.serial, -1) assert state.info == {} # sync self.s.decide_file_sync(f1) dstate = self.db.get_state(self.s.DECISION, f1) self.assertEqual(dstate.serial, 0) self.assert_message(messaging.SyncMessage) # check local synced file self.assert_message(messaging.AckSyncMessage) state = self.db.get_state(self.s.SLAVE, f1) assert state.serial == 0 info = state.info assert info['localfs_size'] == len(f1_content1) f1_path = self.get_path(f1) self.assertEqual(hash_file(f1_path), utils.hash_string(f1_content1)) dstate = self.db.get_state(self.s.DECISION, f1) sstate = self.db.get_state(self.s.SYNC, f1) self.assertEqual(dstate.info, sstate.info) self.assertEqual(sstate.serial, 0) def test_002_conflict(self): fil = "φ002" # local file fil_local_content = "local" with open(self.get_path(fil), "w") as f: f.write(fil_local_content) # upstream fil_upstream_content = "upstream" r = self.pithos.upload_from_string( fil, fil_upstream_content) etag = r['etag'] # cause a conflict # first try to upload ignoring upstream changes self.s.probe_file(self.s.SLAVE, fil) self.assert_message(messaging.UpdateMessage) self.s.decide_file_sync(fil) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.CollisionMessage) self.assert_message(messaging.SyncErrorMessage) # this will fail because serial is marked as failed self.s.decide_file_sync(fil) self.assert_message(messaging.FailedSyncIgnoreDecisionMessage) # now probe upstream too and retry self.s.probe_file(self.s.MASTER, fil) self.assert_message(messaging.UpdateMessage) self.s.start_notifiers() self.s.decide_file_sync(fil) self.assert_message(messaging.FailedSyncIgnoreDecisionMessage) self.assert_message(messaging.SyncMessage) m = self.assert_message(messaging.ConflictStashMessage) stash_name = m.stash_name self.assert_message(messaging.AckSyncMessage) time.sleep(1) self.s.stop_notifiers() local_cands = self.slave.list_candidate_files() self.assertIn(stash_name, local_cands) def test_003_dirs(self): # make local dir with files d = "δ003" d_path = self.get_path(d) os.mkdir(d_path) fil = "δ003/φ003" f_path = self.get_path(fil) f_content = "f2" with open(f_path, "w") as f: f.write(f_content) self.s.probe_file(self.s.SLAVE, d) self.s.probe_file(self.s.SLAVE, fil) self.assert_message(messaging.UpdateMessage) self.assert_message(messaging.UpdateMessage) self.s.decide_archive(self.s.SLAVE) self.assert_messages({ messaging.SyncMessage: 2, messaging.AckSyncMessage: 2}) def test_004_link(self): # Check sym links fil = "φ004" f_path = self.get_path(fil) open(f_path, 'a').close() self.s.probe_file(self.s.SLAVE, fil) self.s.decide_file_sync(fil) self.assert_message(messaging.UpdateMessage) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) ln = "φ004.link" ln_path = self.get_path(ln) os.symlink(f_path, ln_path) self.s.probe_file(self.s.SLAVE, ln) self.assert_message(messaging.UpdateMessage) state = self.db.get_state(self.s.SLAVE, ln) self.assertEqual(state.serial, 0) self.assertEqual(state.info, {"localfs_type": "unhandled"}) self.s.decide_file_sync(ln) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) state = self.db.get_state(self.s.MASTER, ln) self.assertEqual(state.info, {}) # Put file upstream to cause conflict upstream_ln_content = "regular" r = self.pithos.upload_from_string( ln, upstream_ln_content) etag = r['etag'] self.s.probe_file(self.s.MASTER, ln) self.s.probe_file(self.s.SLAVE, ln) self.assert_message(messaging.UpdateMessage) state = self.db.get_state(self.s.MASTER, ln) self.assertEqual(state.info["pithos_etag"], etag) self.s.decide_file_sync(ln) self.assert_message(messaging.SyncMessage) m = self.assert_message(messaging.ConflictStashMessage) stashed_ln = m.stash_name self.assert_message(messaging.AckSyncMessage) self.assert_no_message() self.s.probe_file(self.s.SLAVE, stashed_ln) m = self.assert_message(messaging.UpdateMessage) self.assertEqual(m.objname, stashed_ln) state = self.db.get_state(self.s.SLAVE, stashed_ln) self.assertEqual(state.serial, 0) self.assertEqual(state.info, {"localfs_type": "unhandled"}) self.assert_no_message() # no changes in linked file self.s.probe_file(self.s.SLAVE, fil) time.sleep(2) self.assert_no_message() def test_005_dirs_inhibited_by_file(self): fil = "φ005" f_path = self.get_path(fil) open(f_path, 'a').close() self.s.probe_file(self.s.SLAVE, fil) self.s.decide_file_sync(fil) self.assert_message(messaging.UpdateMessage) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) r = self.pithos.object_put( fil, content_type='application/directory', content_length=0) inner_fil = "φ005/in005" inner_fil_content = "ff1 in dir " r1 = self.pithos.upload_from_string(inner_fil, inner_fil_content) self.s.probe_file(self.s.MASTER, fil) self.s.probe_file(self.s.MASTER, inner_fil) self.assert_message(messaging.UpdateMessage) self.assert_message(messaging.UpdateMessage) # fails because file in place of dir self.s.decide_file_sync(inner_fil) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.SyncErrorMessage) inner_dir = "φ005/indir005" r = self.pithos.object_put( inner_dir, content_type='application/directory', content_length=0) self.s.probe_file(self.s.MASTER, inner_dir) self.assert_message(messaging.UpdateMessage) # also fails because file in place of dir self.s.decide_file_sync(inner_dir) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.SyncErrorMessage) # but if we fist sync the dir, it's ok self.s.decide_file_sync(fil) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) self.assertTrue(os.path.isdir(f_path)) self.s.decide_file_sync(inner_fil) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) def test_006_heartbeat(self): fil = "φ006" f_path = self.get_path(fil) open(f_path, 'a').close() self.s.probe_file(self.s.SLAVE, fil) self.assert_message(messaging.UpdateMessage) self.s.decide_file_sync(fil) self.assert_message(messaging.SyncMessage) self.s.probe_file(self.s.SLAVE, fil) self.assert_message(messaging.HeartbeatNoProbeMessage) self.s.decide_file_sync(fil) self.assert_message(messaging.HeartbeatNoDecideMessage) self.assert_message(messaging.AckSyncMessage) with open(f_path, 'w') as f: f.write("new") self.s.probe_file(self.s.SLAVE, fil) self.assert_message(messaging.UpdateMessage) with mock.patch( "agkyra.syncer.database.SqliteFileStateDB.commit") as dbmock: dbmock.side_effect = [sqlite3.OperationalError("locked"), common.DatabaseError()] self.s.decide_file_sync(fil) self.assert_message(messaging.HeartbeatReplayDecideMessage) self.s.decide_file_sync(fil) self.assert_message(messaging.HeartbeatNoDecideMessage) print "SLEEPING 11" time.sleep(11) self.s.decide_file_sync(fil) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) def test_007_multiprobe(self): fil = "φ007" f_path = self.get_path(fil) open(f_path, 'a').close() self.s.probe_file(self.s.SLAVE, fil) self.assert_message(messaging.UpdateMessage) with open(f_path, 'w') as f: f.write("new") self.s.probe_file(self.s.SLAVE, fil) self.assert_message(messaging.AlreadyProbedMessage) def test_008_dir_contents(self): d = "δ008" d_path = self.get_path(d) r = self.pithos.object_put( d, content_type='application/directory', content_length=0) inner_fil = "δ008/inφ008" inner_fil_content = "fil in dir " r1 = self.pithos.upload_from_string(inner_fil, inner_fil_content) self.s.probe_file(self.s.MASTER, d) m = self.assert_message(messaging.UpdateMessage) master_serial = m.serial self.assertEqual(master_serial, 0) self.s.probe_file(self.s.MASTER, inner_fil) self.assert_message(messaging.UpdateMessage) # this will also make the dir self.s.decide_file_sync(inner_fil) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) self.assertTrue(os.path.isdir(d_path)) # sync the dir too self.s.probe_file(self.s.SLAVE, d) m = self.assert_message(messaging.UpdateMessage) slave_serial = m.serial self.assertEqual(slave_serial, 1) self.s.decide_file_sync(d) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) state = self.db.get_state(self.s.SLAVE, d) self.assertEqual(state.serial, master_serial) # locally remove the dir and sync shutil.rmtree(d_path) self.s.probe_file(self.s.SLAVE, d) self.s.probe_file(self.s.SLAVE, inner_fil) self.assert_message(messaging.UpdateMessage) self.assert_message(messaging.UpdateMessage) self.s.decide_file_sync(d) self.s.decide_file_sync(inner_fil) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) self.assert_message(messaging.AckSyncMessage) with self.assertRaises(ClientError) as cm: self.pithos.get_object_info(d) self.assertEqual(cm.exception.status, 404) def test_009_dir_delete_upstream(self): d = "δ009" d_path = self.get_path(d) r = self.pithos.object_put( d, content_type='application/directory', content_length=0) innerd = "δ009/innerδ009" r = self.pithos.object_put( innerd, content_type='application/directory', content_length=0) self.s.probe_file(self.s.MASTER, d) self.s.probe_file(self.s.MASTER, innerd) self.assert_message(messaging.UpdateMessage) self.assert_message(messaging.UpdateMessage) self.s.decide_file_sync(d) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) self.s.decide_file_sync(innerd) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) self.assertTrue(os.path.isdir(d_path)) # delete upstream self.pithos.del_object(d) self.pithos.del_object(innerd) self.s.probe_file(self.s.MASTER, d) self.s.probe_file(self.s.MASTER, innerd) self.assert_message(messaging.UpdateMessage) self.assert_message(messaging.UpdateMessage) # will fail because local dir is non-empty self.s.decide_file_sync(d) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.SyncErrorMessage) # but this is ok self.s.decide_file_sync(innerd) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) self.s.decide_file_sync(d) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) def test_010_live_update_local(self): fil = "φ010" f_path = self.get_path(fil) with open(f_path, "w") as f: f.write("f to be changed") self.s.probe_file(self.s.SLAVE, fil) self.assert_message(messaging.UpdateMessage) state = self.db.get_state(self.s.SLAVE, fil) f_info = state.info f_content = "changed" with open(f_path, "w") as f: f.write(f_content) self.s.decide_file_sync(fil) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.LiveInfoUpdateMessage) self.assert_message(messaging.AckSyncMessage) state = self.db.get_state(self.s.SLAVE, fil) new_info = state.info self.assertNotEqual(f_info, new_info) self.assertEqual(new_info["localfs_size"], len(f_content)) def test_011_live_update_upstream(self): fil = "φ011" f_path = self.get_path(fil) r = self.pithos.upload_from_string(fil, "f upstream") etag = r['etag'] self.s.probe_file(self.s.MASTER, fil) self.assert_message(messaging.UpdateMessage) state = self.db.get_state(self.s.MASTER, fil) f_info = state.info self.assertEqual(f_info["pithos_etag"], etag) r1 = self.pithos.upload_from_string(fil, "new") new_etag = r1['etag'] self.s.decide_file_sync(fil) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.LiveInfoUpdateMessage) self.assert_message(messaging.AckSyncMessage) state = self.db.get_state(self.s.MASTER, fil) new_info = state.info self.assertEqual(new_info["pithos_etag"], new_etag) def test_012_cachename(self): fil = "φ012" f_path = self.get_path(fil) with open(f_path, "w") as f: f.write("content") state = self.db.get_state(self.s.SLAVE, fil) handle = localfs_client.LocalfsTargetHandle(self.s.settings, state) hidden_filename = utils.join_path( handle.cache_hide_name, utils.hash_string(handle.objname)) hidden_path = handle.get_path_in_cache(hidden_filename) self.assertFalse(os.path.isfile(hidden_path)) self.assertIsNone(self.db.get_cachename(hidden_filename)) handle.move_file() self.assertTrue(os.path.isfile(hidden_path)) self.assertIsNotNone(self.db.get_cachename(hidden_filename)) handle.move_file() self.assertTrue(os.path.isfile(hidden_path)) shutil.move(hidden_path, f_path) self.assertIsNotNone(self.db.get_cachename(hidden_filename)) handle.move_file() self.assertTrue(os.path.isfile(hidden_path)) # open file to cause busy error f = open(hidden_path, "r") with self.assertRaises(common.BusyError): handle.hide_file() def test_013_collisions(self): fil = "φ013" f_path = self.get_path(fil) with open(f_path, "w") as f: f.write("content") self.s.probe_file(self.s.SLAVE, fil) self.assert_message(messaging.UpdateMessage) r = self.pithos.upload_from_string(fil, "new") self.s.decide_file_sync(fil) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.CollisionMessage) self.assert_message(messaging.SyncErrorMessage) d = "δ013" d_path = self.get_path(d) os.mkdir(d_path) self.s.probe_file(self.s.SLAVE, d) self.assert_message(messaging.UpdateMessage) r = self.pithos.upload_from_string(d, "new") self.s.decide_file_sync(d) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.CollisionMessage) self.assert_message(messaging.SyncErrorMessage) d_synced = "δ013_s" d_synced_path = self.get_path(d_synced) os.mkdir(d_synced_path) self.s.probe_file(self.s.SLAVE, d_synced) self.assert_message(messaging.UpdateMessage) self.s.decide_file_sync(d_synced) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) os.rmdir(d_synced_path) self.s.probe_file(self.s.SLAVE, d_synced) self.assert_message(messaging.UpdateMessage) r = self.pithos.upload_from_string(d_synced, "new") self.s.decide_file_sync(d_synced) self.assert_message(messaging.SyncMessage) self.assert_message(messaging.CollisionMessage) self.assert_message(messaging.SyncErrorMessage) def test_014_staging(self): fil = "φ014" d = "δ014" fln = "φ014.link" f_path = self.get_path(fil) with open(f_path, "w") as f: f.write("content") fln_path = self.get_path(fln) os.symlink(f_path, fln_path) d_path = self.get_path(d) os.mkdir(d_path) self.s.probe_file(self.s.SLAVE, fil) self.assert_message(messaging.UpdateMessage) state = self.db.get_state(self.s.SLAVE, fil) handle = localfs_client.LocalfsSourceHandle(self.s.settings, state) staged_path = handle.staged_path self.assertTrue(localfs_client.files_equal(f_path, staged_path)) handle.unstage_file() self.assertFalse(os.path.exists(staged_path)) with open(f_path, "w") as f: f.write("content new") handle = localfs_client.LocalfsSourceHandle(self.s.settings, state) self.assert_message(messaging.LiveInfoUpdateMessage) self.assertTrue(localfs_client.files_equal(f_path, staged_path)) handle.unstage_file() f = open(f_path, "r") with self.assertRaises(common.OpenBusyError): handle = localfs_client.LocalfsSourceHandle(self.s.settings, state) ftmp_path = self.get_path("φ014tmp") with open(ftmp_path, "w") as f: f.write("tmp") os.unlink(f_path) os.symlink(ftmp_path, f_path) state = self.db.get_state(self.s.SLAVE, fil) handle = localfs_client.LocalfsSourceHandle(self.s.settings, state) self.assert_message(messaging.LiveInfoUpdateMessage) self.assertIsNone(handle.staged_path) self.s.probe_file(self.s.SLAVE, fln) self.assert_message(messaging.UpdateMessage) state = self.db.get_state(self.s.SLAVE, fln) handle = localfs_client.LocalfsSourceHandle(self.s.settings, state) self.assertIsNone(handle.staged_path) os.unlink(fln_path) with open(fln_path, "w") as f: f.write("reg file") handle = localfs_client.LocalfsSourceHandle(self.s.settings, state) self.assertIsNone(handle.staged_path) # try to stage now handle.stage_file() self.assert_message(messaging.LiveInfoUpdateMessage) self.assertTrue(localfs_client.files_equal( fln_path, handle.staged_path)) handle.unstage_file() fmissing = "fmissing014" fmissing_path = self.get_path(fmissing) self.s.probe_file(self.s.SLAVE, fmissing) state = self.db.get_state(self.s.SLAVE, fmissing) handle = localfs_client.LocalfsSourceHandle(self.s.settings, state) self.assertIsNone(handle.staged_path) with open(fmissing_path, "w") as f: f.write("ref file") handle.copy_file() self.assertIsNotNone(handle.staged_path) live_info = localfs_client.get_live_info( self.s.settings, handle.fspath) handle.check_staged(live_info) with open(fmissing_path, "w") as f: f.write("ref file2") with self.assertRaises(common.ChangedBusyError): handle.check_staged(live_info) # turn it into a dir os.unlink(fmissing_path) os.mkdir(fmissing_path) handle.copy_file() with self.assertRaises(common.NotStableBusyError): handle.check_staged(live_info) # info of dir live_info = localfs_client.get_live_info( self.s.settings, handle.fspath) handle.check_staged(live_info) if __name__ == '__main__': unittest.main()