Commit b1c5c138 authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

Handle unicode issues

Operate internally only with unicode. Talk to the filesystem with utf8
encoded strings and then decode the retrieved paths to unicode. Refuse to
operate if the assumed filesystem encoding is not utf8.
parent 894d3dc9
......@@ -88,7 +88,11 @@ def file_is_open(path):
try:
flist = psutil_open_files(proc)
for nt in flist:
if nt.path == path:
try:
nt_path = utils.to_unicode(nt.path)
except UnicodeDecodeError as e:
continue
if nt_path == path:
return True
except psutil.Error:
pass
......@@ -548,13 +552,22 @@ class LocalfsFileClient(FileClient):
def walk_filesystem(self):
candidates = {}
for dirpath, dirnames, files in os.walk(self.ROOTPATH):
rootpath = utils.from_unicode(self.ROOTPATH)
for dirpath, dirnames, files in os.walk(rootpath):
try:
dirpath = utils.to_unicode(dirpath)
except UnicodeDecodeError as e:
continue
rel_dirpath = os.path.relpath(dirpath, start=self.ROOTPATH)
logger.debug("'%s' '%s'" % (dirpath, rel_dirpath))
if rel_dirpath != '.':
objname = utils.to_standard_sep(rel_dirpath)
candidates[objname] = self.none_info()
for filename in files:
try:
filename = utils.to_unicode(filename)
except UnicodeDecodeError as e:
continue
if rel_dirpath == '.':
prefix = ""
else:
......@@ -615,24 +628,29 @@ class LocalfsFileClient(FileClient):
def notifier(self):
def handle_path(path):
try:
path = utils.to_unicode(path)
except UnicodeDecodeError as e:
return
rel_path = os.path.relpath(path, start=self.ROOTPATH)
objname = utils.to_standard_sep(rel_path)
with self.probe_candidates.lock() as d:
d[objname] = self.none_info()
cachepath = utils.from_unicode(self.CACHEPATH)
class EventHandler(FileSystemEventHandler):
def on_created(this, event):
# if not event.is_directory:
# return
path = event.src_path
if path.startswith(self.CACHEPATH):
if path.startswith(cachepath):
return
logger.debug("Handling %s" % event)
handle_path(path)
def on_deleted(this, event):
path = event.src_path
if path.startswith(self.CACHEPATH):
if path.startswith(cachepath):
return
logger.debug("Handling %s" % event)
handle_path(path)
......@@ -641,7 +659,7 @@ class LocalfsFileClient(FileClient):
if event.is_directory:
return
path = event.src_path
if path.startswith(self.CACHEPATH):
if path.startswith(cachepath):
return
logger.debug("Handling %s" % event)
handle_path(path)
......@@ -649,14 +667,14 @@ class LocalfsFileClient(FileClient):
def on_moved(this, event):
src_path = event.src_path
dest_path = event.dest_path
if src_path.startswith(self.CACHEPATH) or \
dest_path.startswith(self.CACHEPATH):
if src_path.startswith(cachepath) or \
dest_path.startswith(cachepath):
return
logger.debug("Handling %s" % event)
handle_path(src_path)
handle_path(dest_path)
path = self.ROOTPATH
path = utils.from_unicode(self.ROOTPATH)
event_handler = EventHandler()
observer = Observer()
observer.schedule(event_handler, path, recursive=True)
......
......@@ -68,9 +68,24 @@ def ssl_fall_back(method):
return wrap
def check_encoding():
platform = utils.PLATFORM
encoding = utils.ENCODING
if platform.startswith("linux"):
if not encoding.lower() in ['utf-8', 'utf8']:
raise Exception(
"Cannot operate with encoding %s. Please use UTF-8."
% encoding)
class SyncerSettings():
def __init__(self, auth_url, auth_token, container, local_root_path,
*args, **kwargs):
check_encoding()
auth_url = utils.to_unicode(auth_url)
auth_token = utils.to_unicode(auth_token)
container = utils.to_unicode(container)
local_root_path = utils.to_unicode(local_root_path)
self.auth_url = utils.normalize_standard_suffix(auth_url)
self.auth_token = auth_token
self.container = utils.normalize_standard_suffix(container)
......@@ -84,9 +99,10 @@ class SyncerSettings():
self.endpoint = self._get_pithos_client(
auth_url, auth_token, container)
home_dir = os.path.expanduser('~')
home_dir = utils.to_unicode(os.path.expanduser('~'))
default_settings_path = join_path(home_dir, GLOBAL_SETTINGS_NAME)
self.settings_path = kwargs.get("agkyra_path", default_settings_path)
self.settings_path = utils.to_unicode(
kwargs.get("agkyra_path", default_settings_path))
self.create_dir(self.settings_path)
self.instances_path = join_path(self.settings_path, INSTANCES_NAME)
......@@ -102,27 +118,28 @@ class SyncerSettings():
self.instance_path = join_path(self.instances_path, self.instance)
self.create_dir(self.instance_path)
self.dbname = kwargs.get("dbname", DEFAULT_DBNAME)
self.dbname = utils.to_unicode(kwargs.get("dbname", DEFAULT_DBNAME))
self.full_dbname = join_path(self.instance_path, self.dbname)
self.get_db(initialize=True)
self.cache_name = kwargs.get("cache_name", DEFAULT_CACHE_NAME)
self.cache_name = utils.to_unicode(
kwargs.get("cache_name", DEFAULT_CACHE_NAME))
self.cache_path = join_path(self.local_root_path, self.cache_name)
self.create_dir(self.cache_path)
self.cache_hide_name = kwargs.get("cache_hide_name",
DEFAULT_CACHE_HIDE_NAME)
self.cache_hide_name = utils.to_unicode(
kwargs.get("cache_hide_name", DEFAULT_CACHE_HIDE_NAME))
self.cache_hide_path = join_path(self.cache_path, self.cache_hide_name)
self.create_dir(self.cache_hide_path)
self.cache_stage_name = kwargs.get("cache_stage_name",
DEFAULT_CACHE_STAGE_NAME)
self.cache_stage_name = utils.to_unicode(
kwargs.get("cache_stage_name", DEFAULT_CACHE_STAGE_NAME))
self.cache_stage_path = join_path(self.cache_path,
self.cache_stage_name)
self.create_dir(self.cache_stage_path)
self.cache_fetch_name = kwargs.get("cache_fetch_name",
DEFAULT_CACHE_FETCH_NAME)
self.cache_fetch_name = utils.to_unicode(
kwargs.get("cache_fetch_name", DEFAULT_CACHE_FETCH_NAME))
self.cache_fetch_path = join_path(self.cache_path,
self.cache_fetch_name)
self.create_dir(self.cache_fetch_path)
......
......@@ -18,9 +18,16 @@ import hashlib
import datetime
import threading
import watchdog.utils
import sys
import logging
logger = logging.getLogger(__name__)
from agkyra.syncer.common import OBJECT_DIRSEP
ENCODING = sys.getfilesystemencoding() or sys.getdefaultencoding()
PLATFORM = sys.platform
def to_local_sep(filename):
return filename.replace(OBJECT_DIRSEP, os.path.sep)
......@@ -48,7 +55,24 @@ def normalize_local_suffix(path):
return path.rstrip(os.path.sep) + os.path.sep
def from_unicode(s):
if type(s) is unicode:
return s.encode(ENCODING)
return s
def to_unicode(s):
if type(s) is unicode:
return s
try:
return unicode(s, ENCODING)
except UnicodeDecodeError as e:
logger.warning("Failed to decode %s" % s.__repr__())
raise
def hash_string(s):
s = from_unicode(s)
return hashlib.sha256(s).hexdigest()
......
# -*- coding: utf-8 -*-
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
......@@ -13,6 +15,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
from agkyra.syncer.setup import SyncerSettings
from agkyra.syncer import localfs_client
from agkyra.syncer.pithos_client import PithosFileClient
......@@ -39,6 +43,13 @@ handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# kamakisend_logger = logging.getLogger('kamaki.clients.send')
# kamakisend_logger.addHandler(handler)
# kamakisend_logger.setLevel(logging.DEBUG)
# kamakirecv_logger = logging.getLogger('kamaki.clients.recv')
# kamakirecv_logger.addHandler(handler)
# kamakirecv_logger.setLevel(logging.DEBUG)
TMP = "/tmp"
......@@ -76,7 +87,7 @@ class AgkyraTest(unittest.TestCase):
AUTHENTICATION_URL = cloud_conf['url']
TOKEN = cloud_conf['token']
cls.ID = "AGKYRATEST" + str(random.random()).split('.')[1]
cls.ID = "ΑΓΚΥΡΑTEST" + str(random.random()).split('.')[1]
cls.LOCAL_ROOT_PATH = utils.join_path("/tmp", cls.ID)
......@@ -135,10 +146,10 @@ class AgkyraTest(unittest.TestCase):
candidates = self.slave.list_candidate_files(forced=True)
self.assertEqual(real(candidates), [])
fil = "f0001"
fil = "φ0001"
f_path = self.get_path(fil)
open(f_path, "a").close()
d = "d0001"
d = "δ0001"
d_path = self.get_path(d)
os.mkdir(d_path)
......@@ -177,11 +188,11 @@ class AgkyraTest(unittest.TestCase):
self.assertEqual(candidates, [])
def test_0002_notifier_local(self):
f_out = "f0002out"
f_cache = "f0002cache"
f_upd = "f0002upd"
f_ren = "f0002ren"
dbefore = "d0002before"
f_out = "φ0002out"
f_cache = "φ0002cache"
f_upd = "φ0002upd"
f_ren = "φ0002ren"
dbefore = "δ0002before"
f_out_path = self.get_path(f_out)
f_cache_path = self.get_path(f_cache)
f_upd_path = self.get_path(f_upd)
......@@ -197,9 +208,9 @@ class AgkyraTest(unittest.TestCase):
candidates = self.slave.list_candidate_files()
self.assertEqual(candidates, [])
fafter = "f0002after"
fafter = "φ0002after"
fafter_path = self.get_path(fafter)
dafter = "d0002after"
dafter = "δ0002after"
dafter_path = self.get_path(dafter)
open(fafter_path, "a").close()
os.mkdir(dafter_path)
......@@ -215,13 +226,13 @@ class AgkyraTest(unittest.TestCase):
with open(f_upd_path, "a") as f:
f.write("upd")
f_in = "f0002in"
f_in = "φ0002in"
f_in_path = self.get_path(f_in)
f_in_orig_path = utils.join_path(TMP, f_in)
open(f_in_orig_path, "a").close()
os.rename(f_in_orig_path, f_in_path)
f_ren_new = "f0002ren_new"
f_ren_new = "φ0002ren_new"
f_ren_new_path = self.get_path(f_ren_new)
os.rename(f_ren_path, f_ren_new_path)
......@@ -235,7 +246,7 @@ class AgkyraTest(unittest.TestCase):
def test_001_probe_and_sync(self):
# initial upload to pithos
f1 = "f001"
f1 = "φ001"
f1_content1 = "content1"
r1 = self.pithos.upload_from_string(
f1, f1_content1)
......@@ -281,7 +292,7 @@ class AgkyraTest(unittest.TestCase):
self.assertEqual(sstate.serial, 0)
def test_002_conflict(self):
fil = "f002"
fil = "φ002"
# local file
fil_local_content = "local"
with open(self.get_path(fil), "w") as f:
......@@ -317,10 +328,10 @@ class AgkyraTest(unittest.TestCase):
def test_003_dirs(self):
# make local dir with files
d = "d003"
d = "δ003"
d_path = self.get_path(d)
os.mkdir(d_path)
fil = "d003/f003"
fil = "δ003/φ003"
f_path = self.get_path(fil)
f_content = "f2"
with open(f_path, "w") as f:
......@@ -337,7 +348,7 @@ class AgkyraTest(unittest.TestCase):
def test_004_link(self):
# Check sym links
fil = "f004"
fil = "φ004"
f_path = self.get_path(fil)
open(f_path, 'a').close()
self.s.probe_file(self.s.SLAVE, fil)
......@@ -346,7 +357,7 @@ class AgkyraTest(unittest.TestCase):
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.AckSyncMessage)
ln = "f004.link"
ln = "φ004.link"
ln_path = self.get_path(ln)
os.symlink(f_path, ln_path)
self.s.probe_file(self.s.SLAVE, ln)
......@@ -390,7 +401,7 @@ class AgkyraTest(unittest.TestCase):
self.assert_no_message()
def test_005_dirs_inhibited_by_file(self):
fil = "f005"
fil = "φ005"
f_path = self.get_path(fil)
open(f_path, 'a').close()
self.s.probe_file(self.s.SLAVE, fil)
......@@ -401,7 +412,7 @@ class AgkyraTest(unittest.TestCase):
r = self.pithos.object_put(
fil, content_type='application/directory', content_length=0)
inner_fil = "f005/in005"
inner_fil = "φ005/in005"
inner_fil_content = "ff1 in dir "
r1 = self.pithos.upload_from_string(inner_fil, inner_fil_content)
self.s.probe_file(self.s.MASTER, fil)
......@@ -414,7 +425,7 @@ class AgkyraTest(unittest.TestCase):
self.assert_message(messaging.SyncMessage)
self.assert_message(messaging.SyncErrorMessage)
inner_dir = "f005/indir005"
inner_dir = "φ005/indir005"
r = self.pithos.object_put(
inner_dir, content_type='application/directory', content_length=0)
self.s.probe_file(self.s.MASTER, inner_dir)
......@@ -434,7 +445,7 @@ class AgkyraTest(unittest.TestCase):
self.assert_message(messaging.AckSyncMessage)
def test_006_heartbeat(self):
fil = "f006"
fil = "φ006"
f_path = self.get_path(fil)
open(f_path, 'a').close()
self.s.probe_file(self.s.SLAVE, fil)
......@@ -466,7 +477,7 @@ class AgkyraTest(unittest.TestCase):
self.assert_message(messaging.AckSyncMessage)
def test_007_multiprobe(self):
fil = "f007"
fil = "φ007"
f_path = self.get_path(fil)
open(f_path, 'a').close()
self.s.probe_file(self.s.SLAVE, fil)
......@@ -477,11 +488,11 @@ class AgkyraTest(unittest.TestCase):
self.assert_message(messaging.AlreadyProbedMessage)
def test_008_dir_contents(self):
d = "d008"
d = "δ008"
d_path = self.get_path(d)
r = self.pithos.object_put(
d, content_type='application/directory', content_length=0)
inner_fil = "d008/inf008"
inner_fil = "δ008/inφ008"
inner_fil_content = "fil in dir "
r1 = self.pithos.upload_from_string(inner_fil, inner_fil_content)
self.s.probe_file(self.s.MASTER, d)
......@@ -523,11 +534,11 @@ class AgkyraTest(unittest.TestCase):
self.assertEqual(cm.exception.status, 404)
def test_009_dir_delete_upstream(self):
d = "d009"
d = "δ009"
d_path = self.get_path(d)
r = self.pithos.object_put(
d, content_type='application/directory', content_length=0)
innerd = "d009/innerd009"
innerd = "δ009/innerδ009"
r = self.pithos.object_put(
innerd, content_type='application/directory', content_length=0)
self.s.probe_file(self.s.MASTER, d)
......@@ -564,7 +575,7 @@ class AgkyraTest(unittest.TestCase):
self.assert_message(messaging.AckSyncMessage)
def test_010_live_update_local(self):
fil = "f010"
fil = "φ010"
f_path = self.get_path(fil)
with open(f_path, "w") as f:
f.write("f to be changed")
......@@ -589,7 +600,7 @@ class AgkyraTest(unittest.TestCase):
self.assertEqual(new_info["localfs_size"], len(f_content))
def test_011_live_update_upstream(self):
fil = "f011"
fil = "φ011"
f_path = self.get_path(fil)
r = self.pithos.upload_from_string(fil, "f upstream")
etag = r['etag']
......@@ -612,7 +623,7 @@ class AgkyraTest(unittest.TestCase):
self.assertEqual(new_info["pithos_etag"], new_etag)
def test_012_cachename(self):
fil = "f012"
fil = "φ012"
f_path = self.get_path(fil)
with open(f_path, "w") as f:
f.write("content")
......@@ -643,7 +654,7 @@ class AgkyraTest(unittest.TestCase):
handle.hide_file()
def test_013_collisions(self):
fil = "f013"
fil = "φ013"
f_path = self.get_path(fil)
with open(f_path, "w") as f:
f.write("content")
......@@ -656,7 +667,7 @@ class AgkyraTest(unittest.TestCase):
self.assert_message(messaging.CollisionMessage)
self.assert_message(messaging.SyncErrorMessage)
d = "d013"
d = "δ013"
d_path = self.get_path(d)
os.mkdir(d_path)
self.s.probe_file(self.s.SLAVE, d)
......@@ -668,7 +679,7 @@ class AgkyraTest(unittest.TestCase):
self.assert_message(messaging.CollisionMessage)
self.assert_message(messaging.SyncErrorMessage)
d_synced = "d013_s"
d_synced = "δ013_s"
d_synced_path = self.get_path(d_synced)
os.mkdir(d_synced_path)
self.s.probe_file(self.s.SLAVE, d_synced)
......@@ -688,9 +699,9 @@ class AgkyraTest(unittest.TestCase):
self.assert_message(messaging.SyncErrorMessage)
def test_014_staging(self):
fil = "f014"
d = "d014"
fln = "f014.link"
fil = "φ014"
d = "δ014"
fln = "φ014.link"
f_path = self.get_path(fil)
with open(f_path, "w") as f:
f.write("content")
......@@ -719,7 +730,7 @@ class AgkyraTest(unittest.TestCase):
with self.assertRaises(common.OpenBusyError):
handle = localfs_client.LocalfsSourceHandle(self.s.settings, state)
ftmp_path = self.get_path("f014tmp")
ftmp_path = self.get_path("φ014tmp")
with open(ftmp_path, "w") as f:
f.write("tmp")
os.unlink(f_path)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment