Commit 263eaa01 authored by Giorgos Korfiatis
Browse files

stage by copying

parent 17cf6a79
...@@ -43,6 +43,18 @@ class BusyError(SyncError): ...@@ -43,6 +43,18 @@ class BusyError(SyncError):
pass pass
class NotStableBusyError(BusyError):
    """Busy error raised when a file is not in a stable state for staging."""
class OpenBusyError(BusyError):
    """Busy error raised when the file to be staged is currently open."""
class ChangedBusyError(BusyError):
    """Busy error raised when a file's contents differ from its staged copy."""
class ConflictError(SyncError): class ConflictError(SyncError):
pass pass
......
...@@ -20,6 +20,7 @@ import datetime ...@@ -20,6 +20,7 @@ import datetime
import psutil import psutil
import time import time
import filecmp import filecmp
import shutil
from watchdog.observers import Observer from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler from watchdog.events import FileSystemEventHandler
...@@ -41,8 +42,9 @@ LOCAL_OTHER = 5 ...@@ -41,8 +42,9 @@ LOCAL_OTHER = 5
OS_FILE_EXISTS = 17 OS_FILE_EXISTS = 17
OS_NOT_A_DIR = 20 OS_NOT_A_DIR = 20
OS_NO_FILE_OR_DIR = 2 OS_NO_FILE_OR_DIR = 2
OS_IS_DIR = 21
DEFAULT_MTIME_PRECISION = 1e-7 DEFAULT_MTIME_PRECISION = 1e-4
exclude_regexes = ["\.#", "\.~", "~\$", "~.*\.tmp$", "\..*\.swp$"] exclude_regexes = ["\.#", "\.~", "~\$", "~.*\.tmp$", "\..*\.swp$"]
exclude_pattern = re.compile('|'.join(exclude_regexes)) exclude_pattern = re.compile('|'.join(exclude_regexes))
...@@ -104,7 +106,12 @@ def eq_float(f1, f2): ...@@ -104,7 +106,12 @@ def eq_float(f1, f2):
def files_equal(f1, f2):
    """Return whether the files at paths `f1` and `f2` have equal contents.

    The comparison is a full (non-shallow) `filecmp.cmp`. An OSError whose
    errno is OS_NO_FILE_OR_DIR or OS_NOT_A_DIR is reported as "not equal"
    (False); any other OSError propagates to the caller.
    """
    logger.debug("Comparing files: '%s', '%s'" % (f1, f2))
    try:
        same = filecmp.cmp(f1, f2, shallow=False)
    except OSError as err:
        # Only a missing path / non-directory component is tolerated.
        if err.errno not in [OS_NO_FILE_OR_DIR, OS_NOT_A_DIR]:
            raise
        return False
    return same
def info_is_unhandled(info): def info_is_unhandled(info):
...@@ -192,6 +199,10 @@ def path_status(path): ...@@ -192,6 +199,10 @@ def path_status(path):
return status return status
def info_of_regular_file(info):
    """Tell whether `info` describes a regular file.

    Returns True only when `info` is non-empty and its LOCALFS_TYPE entry
    equals common.T_FILE. An empty or missing info (`{}` or None) yields
    False.
    """
    # Coerce to a real boolean: the bare `info and ...` form handed the
    # falsy info object itself (None or {}) back to the caller.
    return bool(info) and info[LOCALFS_TYPE] == common.T_FILE
def is_info_eq(info1, info2, unhandled_equal=True): def is_info_eq(info1, info2, unhandled_equal=True):
if {} in [info1, info2]: if {} in [info1, info2]:
return info1 == info2 return info1 == info2
...@@ -379,11 +390,11 @@ class LocalfsSourceHandle(object): ...@@ -379,11 +390,11 @@ class LocalfsSourceHandle(object):
def get_path_in_cache(self, name): def get_path_in_cache(self, name):
return utils.join_path(self.cache_path, name) return utils.join_path(self.cache_path, name)
def lock_file(self): def copy_file(self):
fspath = self.fspath fspath = self.fspath
if file_is_open(fspath): if file_is_open(fspath):
raise common.BusyError("File '%s' is open. Aborting" raise common.OpenBusyError("File '%s' is open. Aborting"
% fspath) % fspath)
new_registered = self.register_stage_name(fspath) new_registered = self.register_stage_name(fspath)
stage_filename = self.stage_filename stage_filename = self.stage_filename
stage_path = self.staged_path stage_path = self.staged_path
...@@ -398,31 +409,53 @@ class LocalfsSourceHandle(object): ...@@ -398,31 +409,53 @@ class LocalfsSourceHandle(object):
logger.info("Staging file '%s' to '%s'" % (self.objname, stage_path)) logger.info("Staging file '%s' to '%s'" % (self.objname, stage_path))
try: try:
os.rename(fspath, stage_path) shutil.copy2(fspath, stage_path)
except OSError as e: except IOError as e:
if e.errno in [OS_NO_FILE_OR_DIR, OS_NOT_A_DIR]: if e.errno in [OS_NO_FILE_OR_DIR, OS_IS_DIR]:
logger.info("Source does not exist: '%s'" % fspath) logger.info("Source is not a regural file: '%s'" % fspath)
self.unregister_stage_name(stage_filename) self.unregister_stage_name(stage_filename)
return return
else: else:
raise e raise e
def stage_file(self):
    """Stage the source file by copying it, then validate the copy.

    Steps, in order:
      1. `copy_file()` copies the file at `self.fspath` to the stage path.
      2. The live info of `self.fspath` is read with `get_live_info`.
      3. `check_staged(live_info)` verifies the copy against that info.
      4. `check_update_source_state(live_info)` is handed the same info so
         it can compare it with the recorded source state.

    Raises:
        Whatever `copy_file` and `check_staged` raise, e.g.
        common.OpenBusyError, common.NotStableBusyError or
        common.ChangedBusyError.
    """
    self.copy_file()
    live_info = get_live_info(self.fspath)
    # Diagnostics go through the module logger, not stdout.
    logger.debug("live_info = %s %s", self.objname, live_info)
    self.check_staged(live_info)
    self.check_update_source_state(live_info)
def check_staged(self, live_info):
    """Validate the staged copy against the live state of the source path.

    Args:
        live_info: info of `self.fspath`, as obtained by the caller
            (e.g. from `get_live_info`).

    Behaviour:
      * Nothing staged (`self.staged_path is None`): raise
        NotStableBusyError if `live_info` says the path is a regular
        file, otherwise return quietly.
      * Staged, but `live_info` is not of a regular file: drop the staged
        copy, log a warning and return.
      * Staged, but the source file is open: drop the staged copy and
        raise OpenBusyError.
      * Staged, but the contents differ from the source: drop the staged
        copy and raise ChangedBusyError.

    Raises:
        common.NotStableBusyError, common.OpenBusyError,
        common.ChangedBusyError: as described above.
    """
    is_reg = info_of_regular_file(live_info)
    logger.debug("is_reg %s %s", self.objname, is_reg)

    if self.staged_path is None:
        if is_reg:
            # The info says "regular file" yet no copy exists.
            m = ("File '%s' is not in a stable state; unstaged"
                 % self.objname)
            logger.warning(m)
            raise common.NotStableBusyError(m)
        return

    if not is_reg:
        self._drop_staged_copy()
        # The placeholder was previously logged unformatted (no argument).
        logger.warning("Path '%s' is not a regular file; unstaged"
                       % self.objname)
        return

    if file_is_open(self.fspath):
        self._drop_staged_copy()
        m = "File '%s' is open; unstaged" % self.objname
        logger.warning(m)
        raise common.OpenBusyError(m)

    if not files_equal(self.staged_path, self.fspath):
        self._drop_staged_copy()
        m = "File '%s' contents have changed; unstaged" % self.objname
        logger.warning(m)
        raise common.ChangedBusyError(m)

def _drop_staged_copy(self):
    """Delete the staged copy and unregister its stage name."""
    os.unlink(self.staged_path)
    self.unregister_stage_name(self.stage_filename)
