From b1c5c1384f845ae9d791201e4d3542fd47b2f59e Mon Sep 17 00:00:00 2001 From: Giorgos Korfiatis <gkorf@grnet.gr> Date: Thu, 4 Jun 2015 18:04:24 +0300 Subject: [PATCH] Handle unicode issues Operate internally only with unicode. Talk to the filesystem with utf8 encoded strings and then decode the retrieved paths to unicode. Refuse to operate if the assumed filesystem encoding is not utf8. --- agkyra/syncer/localfs_client.py | 34 +++++++++---- agkyra/syncer/setup.py | 37 ++++++++++---- agkyra/syncer/utils.py | 24 ++++++++++ test.py | 85 +++++++++++++++++++-------------- 4 files changed, 125 insertions(+), 55 deletions(-) diff --git a/agkyra/syncer/localfs_client.py b/agkyra/syncer/localfs_client.py index adcb8b5..92fe510 100644 --- a/agkyra/syncer/localfs_client.py +++ b/agkyra/syncer/localfs_client.py @@ -88,7 +88,11 @@ def file_is_open(path): try: flist = psutil_open_files(proc) for nt in flist: - if nt.path == path: + try: + nt_path = utils.to_unicode(nt.path) + except UnicodeDecodeError as e: + continue + if nt_path == path: return True except psutil.Error: pass @@ -548,13 +552,22 @@ class LocalfsFileClient(FileClient): def walk_filesystem(self): candidates = {} - for dirpath, dirnames, files in os.walk(self.ROOTPATH): + rootpath = utils.from_unicode(self.ROOTPATH) + for dirpath, dirnames, files in os.walk(rootpath): + try: + dirpath = utils.to_unicode(dirpath) + except UnicodeDecodeError as e: + continue rel_dirpath = os.path.relpath(dirpath, start=self.ROOTPATH) logger.debug("'%s' '%s'" % (dirpath, rel_dirpath)) if rel_dirpath != '.': objname = utils.to_standard_sep(rel_dirpath) candidates[objname] = self.none_info() for filename in files: + try: + filename = utils.to_unicode(filename) + except UnicodeDecodeError as e: + continue if rel_dirpath == '.': prefix = "" else: @@ -615,24 +628,29 @@ class LocalfsFileClient(FileClient): def notifier(self): def handle_path(path): + try: + path = utils.to_unicode(path) + except UnicodeDecodeError as e: + return rel_path = 
os.path.relpath(path, start=self.ROOTPATH) objname = utils.to_standard_sep(rel_path) with self.probe_candidates.lock() as d: d[objname] = self.none_info() + cachepath = utils.from_unicode(self.CACHEPATH) class EventHandler(FileSystemEventHandler): def on_created(this, event): # if not event.is_directory: # return path = event.src_path - if path.startswith(self.CACHEPATH): + if path.startswith(cachepath): return logger.debug("Handling %s" % event) handle_path(path) def on_deleted(this, event): path = event.src_path - if path.startswith(self.CACHEPATH): + if path.startswith(cachepath): return logger.debug("Handling %s" % event) handle_path(path) @@ -641,7 +659,7 @@ class LocalfsFileClient(FileClient): if event.is_directory: return path = event.src_path - if path.startswith(self.CACHEPATH): + if path.startswith(cachepath): return logger.debug("Handling %s" % event) handle_path(path) @@ -649,14 +667,14 @@ class LocalfsFileClient(FileClient): def on_moved(this, event): src_path = event.src_path dest_path = event.dest_path - if src_path.startswith(self.CACHEPATH) or \ - dest_path.startswith(self.CACHEPATH): + if src_path.startswith(cachepath) or \ + dest_path.startswith(cachepath): return logger.debug("Handling %s" % event) handle_path(src_path) handle_path(dest_path) - path = self.ROOTPATH + path = utils.from_unicode(self.ROOTPATH) event_handler = EventHandler() observer = Observer() observer.schedule(event_handler, path, recursive=True) diff --git a/agkyra/syncer/setup.py b/agkyra/syncer/setup.py index 3e8b2a0..4aadede 100644 --- a/agkyra/syncer/setup.py +++ b/agkyra/syncer/setup.py @@ -68,9 +68,24 @@ def ssl_fall_back(method): return wrap +def check_encoding(): + platform = utils.PLATFORM + encoding = utils.ENCODING + if platform.startswith("linux"): + if not encoding.lower() in ['utf-8', 'utf8']: + raise Exception( + "Cannot operate with encoding %s. Please use UTF-8." 
+ % encoding) + + class SyncerSettings(): def __init__(self, auth_url, auth_token, container, local_root_path, *args, **kwargs): + check_encoding() + auth_url = utils.to_unicode(auth_url) + auth_token = utils.to_unicode(auth_token) + container = utils.to_unicode(container) + local_root_path = utils.to_unicode(local_root_path) self.auth_url = utils.normalize_standard_suffix(auth_url) self.auth_token = auth_token self.container = utils.normalize_standard_suffix(container) @@ -84,9 +99,10 @@ class SyncerSettings(): self.endpoint = self._get_pithos_client( auth_url, auth_token, container) - home_dir = os.path.expanduser('~') + home_dir = utils.to_unicode(os.path.expanduser('~')) default_settings_path = join_path(home_dir, GLOBAL_SETTINGS_NAME) - self.settings_path = kwargs.get("agkyra_path", default_settings_path) + self.settings_path = utils.to_unicode( + kwargs.get("agkyra_path", default_settings_path)) self.create_dir(self.settings_path) self.instances_path = join_path(self.settings_path, INSTANCES_NAME) @@ -102,27 +118,28 @@ class SyncerSettings(): self.instance_path = join_path(self.instances_path, self.instance) self.create_dir(self.instance_path) - self.dbname = kwargs.get("dbname", DEFAULT_DBNAME) + self.dbname = utils.to_unicode(kwargs.get("dbname", DEFAULT_DBNAME)) self.full_dbname = join_path(self.instance_path, self.dbname) self.get_db(initialize=True) - self.cache_name = kwargs.get("cache_name", DEFAULT_CACHE_NAME) + self.cache_name = utils.to_unicode( + kwargs.get("cache_name", DEFAULT_CACHE_NAME)) self.cache_path = join_path(self.local_root_path, self.cache_name) self.create_dir(self.cache_path) - self.cache_hide_name = kwargs.get("cache_hide_name", - DEFAULT_CACHE_HIDE_NAME) + self.cache_hide_name = utils.to_unicode( + kwargs.get("cache_hide_name", DEFAULT_CACHE_HIDE_NAME)) self.cache_hide_path = join_path(self.cache_path, self.cache_hide_name) self.create_dir(self.cache_hide_path) - self.cache_stage_name = kwargs.get("cache_stage_name", - 
DEFAULT_CACHE_STAGE_NAME) + self.cache_stage_name = utils.to_unicode( + kwargs.get("cache_stage_name", DEFAULT_CACHE_STAGE_NAME)) self.cache_stage_path = join_path(self.cache_path, self.cache_stage_name) self.create_dir(self.cache_stage_path) - self.cache_fetch_name = kwargs.get("cache_fetch_name", - DEFAULT_CACHE_FETCH_NAME) + self.cache_fetch_name = utils.to_unicode( + kwargs.get("cache_fetch_name", DEFAULT_CACHE_FETCH_NAME)) self.cache_fetch_path = join_path(self.cache_path, self.cache_fetch_name) self.create_dir(self.cache_fetch_path) diff --git a/agkyra/syncer/utils.py b/agkyra/syncer/utils.py index 157e63f..dc04ea4 100644 --- a/agkyra/syncer/utils.py +++ b/agkyra/syncer/utils.py @@ -18,9 +18,16 @@ import hashlib import datetime import threading import watchdog.utils +import sys +import logging + +logger = logging.getLogger(__name__) from agkyra.syncer.common import OBJECT_DIRSEP +ENCODING = sys.getfilesystemencoding() or sys.getdefaultencoding() +PLATFORM = sys.platform + def to_local_sep(filename): return filename.replace(OBJECT_DIRSEP, os.path.sep) @@ -48,7 +55,24 @@ def normalize_local_suffix(path): return path.rstrip(os.path.sep) + os.path.sep +def from_unicode(s): + if type(s) is unicode: + return s.encode(ENCODING) + return s + + +def to_unicode(s): + if type(s) is unicode: + return s + try: + return unicode(s, ENCODING) + except UnicodeDecodeError as e: + logger.warning("Failed to decode %s" % s.__repr__()) + raise + + def hash_string(s): + s = from_unicode(s) return hashlib.sha256(s).hexdigest() diff --git a/test.py b/test.py index 0621b69..a14aae3 100644 --- a/test.py +++ b/test.py @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- + # Copyright (C) 2015 GRNET S.A. # # This program is free software: you can redistribute it and/or modify @@ -13,6 +15,8 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. 
+from __future__ import unicode_literals + from agkyra.syncer.setup import SyncerSettings from agkyra.syncer import localfs_client from agkyra.syncer.pithos_client import PithosFileClient @@ -39,6 +43,13 @@ handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) +# kamakisend_logger = logging.getLogger('kamaki.clients.send') +# kamakisend_logger.addHandler(handler) +# kamakisend_logger.setLevel(logging.DEBUG) +# kamakirecv_logger = logging.getLogger('kamaki.clients.recv') +# kamakirecv_logger.addHandler(handler) +# kamakirecv_logger.setLevel(logging.DEBUG) + TMP = "/tmp" @@ -76,7 +87,7 @@ class AgkyraTest(unittest.TestCase): AUTHENTICATION_URL = cloud_conf['url'] TOKEN = cloud_conf['token'] - cls.ID = "AGKYRATEST" + str(random.random()).split('.')[1] + cls.ID = "ΑΓΚΥΡΑTEST" + str(random.random()).split('.')[1] cls.LOCAL_ROOT_PATH = utils.join_path("/tmp", cls.ID) @@ -135,10 +146,10 @@ class AgkyraTest(unittest.TestCase): candidates = self.slave.list_candidate_files(forced=True) self.assertEqual(real(candidates), []) - fil = "f0001" + fil = "φ0001" f_path = self.get_path(fil) open(f_path, "a").close() - d = "d0001" + d = "δ0001" d_path = self.get_path(d) os.mkdir(d_path) @@ -177,11 +188,11 @@ class AgkyraTest(unittest.TestCase): self.assertEqual(candidates, []) def test_0002_notifier_local(self): - f_out = "f0002out" - f_cache = "f0002cache" - f_upd = "f0002upd" - f_ren = "f0002ren" - dbefore = "d0002before" + f_out = "φ0002out" + f_cache = "φ0002cache" + f_upd = "φ0002upd" + f_ren = "φ0002ren" + dbefore = "δ0002before" f_out_path = self.get_path(f_out) f_cache_path = self.get_path(f_cache) f_upd_path = self.get_path(f_upd) @@ -197,9 +208,9 @@ class AgkyraTest(unittest.TestCase): candidates = self.slave.list_candidate_files() self.assertEqual(candidates, []) - fafter = "f0002after" + fafter = "φ0002after" fafter_path = self.get_path(fafter) - dafter = "d0002after" + dafter = "δ0002after" dafter_path = self.get_path(dafter)
open(fafter_path, "a").close() os.mkdir(dafter_path) @@ -215,13 +226,13 @@ class AgkyraTest(unittest.TestCase): with open(f_upd_path, "a") as f: f.write("upd") - f_in = "f0002in" + f_in = "φ0002in" f_in_path = self.get_path(f_in) f_in_orig_path = utils.join_path(TMP, f_in) open(f_in_orig_path, "a").close() os.rename(f_in_orig_path, f_in_path) - f_ren_new = "f0002ren_new" + f_ren_new = "φ0002ren_new" f_ren_new_path = self.get_path(f_ren_new) os.rename(f_ren_path, f_ren_new_path) @@ -235,7 +246,7 @@ class AgkyraTest(unittest.TestCase): def test_001_probe_and_sync(self): # initial upload to pithos - f1 = "f001" + f1 = "φ001" f1_content1 = "content1" r1 = self.pithos.upload_from_string( f1, f1_content1) @@ -281,7 +292,7 @@ class AgkyraTest(unittest.TestCase): self.assertEqual(sstate.serial, 0) def test_002_conflict(self): - fil = "f002" + fil = "φ002" # local file fil_local_content = "local" with open(self.get_path(fil), "w") as f: @@ -317,10 +328,10 @@ class AgkyraTest(unittest.TestCase): def test_003_dirs(self): # make local dir with files - d = "d003" + d = "δ003" d_path = self.get_path(d) os.mkdir(d_path) - fil = "d003/f003" + fil = "δ003/φ003" f_path = self.get_path(fil) f_content = "f2" with open(f_path, "w") as f: @@ -337,7 +348,7 @@ class AgkyraTest(unittest.TestCase): def test_004_link(self): # Check sym links - fil = "f004" + fil = "φ004" f_path = self.get_path(fil) open(f_path, 'a').close() self.s.probe_file(self.s.SLAVE, fil) @@ -346,7 +357,7 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) - ln = "f004.link" + ln = "φ004.link" ln_path = self.get_path(ln) os.symlink(f_path, ln_path) self.s.probe_file(self.s.SLAVE, ln) @@ -390,7 +401,7 @@ class AgkyraTest(unittest.TestCase): self.assert_no_message() def test_005_dirs_inhibited_by_file(self): - fil = "f005" + fil = "φ005" f_path = self.get_path(fil) open(f_path, 'a').close() self.s.probe_file(self.s.SLAVE, fil) @@ -401,7 +412,7
@@ class AgkyraTest(unittest.TestCase): r = self.pithos.object_put( fil, content_type='application/directory', content_length=0) - inner_fil = "f005/in005" + inner_fil = "φ005/in005" inner_fil_content = "ff1 in dir " r1 = self.pithos.upload_from_string(inner_fil, inner_fil_content) self.s.probe_file(self.s.MASTER, fil) @@ -414,7 +425,7 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.SyncMessage) self.assert_message(messaging.SyncErrorMessage) - inner_dir = "f005/indir005" + inner_dir = "φ005/indir005" r = self.pithos.object_put( inner_dir, content_type='application/directory', content_length=0) self.s.probe_file(self.s.MASTER, inner_dir) @@ -434,7 +445,7 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.AckSyncMessage) def test_006_heartbeat(self): - fil = "f006" + fil = "φ006" f_path = self.get_path(fil) open(f_path, 'a').close() self.s.probe_file(self.s.SLAVE, fil) @@ -466,7 +477,7 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.AckSyncMessage) def test_007_multiprobe(self): - fil = "f007" + fil = "φ007" f_path = self.get_path(fil) open(f_path, 'a').close() self.s.probe_file(self.s.SLAVE, fil) @@ -477,11 +488,11 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.AlreadyProbedMessage) def test_008_dir_contents(self): - d = "d008" + d = "δ008" d_path = self.get_path(d) r = self.pithos.object_put( d, content_type='application/directory', content_length=0) - inner_fil = "d008/inf008" + inner_fil = "δ008/inφ008" inner_fil_content = "fil in dir " r1 = self.pithos.upload_from_string(inner_fil, inner_fil_content) self.s.probe_file(self.s.MASTER, d) @@ -523,11 +534,11 @@ class AgkyraTest(unittest.TestCase): self.assertEqual(cm.exception.status, 404) def test_009_dir_delete_upstream(self): - d = "d009" + d = "δ009" d_path = self.get_path(d) r = self.pithos.object_put( d, content_type='application/directory', content_length=0) - innerd = "d009/innerd009" + innerd = "δ009/innerδ009" r =
self.pithos.object_put( innerd, content_type='application/directory', content_length=0) self.s.probe_file(self.s.MASTER, d) @@ -564,7 +575,7 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.AckSyncMessage) def test_010_live_update_local(self): - fil = "f010" + fil = "φ010" f_path = self.get_path(fil) with open(f_path, "w") as f: f.write("f to be changed") @@ -589,7 +600,7 @@ class AgkyraTest(unittest.TestCase): self.assertEqual(new_info["localfs_size"], len(f_content)) def test_011_live_update_upstream(self): - fil = "f011" + fil = "φ011" f_path = self.get_path(fil) r = self.pithos.upload_from_string(fil, "f upstream") etag = r['etag'] @@ -612,7 +623,7 @@ class AgkyraTest(unittest.TestCase): self.assertEqual(new_info["pithos_etag"], new_etag) def test_012_cachename(self): - fil = "f012" + fil = "φ012" f_path = self.get_path(fil) with open(f_path, "w") as f: f.write("content") @@ -643,7 +654,7 @@ class AgkyraTest(unittest.TestCase): handle.hide_file() def test_013_collisions(self): - fil = "f013" + fil = "φ013" f_path = self.get_path(fil) with open(f_path, "w") as f: f.write("content") @@ -656,7 +667,7 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.CollisionMessage) self.assert_message(messaging.SyncErrorMessage) - d = "d013" + d = "δ013" d_path = self.get_path(d) os.mkdir(d_path) self.s.probe_file(self.s.SLAVE, d) @@ -668,7 +679,7 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.CollisionMessage) self.assert_message(messaging.SyncErrorMessage) - d_synced = "d013_s" + d_synced = "δ013_s" d_synced_path = self.get_path(d_synced) os.mkdir(d_synced_path) self.s.probe_file(self.s.SLAVE, d_synced) @@ -688,9 +699,9 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.SyncErrorMessage) def test_014_staging(self): - fil = "f014" - d = "d014" - fln = "f014.link" + fil = "φ014" + d = "δ014" + fln = "φ014.link" f_path = self.get_path(fil) with open(f_path, "w") as f: f.write("content") @@
-719,7 +730,7 @@ class AgkyraTest(unittest.TestCase): with self.assertRaises(common.OpenBusyError): handle = localfs_client.LocalfsSourceHandle(self.s.settings, state) - ftmp_path = self.get_path("φ014tmp") with open(ftmp_path, "w") as f: f.write("tmp") os.unlink(f_path) -- GitLab