diff --git a/agkyra/syncer/localfs_client.py b/agkyra/syncer/localfs_client.py index adcb8b5e6e995c84c5607c4c341bb9488b5cd86a..92fe51089f823136fb662f6692ae335e63e98b31 100644 --- a/agkyra/syncer/localfs_client.py +++ b/agkyra/syncer/localfs_client.py @@ -88,7 +88,11 @@ def file_is_open(path): try: flist = psutil_open_files(proc) for nt in flist: - if nt.path == path: + try: + nt_path = utils.to_unicode(nt.path) + except UnicodeDecodeError as e: + continue + if nt_path == path: return True except psutil.Error: pass @@ -548,13 +552,22 @@ class LocalfsFileClient(FileClient): def walk_filesystem(self): candidates = {} - for dirpath, dirnames, files in os.walk(self.ROOTPATH): + rootpath = utils.from_unicode(self.ROOTPATH) + for dirpath, dirnames, files in os.walk(rootpath): + try: + dirpath = utils.to_unicode(dirpath) + except UnicodeDecodeError as e: + continue rel_dirpath = os.path.relpath(dirpath, start=self.ROOTPATH) logger.debug("'%s' '%s'" % (dirpath, rel_dirpath)) if rel_dirpath != '.': objname = utils.to_standard_sep(rel_dirpath) candidates[objname] = self.none_info() for filename in files: + try: + filename = utils.to_unicode(filename) + except UnicodeDecodeError as e: + continue if rel_dirpath == '.': prefix = "" else: @@ -615,24 +628,29 @@ class LocalfsFileClient(FileClient): def notifier(self): def handle_path(path): + try: + path = utils.to_unicode(path) + except UnicodeDecodeError as e: + return rel_path = os.path.relpath(path, start=self.ROOTPATH) objname = utils.to_standard_sep(rel_path) with self.probe_candidates.lock() as d: d[objname] = self.none_info() + cachepath = utils.from_unicode(self.CACHEPATH) class EventHandler(FileSystemEventHandler): def on_created(this, event): # if not event.is_directory: # return path = event.src_path - if path.startswith(self.CACHEPATH): + if path.startswith(cachepath): return logger.debug("Handling %s" % event) handle_path(path) def on_deleted(this, event): path = event.src_path - if path.startswith(self.CACHEPATH): + if path.startswith(cachepath): return logger.debug("Handling %s" % event) handle_path(path) @@ -641,7 +659,7 @@ class LocalfsFileClient(FileClient): if event.is_directory: return path = event.src_path - if path.startswith(self.CACHEPATH): + if path.startswith(cachepath): return logger.debug("Handling %s" % event) handle_path(path) @@ -649,14 +667,14 @@ class LocalfsFileClient(FileClient): def on_moved(this, event): src_path = event.src_path dest_path = event.dest_path - if src_path.startswith(self.CACHEPATH) or \ - dest_path.startswith(self.CACHEPATH): + if src_path.startswith(cachepath) or \ + dest_path.startswith(cachepath): return logger.debug("Handling %s" % event) handle_path(src_path) handle_path(dest_path) - path = self.ROOTPATH + path = utils.from_unicode(self.ROOTPATH) event_handler = EventHandler() observer = Observer() observer.schedule(event_handler, path, recursive=True) diff --git a/agkyra/syncer/setup.py b/agkyra/syncer/setup.py index 3e8b2a001761dcb5643e293c9105f783561bf606..4aadede0336d5f23c450bd9beb364f49be2c7206 100644 --- a/agkyra/syncer/setup.py +++ b/agkyra/syncer/setup.py @@ -68,9 +68,24 @@ def ssl_fall_back(method): return wrap +def check_encoding(): + platform = utils.PLATFORM + encoding = utils.ENCODING + if platform.startswith("linux"): + if not encoding.lower() in ['utf-8', 'utf8']: + raise Exception( + "Cannot operate with encoding %s. Please use UTF-8." + % encoding) + + class SyncerSettings(): def __init__(self, auth_url, auth_token, container, local_root_path, *args, **kwargs): + check_encoding() + auth_url = utils.to_unicode(auth_url) + auth_token = utils.to_unicode(auth_token) + container = utils.to_unicode(container) + local_root_path = utils.to_unicode(local_root_path) self.auth_url = utils.normalize_standard_suffix(auth_url) self.auth_token = auth_token self.container = utils.normalize_standard_suffix(container) @@ -84,9 +99,10 @@ class SyncerSettings(): self.endpoint = self._get_pithos_client( auth_url, auth_token, container) - home_dir = os.path.expanduser('~') + home_dir = utils.to_unicode(os.path.expanduser('~')) default_settings_path = join_path(home_dir, GLOBAL_SETTINGS_NAME) - self.settings_path = kwargs.get("agkyra_path", default_settings_path) + self.settings_path = utils.to_unicode( + kwargs.get("agkyra_path", default_settings_path)) self.create_dir(self.settings_path) self.instances_path = join_path(self.settings_path, INSTANCES_NAME) @@ -102,27 +118,28 @@ class SyncerSettings(): self.instance_path = join_path(self.instances_path, self.instance) self.create_dir(self.instance_path) - self.dbname = kwargs.get("dbname", DEFAULT_DBNAME) + self.dbname = utils.to_unicode(kwargs.get("dbname", DEFAULT_DBNAME)) self.full_dbname = join_path(self.instance_path, self.dbname) self.get_db(initialize=True) - self.cache_name = kwargs.get("cache_name", DEFAULT_CACHE_NAME) + self.cache_name = utils.to_unicode( + kwargs.get("cache_name", DEFAULT_CACHE_NAME)) self.cache_path = join_path(self.local_root_path, self.cache_name) self.create_dir(self.cache_path) - self.cache_hide_name = kwargs.get("cache_hide_name", - DEFAULT_CACHE_HIDE_NAME) + self.cache_hide_name = utils.to_unicode( + kwargs.get("cache_hide_name", DEFAULT_CACHE_HIDE_NAME)) self.cache_hide_path = join_path(self.cache_path, self.cache_hide_name) self.create_dir(self.cache_hide_path) - self.cache_stage_name = kwargs.get("cache_stage_name", - DEFAULT_CACHE_STAGE_NAME) + self.cache_stage_name = utils.to_unicode( + kwargs.get("cache_stage_name", DEFAULT_CACHE_STAGE_NAME)) self.cache_stage_path = join_path(self.cache_path, self.cache_stage_name) self.create_dir(self.cache_stage_path) - self.cache_fetch_name = kwargs.get("cache_fetch_name", - DEFAULT_CACHE_FETCH_NAME) + self.cache_fetch_name = utils.to_unicode( + kwargs.get("cache_fetch_name", DEFAULT_CACHE_FETCH_NAME)) self.cache_fetch_path = join_path(self.cache_path, self.cache_fetch_name) self.create_dir(self.cache_fetch_path) diff --git a/agkyra/syncer/utils.py b/agkyra/syncer/utils.py index 157e63fc6a12a6b365da89896da97f96fbe4ebf1..dc04ea4a6824eaccb2f5120e78e3fdbcc730001c 100644 --- a/agkyra/syncer/utils.py +++ b/agkyra/syncer/utils.py @@ -18,9 +18,16 @@ import hashlib import datetime import threading import watchdog.utils +import sys +import logging + +logger = logging.getLogger(__name__) from agkyra.syncer.common import OBJECT_DIRSEP +ENCODING = sys.getfilesystemencoding() or sys.getdefaultencoding() +PLATFORM = sys.platform + def to_local_sep(filename): return filename.replace(OBJECT_DIRSEP, os.path.sep) @@ -48,7 +55,24 @@ def normalize_local_suffix(path): return path.rstrip(os.path.sep) + os.path.sep +def from_unicode(s): + if type(s) is unicode: + return s.encode(ENCODING) + return s + + +def to_unicode(s): + if type(s) is unicode: + return s + try: + return unicode(s, ENCODING) + except UnicodeDecodeError as e: + logger.warning("Failed to decode %s" % s.__repr__()) + raise + + def hash_string(s): + s = from_unicode(s) return hashlib.sha256(s).hexdigest() diff --git a/test.py b/test.py index 0621b699d8e99e4b5a98289e5442e09ba6699ad9..a14aae340dd171ba9447e2996913ce2e0ccc6ab2 100644 --- a/test.py +++ b/test.py @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- + # Copyright (C) 2015 GRNET S.A. # # This program is free software: you can redistribute it and/or modify @@ -13,6 +15,8 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +from __future__ import unicode_literals + from agkyra.syncer.setup import SyncerSettings from agkyra.syncer import localfs_client from agkyra.syncer.pithos_client import PithosFileClient @@ -39,6 +43,13 @@ handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) +# kamakisend_logger = logging.getLogger('kamaki.clients.send') +# kamakisend_logger.addHandler(handler) +# kamakisend_logger.setLevel(logging.DEBUG) +# kamakirecv_logger = logging.getLogger('kamaki.clients.recv') +# kamakirecv_logger.addHandler(handler) +# kamakirecv_logger.setLevel(logging.DEBUG) + TMP = "/tmp" @@ -76,7 +87,7 @@ class AgkyraTest(unittest.TestCase): AUTHENTICATION_URL = cloud_conf['url'] TOKEN = cloud_conf['token'] - cls.ID = "AGKYRATEST" + str(random.random()).split('.')[1] + cls.ID = "ΞΞΞΞ₯Ξ‘ΞTEST" + str(random.random()).split('.')[1] cls.LOCAL_ROOT_PATH = utils.join_path("/tmp", cls.ID) @@ -135,10 +146,10 @@ class AgkyraTest(unittest.TestCase): candidates = self.slave.list_candidate_files(forced=True) self.assertEqual(real(candidates), []) - fil = "f0001" + fil = "Ο0001" f_path = self.get_path(fil) open(f_path, "a").close() - d = "d0001" + d = "Ξ΄0001" d_path = self.get_path(d) os.mkdir(d_path) @@ -177,11 +188,11 @@ class AgkyraTest(unittest.TestCase): self.assertEqual(candidates, []) def test_0002_notifier_local(self): - f_out = "f0002out" - f_cache = "f0002cache" - f_upd = "f0002upd" - f_ren = "f0002ren" - dbefore = "d0002before" + f_out = "Ο0002out" + f_cache = "Ο0002cache" + f_upd = "Ο0002upd" + f_ren = "Ο0002ren" + dbefore = "Ξ΄0002before" f_out_path = self.get_path(f_out) f_cache_path = self.get_path(f_cache) f_upd_path = self.get_path(f_upd) @@ -197,9 +208,9 @@ class AgkyraTest(unittest.TestCase): candidates = self.slave.list_candidate_files() self.assertEqual(candidates, []) - fafter = "f0002after" + fafter = "Ο0002after" fafter_path = self.get_path(fafter) - dafter = "d0002after" + dafter = "Ξ΄0002after" dafter_path = self.get_path(dafter) open(fafter_path, "a").close() os.mkdir(dafter_path) @@ -215,13 +226,13 @@ class AgkyraTest(unittest.TestCase): with open(f_upd_path, "a") as f: f.write("upd") - f_in = "f0002in" + f_in = "Ο0002in" f_in_path = self.get_path(f_in) f_in_orig_path = utils.join_path(TMP, f_in) open(f_in_orig_path, "a").close() os.rename(f_in_orig_path, f_in_path) - f_ren_new = "f0002ren_new" + f_ren_new = "Ο0002ren_new" f_ren_new_path = self.get_path(f_ren_new) os.rename(f_ren_path, f_ren_new_path) @@ -235,7 +246,7 @@ class AgkyraTest(unittest.TestCase): def test_001_probe_and_sync(self): # initial upload to pithos - f1 = "f001" + f1 = "Ο001" f1_content1 = "content1" r1 = self.pithos.upload_from_string( f1, f1_content1) @@ -281,7 +292,7 @@ class AgkyraTest(unittest.TestCase): self.assertEqual(sstate.serial, 0) def test_002_conflict(self): - fil = "f002" + fil = "Ο002" # local file fil_local_content = "local" with open(self.get_path(fil), "w") as f: @@ -317,10 +328,10 @@ class AgkyraTest(unittest.TestCase): def test_003_dirs(self): # make local dir with files - d = "d003" + d = "Ξ΄003" d_path = self.get_path(d) os.mkdir(d_path) - fil = "d003/f003" + fil = "Ξ΄003/Ο003" f_path = self.get_path(fil) f_content = "f2" with open(f_path, "w") as f: @@ -337,7 +348,7 @@ class AgkyraTest(unittest.TestCase): def test_004_link(self): # Check sym links - fil = "f004" + fil = "Ο004" f_path = self.get_path(fil) open(f_path, 'a').close() self.s.probe_file(self.s.SLAVE, fil) @@ -346,7 +357,7 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.SyncMessage) self.assert_message(messaging.AckSyncMessage) - ln = "f004.link" + ln = "Ο004.link" ln_path = self.get_path(ln) os.symlink(f_path, ln_path) self.s.probe_file(self.s.SLAVE, ln) @@ -390,7 +401,7 @@ class AgkyraTest(unittest.TestCase): self.assert_no_message() def test_005_dirs_inhibited_by_file(self): - fil = "f005" + fil = "Ο005" f_path = self.get_path(fil) open(f_path, 'a').close() self.s.probe_file(self.s.SLAVE, fil) @@ -401,7 +412,7 @@ class AgkyraTest(unittest.TestCase): r = self.pithos.object_put( fil, content_type='application/directory', content_length=0) - inner_fil = "f005/in005" + inner_fil = "Ο005/in005" inner_fil_content = "ff1 in dir " r1 = self.pithos.upload_from_string(inner_fil, inner_fil_content) self.s.probe_file(self.s.MASTER, fil) @@ -414,7 +425,7 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.SyncMessage) self.assert_message(messaging.SyncErrorMessage) - inner_dir = "f005/indir005" + inner_dir = "Ο005/indir005" r = self.pithos.object_put( inner_dir, content_type='application/directory', content_length=0) self.s.probe_file(self.s.MASTER, inner_dir) @@ -434,7 +445,7 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.AckSyncMessage) def test_006_heartbeat(self): - fil = "f006" + fil = "Ο006" f_path = self.get_path(fil) open(f_path, 'a').close() self.s.probe_file(self.s.SLAVE, fil) @@ -466,7 +477,7 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.AckSyncMessage) def test_007_multiprobe(self): - fil = "f007" + fil = "Ο007" f_path = self.get_path(fil) open(f_path, 'a').close() self.s.probe_file(self.s.SLAVE, fil) @@ -477,11 +488,11 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.AlreadyProbedMessage) def test_008_dir_contents(self): - d = "d008" + d = "Ξ΄008" d_path = self.get_path(d) r = self.pithos.object_put( d, content_type='application/directory', content_length=0) - inner_fil = "d008/inf008" + inner_fil = "Ξ΄008/inΟ008" inner_fil_content = "fil in dir " r1 = self.pithos.upload_from_string(inner_fil, inner_fil_content) self.s.probe_file(self.s.MASTER, d) @@ -523,11 +534,11 @@ class AgkyraTest(unittest.TestCase): self.assertEqual(cm.exception.status, 404) def test_009_dir_delete_upstream(self): - d = "d009" + d = "Ξ΄009" d_path = self.get_path(d) r = self.pithos.object_put( d, content_type='application/directory', content_length=0) - innerd = "d009/innerd009" + innerd = "Ξ΄009/innerΞ΄009" r = self.pithos.object_put( innerd, content_type='application/directory', content_length=0) self.s.probe_file(self.s.MASTER, d) @@ -564,7 +575,7 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.AckSyncMessage) def test_010_live_update_local(self): - fil = "f010" + fil = "Ο010" f_path = self.get_path(fil) with open(f_path, "w") as f: f.write("f to be changed") @@ -589,7 +600,7 @@ class AgkyraTest(unittest.TestCase): self.assertEqual(new_info["localfs_size"], len(f_content)) def test_011_live_update_upstream(self): - fil = "f011" + fil = "Ο011" f_path = self.get_path(fil) r = self.pithos.upload_from_string(fil, "f upstream") etag = r['etag'] @@ -612,7 +623,7 @@ class AgkyraTest(unittest.TestCase): self.assertEqual(new_info["pithos_etag"], new_etag) def test_012_cachename(self): - fil = "f012" + fil = "Ο012" f_path = self.get_path(fil) with open(f_path, "w") as f: f.write("content") @@ -643,7 +654,7 @@ class AgkyraTest(unittest.TestCase): handle.hide_file() def test_013_collisions(self): - fil = "f013" + fil = "Ο013" f_path = self.get_path(fil) with open(f_path, "w") as f: f.write("content") @@ -656,7 +667,7 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.CollisionMessage) self.assert_message(messaging.SyncErrorMessage) - d = "d013" + d = "Ξ΄013" d_path = self.get_path(d) os.mkdir(d_path) self.s.probe_file(self.s.SLAVE, d) @@ -668,7 +679,7 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.CollisionMessage) self.assert_message(messaging.SyncErrorMessage) - d_synced = "d013_s" + d_synced = "Ξ΄013_s" d_synced_path = self.get_path(d_synced) os.mkdir(d_synced_path) self.s.probe_file(self.s.SLAVE, d_synced) @@ -688,9 +699,9 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.SyncErrorMessage) def test_014_staging(self): - fil = "f014" - d = "d014" - fln = "f014.link" + fil = "Ο014" + d = "Ξ΄014" + fln = "Ο014.link" f_path = self.get_path(fil) with open(f_path, "w") as f: f.write("content") @@ -719,7 +730,7 @@ class AgkyraTest(unittest.TestCase): with self.assertRaises(common.OpenBusyError): handle = localfs_client.LocalfsSourceHandle(self.s.settings, state) - ftmp_path = self.get_path("f014tmp") + ftmp_path = self.get_path("Ο014tmp") with open(ftmp_path, "w") as f: f.write("tmp") os.unlink(f_path)