def __init__(self, settings, source_state): def __init__(self, settings, source_state):
self.settings = settings self.settings = settings
...@@ -438,7 +471,7 @@ class LocalfsSourceHandle(object): ...@@ -438,7 +471,7 @@ class LocalfsSourceHandle(object):
self.stage_filename = None self.stage_filename = None
self.staged_path = None self.staged_path = None
self.heartbeat = settings.heartbeat self.heartbeat = settings.heartbeat
if self.needs_staging(): if info_of_regular_file(self.source_state.info):
self.stage_file() self.stage_file()
@transaction() @transaction()
...@@ -446,10 +479,8 @@ class LocalfsSourceHandle(object): ...@@ -446,10 +479,8 @@ class LocalfsSourceHandle(object):
db = self.get_db() db = self.get_db()
db.put_state(state) db.put_state(state)
def check_update_source_state(self): def check_update_source_state(self, live_info):
live_info = local_path_changes( if not is_info_eq(live_info, self.source_state.info):
self.staged_path, self.source_state)
if live_info is not None:
msg = messaging.LiveInfoUpdateMessage( msg = messaging.LiveInfoUpdateMessage(
archive=self.SIGNATURE, objname=self.objname, archive=self.SIGNATURE, objname=self.objname,
info=live_info, logger=logger) info=live_info, logger=logger)
...@@ -461,10 +492,6 @@ class LocalfsSourceHandle(object): ...@@ -461,10 +492,6 @@ class LocalfsSourceHandle(object):
def get_synced_state(self): def get_synced_state(self):
return self.source_state return self.source_state
def needs_staging(self):
info = self.source_state.info
return info and info[LOCALFS_TYPE] == common.T_FILE
def info_is_dir(self): def info_is_dir(self):
try: try:
return self.source_state.info[LOCALFS_TYPE] == common.T_DIR return self.source_state.info[LOCALFS_TYPE] == common.T_DIR
...@@ -488,11 +515,7 @@ class LocalfsSourceHandle(object): ...@@ -488,11 +515,7 @@ class LocalfsSourceHandle(object):
if self.stage_filename is None: if self.stage_filename is None:
return return
staged_path = self.staged_path staged_path = self.staged_path
try: os.unlink(staged_path)
link_file(staged_path, self.fspath)
os.unlink(staged_path)
except common.ConflictError:
self.stash_staged_file()
self.unregister_stage_name(self.stage_filename) self.unregister_stage_name(self.stage_filename)
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from agkyra.syncer.setup import SyncerSettings from agkyra.syncer.setup import SyncerSettings
from agkyra.syncer.localfs_client import LocalfsFileClient, LocalfsTargetHandle from agkyra.syncer import localfs_client
from agkyra.syncer.pithos_client import PithosFileClient from agkyra.syncer.pithos_client import PithosFileClient
from agkyra.syncer.syncer import FileSyncer from agkyra.syncer.syncer import FileSyncer
import agkyra.syncer.syncer import agkyra.syncer.syncer
...@@ -88,7 +88,7 @@ class AgkyraTest(unittest.TestCase): ...@@ -88,7 +88,7 @@ class AgkyraTest(unittest.TestCase):
ignore_ssl=True) ignore_ssl=True)
cls.master = PithosFileClient(cls.settings) cls.master = PithosFileClient(cls.settings)
cls.slave = LocalfsFileClient(cls.settings) cls.slave = localfs_client.LocalfsFileClient(cls.settings)
cls.s = FileSyncer(cls.settings, cls.master, cls.slave) cls.s = FileSyncer(cls.settings, cls.master, cls.slave)
cls.pithos = cls.master.endpoint cls.pithos = cls.master.endpoint
cls.pithos.create_container(cls.ID) cls.pithos.create_container(cls.ID)
...@@ -618,7 +618,7 @@ class AgkyraTest(unittest.TestCase): ...@@ -618,7 +618,7 @@ class AgkyraTest(unittest.TestCase):
f.write("content") f.write("content")
state = self.db.get_state(self.s.SLAVE, fil) state = self.db.get_state(self.s.SLAVE, fil)
handle = LocalfsTargetHandle(self.s.settings, state) handle = localfs_client.LocalfsTargetHandle(self.s.settings, state)
hidden_filename = utils.join_path( hidden_filename = utils.join_path(
handle.cache_hide_name, utils.hash_string(handle.objname)) handle.cache_hide_name, utils.hash_string(handle.objname))
hidden_path = handle.get_path_in_cache(hidden_filename) hidden_path = handle.get_path_in_cache(hidden_filename)
...@@ -687,6 +687,99 @@ class AgkyraTest(unittest.TestCase): ...@@ -687,6 +687,99 @@ class AgkyraTest(unittest.TestCase):
self.assert_message(messaging.CollisionMessage) self.assert_message(messaging.CollisionMessage)
self.assert_message(messaging.SyncErrorMessage) self.assert_message(messaging.SyncErrorMessage)
def test_014_staging(self):
    """Exercise staging-by-copy of LocalfsSourceHandle.

    Covers staging on handle creation, re-staging after a content change,
    refusal while the source is open, non-regular sources (symlink,
    missing file, directory) and the individual `copy_file` /
    `check_staged` steps.
    """
    fil = "f014"
    d = "d014"
    fln = "f014.link"
    f_path = self.get_path(fil)
    with open(f_path, "w") as f:
        f.write("content")
    fln_path = self.get_path(fln)
    os.symlink(f_path, fln_path)
    d_path = self.get_path(d)
    os.mkdir(d_path)

    # A probed regular file gets staged when the handle is created.
    self.s.probe_file(self.s.SLAVE, fil)
    self.assert_message(messaging.UpdateMessage)
    state = self.db.get_state(self.s.SLAVE, fil)
    handle = localfs_client.LocalfsSourceHandle(self.s.settings, state)
    staged_path = handle.staged_path
    self.assertTrue(localfs_client.files_equal(f_path, staged_path))
    handle.unstage_file()
    self.assertFalse(os.path.exists(staged_path))

    # Content changed since the state was recorded: a live info update
    # is expected and the staged copy matches the new content.
    with open(f_path, "w") as f:
        f.write("content new")
    handle = localfs_client.LocalfsSourceHandle(self.s.settings, state)
    self.assert_message(messaging.LiveInfoUpdateMessage)
    self.assertTrue(localfs_client.files_equal(f_path, staged_path))
    handle.unstage_file()

    # While the source is held open, staging must refuse. The `with`
    # block closes the descriptor deterministically instead of relying
    # on the name being rebound later.
    with open(f_path, "r"):
        with self.assertRaises(common.OpenBusyError):
            localfs_client.LocalfsSourceHandle(self.s.settings, state)

    # Source replaced by a symlink: nothing is staged.
    ftmp_path = self.get_path("f014tmp")
    with open(ftmp_path, "w") as f:
        f.write("tmp")
    os.unlink(f_path)
    os.symlink(ftmp_path, f_path)
    state = self.db.get_state(self.s.SLAVE, fil)
    handle = localfs_client.LocalfsSourceHandle(self.s.settings, state)
    self.assert_message(messaging.LiveInfoUpdateMessage)
    self.assertIsNone(handle.staged_path)

    # A path probed as a symlink is not staged on handle creation.
    self.s.probe_file(self.s.SLAVE, fln)
    self.assert_message(messaging.UpdateMessage)
    state = self.db.get_state(self.s.SLAVE, fln)
    handle = localfs_client.LocalfsSourceHandle(self.s.settings, state)
    self.assertIsNone(handle.staged_path)

    # The symlink becomes a regular file, but the handle is still built
    # from the previously fetched state, so creation does not stage it.
    os.unlink(fln_path)
    with open(fln_path, "w") as f:
        f.write("reg file")
    handle = localfs_client.LocalfsSourceHandle(self.s.settings, state)
    self.assertIsNone(handle.staged_path)

    # try to stage now
    handle.stage_file()
    self.assert_message(messaging.LiveInfoUpdateMessage)
    self.assertTrue(localfs_client.files_equal(
        fln_path, handle.staged_path))
    handle.unstage_file()

    # A path that did not exist when probed is not staged.
    fmissing = "fmissing014"
    fmissing_path = self.get_path(fmissing)
    self.s.probe_file(self.s.SLAVE, fmissing)
    state = self.db.get_state(self.s.SLAVE, fmissing)
    handle = localfs_client.LocalfsSourceHandle(self.s.settings, state)
    self.assertIsNone(handle.staged_path)

    # Drive the two staging steps by hand once the file exists.
    with open(fmissing_path, "w") as f:
        f.write("ref file")
    handle.copy_file()
    self.assertIsNotNone(handle.staged_path)
    live_info = localfs_client.get_live_info(handle.fspath)
    handle.check_staged(live_info)

    # Source modified after the copy was taken.
    with open(fmissing_path, "w") as f:
        f.write("ref file2")
    with self.assertRaises(common.ChangedBusyError):
        handle.check_staged(live_info)

    # turn it into a dir
    os.unlink(fmissing_path)
    os.mkdir(fmissing_path)
    handle.copy_file()
    # live_info still describes the earlier regular file here.
    with self.assertRaises(common.NotStableBusyError):
        handle.check_staged(live_info)

    # info of dir
    live_info = localfs_client.get_live_info(handle.fspath)
    handle.check_staged(live_info)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment