diff --git a/agkyra/syncer/common.py b/agkyra/syncer/common.py index 3003c4d303421bc1dacee6b1c3273508b9292bd5..bef09f5e75a4c480b114182142b9d62088b1ca02 100644 --- a/agkyra/syncer/common.py +++ b/agkyra/syncer/common.py @@ -43,6 +43,18 @@ class BusyError(SyncError): pass +class NotStableBusyError(BusyError): + pass + + +class OpenBusyError(BusyError): + pass + + +class ChangedBusyError(BusyError): + pass + + class ConflictError(SyncError): pass diff --git a/agkyra/syncer/localfs_client.py b/agkyra/syncer/localfs_client.py index eca1ef2b74abeb9c5a56c3fc80b43e91ade6e5ef..fb2b74d195f2fbb7f7b5881fcf871c1ba30e3080 100644 --- a/agkyra/syncer/localfs_client.py +++ b/agkyra/syncer/localfs_client.py @@ -20,6 +20,7 @@ import datetime import psutil import time import filecmp +import shutil from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler @@ -41,8 +42,9 @@ LOCAL_OTHER = 5 OS_FILE_EXISTS = 17 OS_NOT_A_DIR = 20 OS_NO_FILE_OR_DIR = 2 +OS_IS_DIR = 21 -DEFAULT_MTIME_PRECISION = 1e-7 +DEFAULT_MTIME_PRECISION = 1e-4 exclude_regexes = ["\.#", "\.~", "~\$", "~.*\.tmp$", "\..*\.swp$"] exclude_pattern = re.compile('|'.join(exclude_regexes)) @@ -104,7 +106,12 @@ def eq_float(f1, f2): def files_equal(f1, f2): logger.debug("Comparing files: '%s', '%s'" % (f1, f2)) - return filecmp.cmp(f1, f2, shallow=False) + try: + return filecmp.cmp(f1, f2, shallow=False) + except OSError as e: + if e.errno in [OS_NO_FILE_OR_DIR, OS_NOT_A_DIR]: + return False + raise def info_is_unhandled(info): @@ -192,6 +199,10 @@ def path_status(path): return status +def info_of_regular_file(info): + return info and info[LOCALFS_TYPE] == common.T_FILE + + def is_info_eq(info1, info2, unhandled_equal=True): if {} in [info1, info2]: return info1 == info2 @@ -379,11 +390,11 @@ class LocalfsSourceHandle(object): def get_path_in_cache(self, name): return utils.join_path(self.cache_path, name) - def lock_file(self): + def copy_file(self): fspath = self.fspath if file_is_open(fspath): - raise common.BusyError("File '%s' is open. Aborting" - % fspath) + raise common.OpenBusyError("File '%s' is open. Aborting" + % fspath) new_registered = self.register_stage_name(fspath) stage_filename = self.stage_filename stage_path = self.staged_path @@ -398,31 +409,53 @@ class LocalfsSourceHandle(object): logger.info("Staging file '%s' to '%s'" % (self.objname, stage_path)) try: - os.rename(fspath, stage_path) - except OSError as e: - if e.errno in [OS_NO_FILE_OR_DIR, OS_NOT_A_DIR]: - logger.info("Source does not exist: '%s'" % fspath) + shutil.copy2(fspath, stage_path) + except IOError as e: + if e.errno in [OS_NO_FILE_OR_DIR, OS_IS_DIR]: + logger.info("Source is not a regural file: '%s'" % fspath) self.unregister_stage_name(stage_filename) return else: raise e def stage_file(self): - self.lock_file() - if self.staged_path is not None: - if file_is_open(self.staged_path): - os.rename(self.staged_path, self.fspath) - self.unregister_stage_name(self.stage_filename) - logger.warning("File '%s' is open; unstaged" % self.objname) - raise common.BusyError("File '%s' is open. Undoing" % - self.staged_path) - - if path_status(self.staged_path) != LOCAL_FILE: - os.rename(self.staged_path, self.fspath) - self.unregister_stage_name(self.stage_filename) - logger.warning("Object '%s' is not a regular file; unstaged" % - self.objname) - self.check_update_source_state() + self.copy_file() + live_info = get_live_info(self.fspath) + print "live_info =", self.objname, live_info + self.check_staged(live_info) + self.check_update_source_state(live_info) + + def check_staged(self, live_info): + is_reg = info_of_regular_file(live_info) + print "is_reg", self.objname, is_reg + + if self.staged_path is None: + if is_reg: + m = ("File '%s' is not in a stable state; unstaged" + % self.objname) + logger.warning(m) + raise common.NotStableBusyError(m) + return + + if not is_reg: + os.unlink(self.staged_path) + self.unregister_stage_name(self.stage_filename) + logger.warning("Path '%s' is not a regular file; unstaged") + return + + if file_is_open(self.fspath): + os.unlink(self.staged_path) + self.unregister_stage_name(self.stage_filename) + m = "File '%s' is open; unstaged" % self.objname + logger.warning(m) + raise common.OpenBusyError(m) + + if not files_equal(self.staged_path, self.fspath): + os.unlink(self.staged_path) + self.unregister_stage_name(self.stage_filename) + m = "File '%s' contents have changed; unstaged" % self.objname + logger.warning(m) + raise common.ChangedBusyError(m) def __init__(self, settings, source_state): self.settings = settings @@ -438,7 +471,7 @@ class LocalfsSourceHandle(object): self.stage_filename = None self.staged_path = None self.heartbeat = settings.heartbeat - if self.needs_staging(): + if info_of_regular_file(self.source_state.info): self.stage_file() @transaction() @@ -446,10 +479,8 @@ class LocalfsSourceHandle(object): db = self.get_db() db.put_state(state) - def check_update_source_state(self): - live_info = local_path_changes( - self.staged_path, self.source_state) - if live_info is not None: + def check_update_source_state(self, live_info): + if not is_info_eq(live_info, self.source_state.info): msg = messaging.LiveInfoUpdateMessage( archive=self.SIGNATURE, objname=self.objname, info=live_info, logger=logger) @@ -461,10 +492,6 @@ class LocalfsSourceHandle(object): def get_synced_state(self): return self.source_state - def needs_staging(self): - info = self.source_state.info - return info and info[LOCALFS_TYPE] == common.T_FILE - def info_is_dir(self): try: return self.source_state.info[LOCALFS_TYPE] == common.T_DIR @@ -488,11 +515,7 @@ class LocalfsSourceHandle(object): if self.stage_filename is None: return staged_path = self.staged_path - try: - link_file(staged_path, self.fspath) - os.unlink(staged_path) - except common.ConflictError: - self.stash_staged_file() + os.unlink(staged_path) self.unregister_stage_name(self.stage_filename) diff --git a/test.py b/test.py index 4ba6b3e313af95c8832e5d3480eb0e53747ee9a5..0621b699d8e99e4b5a98289e5442e09ba6699ad9 100644 --- a/test.py +++ b/test.py @@ -14,7 +14,7 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. from agkyra.syncer.setup import SyncerSettings -from agkyra.syncer.localfs_client import LocalfsFileClient, LocalfsTargetHandle +from agkyra.syncer import localfs_client from agkyra.syncer.pithos_client import PithosFileClient from agkyra.syncer.syncer import FileSyncer import agkyra.syncer.syncer @@ -88,7 +88,7 @@ class AgkyraTest(unittest.TestCase): ignore_ssl=True) cls.master = PithosFileClient(cls.settings) - cls.slave = LocalfsFileClient(cls.settings) + cls.slave = localfs_client.LocalfsFileClient(cls.settings) cls.s = FileSyncer(cls.settings, cls.master, cls.slave) cls.pithos = cls.master.endpoint cls.pithos.create_container(cls.ID) @@ -618,7 +618,7 @@ class AgkyraTest(unittest.TestCase): f.write("content") state = self.db.get_state(self.s.SLAVE, fil) - handle = LocalfsTargetHandle(self.s.settings, state) + handle = localfs_client.LocalfsTargetHandle(self.s.settings, state) hidden_filename = utils.join_path( handle.cache_hide_name, utils.hash_string(handle.objname)) hidden_path = handle.get_path_in_cache(hidden_filename) @@ -687,6 +687,99 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.CollisionMessage) self.assert_message(messaging.SyncErrorMessage) + def test_014_staging(self): + fil = "f014" + d = "d014" + fln = "f014.link" + f_path = self.get_path(fil) + with open(f_path, "w") as f: + f.write("content") + fln_path = self.get_path(fln) + os.symlink(f_path, fln_path) + d_path = self.get_path(d) + os.mkdir(d_path) + + self.s.probe_file(self.s.SLAVE, fil) + self.assert_message(messaging.UpdateMessage) + state = self.db.get_state(self.s.SLAVE, fil) + handle = localfs_client.LocalfsSourceHandle(self.s.settings, state) + staged_path = handle.staged_path + self.assertTrue(localfs_client.files_equal(f_path, staged_path)) + handle.unstage_file() + self.assertFalse(os.path.exists(staged_path)) + + with open(f_path, "w") as f: + f.write("content new") + handle = localfs_client.LocalfsSourceHandle(self.s.settings, state) + self.assert_message(messaging.LiveInfoUpdateMessage) + self.assertTrue(localfs_client.files_equal(f_path, staged_path)) + handle.unstage_file() + + f = open(f_path, "r") + with self.assertRaises(common.OpenBusyError): + handle = localfs_client.LocalfsSourceHandle(self.s.settings, state) + + ftmp_path = self.get_path("f014tmp") + with open(ftmp_path, "w") as f: + f.write("tmp") + os.unlink(f_path) + os.symlink(ftmp_path, f_path) + state = self.db.get_state(self.s.SLAVE, fil) + handle = localfs_client.LocalfsSourceHandle(self.s.settings, state) + self.assert_message(messaging.LiveInfoUpdateMessage) + self.assertIsNone(handle.staged_path) + + self.s.probe_file(self.s.SLAVE, fln) + self.assert_message(messaging.UpdateMessage) + state = self.db.get_state(self.s.SLAVE, fln) + handle = localfs_client.LocalfsSourceHandle(self.s.settings, state) + self.assertIsNone(handle.staged_path) + + os.unlink(fln_path) + with open(fln_path, "w") as f: + f.write("reg file") + + handle = localfs_client.LocalfsSourceHandle(self.s.settings, state) + self.assertIsNone(handle.staged_path) + + # try to stage now + handle.stage_file() + self.assert_message(messaging.LiveInfoUpdateMessage) + self.assertTrue(localfs_client.files_equal( + fln_path, handle.staged_path)) + handle.unstage_file() + + fmissing = "fmissing014" + fmissing_path = self.get_path(fmissing) + self.s.probe_file(self.s.SLAVE, fmissing) + state = self.db.get_state(self.s.SLAVE, fmissing) + handle = localfs_client.LocalfsSourceHandle(self.s.settings, state) + self.assertIsNone(handle.staged_path) + + with open(fmissing_path, "w") as f: + f.write("ref file") + + handle.copy_file() + self.assertIsNotNone(handle.staged_path) + live_info = localfs_client.get_live_info(handle.fspath) + handle.check_staged(live_info) + + with open(fmissing_path, "w") as f: + f.write("ref file2") + with self.assertRaises(common.ChangedBusyError): + handle.check_staged(live_info) + + # turn it into a dir + os.unlink(fmissing_path) + os.mkdir(fmissing_path) + handle.copy_file() + with self.assertRaises(common.NotStableBusyError): + handle.check_staged(live_info) + + # info of dir + live_info = localfs_client.get_live_info(handle.fspath) + handle.check_staged(live_info) + if __name__ == '__main__': unittest.main()