Commit 9a9068b0 authored by Sofia Papagiannaki's avatar Sofia Papagiannaki

pithos: Add missing locks

_lookup_container() locks the container path
but it was not called from some write backend methods
that expect the container path to be locked.

In such cases optionally _lookup_object() calls
the _lookup_container() in order to lock the container path.
parent 4e19e944
......@@ -827,7 +827,9 @@ class ModularBackend(BaseBackend):
logger.debug("update_object_meta: %s %s %s %s %s %s %s",
user, account, container, name, domain, meta, replace)
self._can_write(user, account, container, name)
path, node = self._lookup_object(account, container, name)
path, node = self._lookup_object(account, container, name,
lock_container=True)
src_version_id, dest_version_id = self._put_metadata(
user, node, domain, meta, replace,
update_statistics_ancestors_depth=1)
......@@ -863,7 +865,8 @@ class ModularBackend(BaseBackend):
user, account, container, name, permissions)
if user != account:
raise NotAllowedError
path = self._lookup_object(account, container, name)[0]
path = self._lookup_object(account, container, name,
lock_container=True)[0]
self._check_permissions(path, permissions)
self.permissions.access_set(path, permissions)
self._report_sharing_change(user, account, path, {'members':
......@@ -887,7 +890,8 @@ class ModularBackend(BaseBackend):
logger.debug("update_object_public: %s %s %s %s %s", user,
account, container, name, public)
self._can_write(user, account, container, name)
path = self._lookup_object(account, container, name)[0]
path = self._lookup_object(account, container, name,
lock_container=True)[0]
if not public:
self.permissions.public_unset(path)
else:
......@@ -993,6 +997,7 @@ class ModularBackend(BaseBackend):
raise ie
hash = map.hash()
# _update_object_hash() locks destination path
dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
self.store.map_put(hash, map)
return dest_version_id
......@@ -1005,7 +1010,8 @@ class ModularBackend(BaseBackend):
user, account, container, name, version, checksum)
# Update objects with greater version and same hashmap and size (fix metadata updates).
self._can_write(user, account, container, name)
path, node = self._lookup_object(account, container, name)
path, node = self._lookup_object(account, container, name,
lock_container=True)
props = self._get_version(node, version)
versions = self.node.node_get_versions(node)
for x in versions:
......@@ -1017,6 +1023,17 @@ class ModularBackend(BaseBackend):
dest_meta = dest_meta or {}
dest_version_ids = []
self._can_read(user, src_account, src_container, src_name)
src_container_path = '/'.join((src_account, src_container))
dest_container_path = '/'.join((dest_account, dest_container))
# Lock container paths in alphabetical order
if src_container_path < dest_container_path:
self._lookup_container(src_account, src_container)
self._lookup_container(dest_account, dest_container)
else:
self._lookup_container(dest_account, dest_container)
self._lookup_container(src_account, src_container)
path, node = self._lookup_object(src_account, src_container, src_name)
# TODO: Will do another fetch of the properties in duplicate version...
props = self._get_version(
......@@ -1048,6 +1065,7 @@ class ModularBackend(BaseBackend):
dest_prefix = dest_name + delimiter if not dest_name.endswith(
delimiter) else dest_name
vdest_name = path.replace(prefix, dest_prefix, 1)
# _update_object_hash() locks destination path
dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
self._delete_object(user, src_account, src_container, path)
......@@ -1077,6 +1095,10 @@ class ModularBackend(BaseBackend):
if user != account:
raise NotAllowedError
# lookup object and lock container path also
path, node = self._lookup_object(account, container, name,
lock_container=True)
if until is not None:
path = '/'.join((account, container, name))
node = self.node.node_lookup(path)
......@@ -1113,7 +1135,6 @@ class ModularBackend(BaseBackend):
)
return
path, node = self._lookup_object(account, container, name)
src_version_id, dest_version_id = self._put_version_duplicate(
user, node, size=0, type='', hash=None, checksum='',
cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
......@@ -1255,7 +1276,10 @@ class ModularBackend(BaseBackend):
raise ItemNotExists('Container does not exist')
return path, node
def _lookup_object(self, account, container, name):
def _lookup_object(self, account, container, name, lock_container=False):
if lock_container:
self._lookup_container(account, container)
path = '/'.join((account, container, name))
node = self.node.node_lookup(path)
if node is None:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment