Commit eebea533 authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

Update state with new info after staging

parent 1ec51a46
......@@ -353,6 +353,8 @@ class LocalfsSourceHandle(object):
f = utils.hash_string(filename)
stage_filename = utils.join_path(self.cache_stage_name, f)
self.stage_filename = stage_filename
stage_path = self.get_path_in_cache(stage_filename)
self.staged_path = stage_path
if db.get_cachename(stage_filename):
return False
db.insert_cachename(stage_filename, self.SIGNATURE, filename)
......@@ -363,6 +365,7 @@ class LocalfsSourceHandle(object):
db = self.get_db()
db.delete_cachename(stage_filename)
self.stage_filename = None
self.staged_path = None
def get_path_in_cache(self, name):
return utils.join_path(self.cache_path, name)
......@@ -373,8 +376,7 @@ class LocalfsSourceHandle(object):
% fspath)
new_registered = self.register_stage_name(fspath)
stage_filename = self.stage_filename
stage_path = self.get_path_in_cache(stage_filename)
self.staged_path = stage_path
stage_path = self.staged_path
if not new_registered:
logger.warning("Staging already registered for file %s" %
......@@ -383,6 +385,8 @@ class LocalfsSourceHandle(object):
logger.warning("File %s already staged at %s" %
(self.objname, stage_path))
return
logger.info("Staging file '%s' to '%s'" % (self.objname, stage_path))
try:
os.rename(fspath, stage_path)
except OSError as e:
......@@ -394,12 +398,15 @@ class LocalfsSourceHandle(object):
if file_is_open(stage_path):
os.rename(stage_path, fspath)
self.unregister_stage_name(stage_filename)
logger.warning("File '%s' is open; unstaged" % self.objname)
raise common.BusyError("File '%s' is open. Undoing" % stage_path)
if path_status(stage_path) in [LOCAL_NONEMPTY_DIR, LOCAL_EMPTY_DIR]:
self.check_update_source_state()
if path_status(stage_path) != LOCAL_FILE:
os.rename(stage_path, fspath)
self.unregister_hidden_name(stage_filename)
raise common.ConflictError("'%s' is non-empty" % fspath)
logger.info("Staging file '%s' to '%s'" % (self.objname, stage_path))
self.unregister_stage_name(stage_filename)
logger.warning("Object '%s' is not a regular file; unstaged" %
self.objname)
def __init__(self, settings, source_state):
self.settings = settings
......@@ -412,16 +419,34 @@ class LocalfsSourceHandle(object):
self.source_state = source_state
self.objname = source_state.objname
self.fspath = utils.join_path(self.rootpath, self.objname)
self.isdir = self.info_is_dir()
self.stage_filename = None
self.staged_path = None
self.heartbeat = settings.heartbeat
if not self.isdir:
if self.needs_staging():
self.lock_file(self.fspath)
@transaction()
def update_state(self, state):
db = self.get_db()
db.put_state(state)
def check_update_source_state(self):
live_info = local_path_changes(
self.staged_path, self.source_state)
if live_info is not None:
logger.warning("Actual info differs in %s for object: '%s'; "
"updating..." % (self.SIGNATURE, self.objname))
new_state = self.source_state.set(info=live_info)
self.update_state(new_state)
self.source_state = new_state
def get_synced_state(self):
return self.source_state
def needs_staging(self):
info = self.source_state.info
return info and info[LOCALFS_TYPE] == common.T_FILE
def info_is_dir(self):
try:
return self.source_state.info[LOCALFS_TYPE] == common.T_DIR
......
......@@ -130,7 +130,7 @@ class PithosSourceHandle(object):
else common.T_FILE)
actual_info = {"pithos_etag": actual_etag,
"pithos_type": actual_type}
self.source_state = self.source_state.set(info=actual_info)
self.check_update_source_state(actual_info)
if actual_info == {}:
logger.info("Downloading object: '%s', object is gone."
% self.objname)
......@@ -142,6 +142,19 @@ class PithosSourceHandle(object):
os.mkdir(fetched_fspath)
return fetched_fspath
@transaction()
def update_state(self, state):
db = self.get_db()
db.put_state(state)
def check_update_source_state(self, actual_info):
if actual_info != self.source_state.info:
logger.warning("Actual info differs in %s for object: '%s'; "
"updating..." % (self.SIGNATURE, self.objname))
new_state = self.source_state.set(info=actual_info)
self.update_state(new_state)
self.source_state = new_state
def get_synced_state(self):
return self.source_state
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment