Commit cf5d9c29 authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

Context manager for thread safe dict

parent d34ed7b9
......@@ -14,7 +14,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from collections import namedtuple
import threading
OBJECT_DIRSEP = '/'
......@@ -62,37 +61,3 @@ class HardSyncError(SyncError):
class CollisionError(HardSyncError):
pass
class LockedDict(object):
def __init__(self, *args, **kwargs):
self._Dict = {}
self._Lock = threading.Lock()
def put(self, key, value):
self._Lock.acquire()
self._Dict[key] = value
self._Lock.release()
def get(self, key, default=None):
self._Lock.acquire()
value = self._Dict.get(key, default)
self._Lock.release()
return value
def pop(self, key, d=None):
self._Lock.acquire()
value = self._Dict.pop(key, d)
self._Lock.release()
return value
def update(self, d):
self._Lock.acquire()
self._Dict.update(d)
self._Lock.release()
def keys(self):
self._Lock.acquire()
value = self._Dict.keys()
self._Lock.release()
return value
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import threading
class HeartBeat(object):
def __init__(self, *args, **kwargs):
self._LOG = {}
self._LOCK = threading.Lock()
def lock(self):
class Lock(object):
def __enter__(this):
self._LOCK.acquire()
return this
def __exit__(this, exctype, value, traceback):
self._LOCK.release()
if value is not None:
raise value
def get(this, key):
return self._LOG.get(key)
def set(this, key, value):
self._LOG[key] = value
def delete(this, key):
self._LOG.pop(key)
return Lock()
......@@ -464,13 +464,14 @@ class LocalfsFileClient(FileClient):
self.ROOTPATH = settings.local_root_path
self.CACHEPATH = settings.cache_path
self.get_db = settings.get_db
self.probe_candidates = common.LockedDict()
self.probe_candidates = utils.ThreadSafeDict()
def list_candidate_files(self, forced=False):
if forced:
candidates = self.walk_filesystem()
self.probe_candidates.update(candidates)
return self.probe_candidates.keys()
with self.probe_candidates.lock() as d:
if forced:
candidates = self.walk_filesystem()
d.update(candidates)
return d.keys()
def walk_filesystem(self):
db = self.get_db()
......@@ -506,7 +507,8 @@ class LocalfsFileClient(FileClient):
return exclude_pattern.match(final_part)
def start_probing_file(self, objname, old_state, ref_state, callback=None):
cached_info = self.probe_candidates.pop(objname)
with self.probe_candidates.lock() as d:
cached_info = d.pop(objname, None)
if self.exclude_file(objname):
logger.warning("Ignoring probe archive: %s, object: %s" %
(old_state.archive, objname))
......@@ -530,7 +532,8 @@ class LocalfsFileClient(FileClient):
def handle_path(path):
rel_path = os.path.relpath(path, start=self.ROOTPATH)
objname = utils.to_standard_sep(rel_path)
self.probe_candidates.put(objname, None)
with self.probe_candidates.lock() as d:
d[objname] = None
class EventHandler(FileSystemEventHandler):
def on_created(this, event):
......
......@@ -39,7 +39,7 @@ def heartbeat_event(settings, heartbeat, objname):
assert beat is not None
new_beat = {"ident": beat["ident"],
"tstamp": utils.time_stamp()}
hb.set(objname, new_beat)
hb[objname] = new_beat
logger.debug("HEARTBEAT '%s' %s" % (objname, new_beat))
def go():
......@@ -252,13 +252,14 @@ class PithosFileClient(FileClient):
self.get_db = settings.get_db
self.endpoint = settings.endpoint
self.last_modification = "0000-00-00"
self.probe_candidates = common.LockedDict()
self.probe_candidates = utils.ThreadSafeDict()
def list_candidate_files(self, forced=False):
if forced:
candidates = self.get_pithos_candidates()
self.probe_candidates.update(candidates)
return self.probe_candidates.keys()
with self.probe_candidates.lock() as d:
if forced:
candidates = self.get_pithos_candidates()
d.update(candidates)
return d.keys()
def get_pithos_candidates(self, last_modified=None):
db = self.get_db()
......@@ -302,7 +303,8 @@ class PithosFileClient(FileClient):
def run_body(this):
candidates = self.get_pithos_candidates(
last_modified=self.last_modification)
self.probe_candidates.update(candidates)
with self.probe_candidates.lock() as d:
d.update(candidates)
time.sleep(interval)
return utils.start_daemon(PollPithosThread)
......@@ -327,7 +329,8 @@ class PithosFileClient(FileClient):
def start_probing_file(self, objname, old_state, ref_state, callback=None):
info = old_state.info
cached_info = self.probe_candidates.pop(objname)
with self.probe_candidates.lock() as d:
cached_info = d.pop(objname, None)
if exclude_pattern.match(objname):
logger.warning("Ignoring probe archive: %s, object: '%s'" %
(old_state.archive, objname))
......
......@@ -17,9 +17,8 @@ import os
import threading
import logging
from agkyra.syncer.utils import join_path
from agkyra.syncer.utils import join_path, ThreadSafeDict
from agkyra.syncer.database import SqliteFileStateDB
from agkyra.syncer.heartbeat import HeartBeat
from agkyra.syncer.messaging import Messager
from kamaki.clients import ClientError
......@@ -94,7 +93,7 @@ class SyncerSettings():
self.cache_fetch_name)
self.create_dir(self.cache_fetch_path)
self.heartbeat = HeartBeat()
self.heartbeat = ThreadSafeDict()
self.action_max_wait = kwargs.get("action_max_wait",
DEFAULT_ACTION_MAX_WAIT)
self.pithos_list_interval = kwargs.get("pithos_list_interval",
......
......@@ -45,7 +45,7 @@ class FileSyncer(object):
self.notifiers = {}
self.decide_thread = None
self.sync_threads = []
self.failed_serials = common.LockedDict()
self.failed_serials = utils.ThreadSafeDict()
self.messager = settings.messager
self.heartbeat = self.settings.heartbeat
......@@ -160,7 +160,7 @@ class FileSyncer(object):
if states is not None:
with self.heartbeat.lock() as hb:
beat = {"ident": ident, "tstamp": utils.time_stamp()}
hb.set(objname, beat)
hb[objname] = beat
return states
def _do_decide_file_sync(self, objname, master, slave, ident):
......@@ -193,7 +193,8 @@ class FileSyncer(object):
(objname, beat))
if decision_serial != sync_serial:
failed_sync = self.failed_serials.get((decision_serial, objname))
with self.failed_serials.lock() as d:
failed_sync = d.get((decision_serial, objname))
if failed_sync is None:
logger.warning(
"Already decided: '%s', decision: %s, sync: %s" %
......@@ -270,8 +271,9 @@ class FileSyncer(object):
"Marking failed serial %s for archive: %s, object: '%s'" %
(serial, state.archive, objname))
with self.heartbeat.lock() as hb:
hb.delete(objname)
self.failed_serials.put((serial, objname), state)
hb.pop(objname)
with self.failed_serials.lock() as d:
d[(serial, objname)] = state
def update_state(self, old_state, new_state):
db = self.get_db()
......@@ -285,7 +287,7 @@ class FileSyncer(object):
objname = synced_source_state.objname
target = synced_target_state.archive
with self.heartbeat.lock() as hb:
hb.delete(objname)
hb.pop(objname)
msg = messaging.AckSyncMessage(
archive=target, objname=objname, serial=serial,
logger=logger)
......
......@@ -16,6 +16,7 @@
import os
import hashlib
import datetime
import threading
import watchdog.utils
from agkyra.syncer.common import OBJECT_DIRSEP
......@@ -86,3 +87,21 @@ def start_daemon(threadClass):
thread.daemon = True
thread.start()
return thread
class ThreadSafeDict(object):
def __init__(self, *args, **kwargs):
self._DICT = {}
self._LOCK = threading.Lock()
def lock(self):
class Lock(object):
def __enter__(this):
self._LOCK.acquire()
return self._DICT
def __exit__(this, exctype, value, traceback):
self._LOCK.release()
if value is not None:
raise value
return Lock()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment