Commit eb34f0cc authored by Sofia Papagiannaki's avatar Sofia Papagiannaki Committed by Georgios D. Tsoukalas
Browse files

pithos: update recursively statistics up to the container level

To achieve atomicity we lock the container path.
However, the write operations update the statistics for the ancestor
nodes (container/account).
Therefore, the backend restricts the recursion up to the container
level.
Account statistics should be computed whenever are required.
parent 4c076f4e
......@@ -315,7 +315,8 @@ class Node(DBWorker):
r.close()
return row[0]
def node_purge_children(self, parent, before=inf, cluster=0):
def node_purge_children(self, parent, before=inf, cluster=0,
update_statistics_ancestors_depth=None):
"""Delete all versions with the specified
parent and cluster, and return
the hashes, the total size and the serials of versions deleted.
......@@ -340,7 +341,8 @@ class Node(DBWorker):
nr, size = row[0], row[1] if row[1] else 0
mtime = time()
self.statistics_update(parent, -nr, -size, mtime, cluster)
self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster,
update_statistics_ancestors_depth)
s = select([self.versions.c.hash, self.versions.c.serial])
s = s.where(where_clause)
......@@ -371,7 +373,8 @@ class Node(DBWorker):
return hashes, size, serials
def node_purge(self, node, before=inf, cluster=0):
def node_purge(self, node, before=inf, cluster=0,
update_statistics_ancestors_depth=None):
"""Delete all versions with the specified
node and cluster, and return
the hashes and size of versions deleted.
......@@ -394,7 +397,8 @@ class Node(DBWorker):
if not nr:
return (), 0, ()
mtime = time()
self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
self.statistics_update_ancestors(node, -nr, -size, mtime, cluster,
update_statistics_ancestors_depth)
s = select([self.versions.c.hash, self.versions.c.serial])
s = s.where(where_clause)
......@@ -425,7 +429,7 @@ class Node(DBWorker):
return hashes, size, serials
def node_remove(self, node):
def node_remove(self, node, update_statistics_ancestors_depth=None):
"""Remove the node specified.
Return false if the node has children or is not found.
"""
......@@ -442,7 +446,8 @@ class Node(DBWorker):
r = self.conn.execute(s)
for population, size, cluster in r.fetchall():
self.statistics_update_ancestors(
node, -population, -size, mtime, cluster)
node, -population, -size, mtime, cluster,
update_statistics_ancestors_depth)
r.close()
s = self.nodes.delete().where(self.nodes.c.node == node)
......@@ -557,15 +562,20 @@ class Node(DBWorker):
mtime=mtime, cluster=cluster)
self.conn.execute(ins).close()
def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
def statistics_update_ancestors(self, node, population, size, mtime,
cluster=0, recursion_depth=None):
"""Update the statistics of the given node's parent.
Then recursively update all parents up to the root.
Then recursively update all parents up to the root
or up to the ``recursion_depth`` (if not None).
Population is not recursive.
"""
i = 0
while True:
if node == ROOTNODE:
break
if recursion_depth and recursion_depth == i:
break
props = self.node_get_properties(node)
if props is None:
break
......@@ -573,6 +583,7 @@ class Node(DBWorker):
self.statistics_update(parent, population, size, mtime, cluster)
node = parent
population = 0 # Population isn't recursive
i += 1
def statistics_latest(self, node, before=inf, except_cluster=0):
"""Return population, total size and last mtime
......@@ -671,7 +682,9 @@ class Node(DBWorker):
s = s.values(latest_version=serial)
self.conn.execute(s).close()
def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
def version_create(self, node, hash, size, type, source, muser, uuid,
checksum, cluster=0,
update_statistics_ancestors_depth=None):
"""Create a new version from the given properties.
Return the (serial, mtime) of the new version.
"""
......@@ -681,7 +694,8 @@ class Node(DBWorker):
).values(node=node, hash=hash, size=size, type=type, source=source,
mtime=mtime, muser=muser, uuid=uuid, checksum=checksum, cluster=cluster)
serial = self.conn.execute(s).inserted_primary_key[0]
self.statistics_update_ancestors(node, 1, size, mtime, cluster)
self.statistics_update_ancestors(node, 1, size, mtime, cluster,
update_statistics_ancestors_depth)
self.nodes_set_latest_version(node, serial)
......@@ -782,7 +796,8 @@ class Node(DBWorker):
s = s.values(**{key: value})
self.conn.execute(s).close()
def version_recluster(self, serial, cluster):
def version_recluster(self, serial, cluster,
update_statistics_ancestors_depth=None):
"""Move the version into another cluster."""
props = self.version_get_properties(serial)
......@@ -795,15 +810,17 @@ class Node(DBWorker):
return
mtime = time()
self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
self.statistics_update_ancestors(node, 1, size, mtime, cluster)
self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster,
update_statistics_ancestors_depth)
self.statistics_update_ancestors(node, 1, size, mtime, cluster,
update_statistics_ancestors_depth)
s = self.versions.update()
s = s.where(self.versions.c.serial == serial)
s = s.values(cluster=cluster)
self.conn.execute(s).close()
def version_remove(self, serial):
def version_remove(self, serial, update_statistics_ancestors_depth=None):
"""Remove the serial specified."""
props = self.version_get_properties(serial)
......@@ -815,7 +832,8 @@ class Node(DBWorker):
cluster = props[CLUSTER]
mtime = time()
self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
self.statistics_update_ancestors(node, -1, -size, mtime, cluster,
update_statistics_ancestors_depth)
s = self.versions.delete().where(self.versions.c.serial == serial)
self.conn.execute(s).close()
......
......@@ -336,7 +336,8 @@ class ModularBackend(BaseBackend):
if user != account:
raise NotAllowedError
path, node = self._lookup_account(account, True)
self._put_metadata(user, node, domain, meta, replace)
self._put_metadata(user, node, domain, meta, replace,
update_statistics_ancestors_depth=-1)
@backend_method
def get_account_groups(self, user, account):
......@@ -409,7 +410,8 @@ class ModularBackend(BaseBackend):
raise AccountExists('Account already exists')
if policy:
self._check_policy(policy, is_account_policy=True)
node = self._put_path(user, self.ROOTNODE, account)
node = self._put_path(user, self.ROOTNODE, account,
update_statistics_ancestors_depth=-1)
self._put_policy(node, policy, True, is_account_policy=True)
@backend_method
......@@ -422,7 +424,8 @@ class ModularBackend(BaseBackend):
node = self.node.node_lookup(account)
if node is None:
return
if not self.node.node_remove(node):
if not self.node.node_remove(node,
update_statistics_ancestors_depth=-1):
raise AccountNotEmpty('Account is not empty')
self.permissions.group_destroy(account)
......@@ -517,12 +520,14 @@ class ModularBackend(BaseBackend):
raise NotAllowedError
path, node = self._lookup_container(account, container)
src_version_id, dest_version_id = self._put_metadata(
user, node, domain, meta, replace)
user, node, domain, meta, replace,
update_statistics_ancestors_depth=0)
if src_version_id is not None:
versioning = self._get_policy(
node, is_account_policy=False)['versioning']
if versioning != 'auto':
self.node.version_remove(src_version_id)
self.node.version_remove(src_version_id,
update_statistics_ancestors_depth=0)
@backend_method
def get_container_policy(self, user, account, container):
......@@ -568,7 +573,8 @@ class ModularBackend(BaseBackend):
self._check_policy(policy, is_account_policy=False)
path = '/'.join((account, container))
node = self._put_path(
user, self._lookup_account(account, True)[1], path)
user, self._lookup_account(account, True)[1], path,
update_statistics_ancestors_depth=-1)
self._put_policy(node, policy, True, is_account_policy=False)
@backend_method
......@@ -583,10 +589,12 @@ class ModularBackend(BaseBackend):
if until is not None:
hashes, size, serials = self.node.node_purge_children(
node, until, CLUSTER_HISTORY)
node, until, CLUSTER_HISTORY,
update_statistics_ancestors_depth=0)
for h in hashes:
self.store.map_delete(h)
self.node.node_purge_children(node, until, CLUSTER_DELETED)
self.node.node_purge_children(node, until, CLUSTER_DELETED,
update_statistics_ancestors_depth=0)
if not self.free_versioning:
self._report_size_change(
user, account, -size, {
......@@ -601,11 +609,13 @@ class ModularBackend(BaseBackend):
if self._get_statistics(node)[0] > 0:
raise ContainerNotEmpty('Container is not empty')
hashes, size, serials = self.node.node_purge_children(
node, inf, CLUSTER_HISTORY)
node, inf, CLUSTER_HISTORY,
update_statistics_ancestors_depth=0)
for h in hashes:
self.store.map_delete(h)
self.node.node_purge_children(node, inf, CLUSTER_DELETED)
self.node.node_remove(node)
self.node.node_purge_children(node, inf, CLUSTER_DELETED,
update_statistics_ancestors_depth=0)
self.node.node_remove(node, update_statistics_ancestors_depth=0)
if not self.free_versioning:
self._report_size_change(
user, account, -size, {
......@@ -621,9 +631,13 @@ class ModularBackend(BaseBackend):
for t in src_names:
path = '/'.join((account, container, t[0]))
node = t[2]
src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
src_version_id, dest_version_id = self._put_version_duplicate(
user, node, size=0, type='', hash=None, checksum='',
cluster=CLUSTER_DELETED,
update_statistics_ancestors_depth=1)
del_size = self._apply_versioning(
account, container, src_version_id)
account, container, src_version_id,
update_statistics_ancestors_depth=1)
self._report_size_change(
user, account, -del_size, {
'action': 'object delete',
......@@ -815,8 +829,10 @@ class ModularBackend(BaseBackend):
self._can_write(user, account, container, name)
path, node = self._lookup_object(account, container, name)
src_version_id, dest_version_id = self._put_metadata(
user, node, domain, meta, replace)
self._apply_versioning(account, container, src_version_id)
user, node, domain, meta, replace,
update_statistics_ancestors_depth=1)
self._apply_versioning(account, container, src_version_id,
update_statistics_ancestors_depth=1)
return dest_version_id
@backend_method
......@@ -905,15 +921,20 @@ class ModularBackend(BaseBackend):
path, node = self._put_object_node(
container_path, container_node, name)
pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
pre_version_id, dest_version_id = self._put_version_duplicate(
user, node, src_node=src_node, size=size, type=type, hash=hash,
checksum=checksum, is_copy=is_copy,
update_statistics_ancestors_depth=1)
# Handle meta.
if src_version_id is None:
src_version_id = pre_version_id
self._put_metadata_duplicate(
src_version_id, dest_version_id, domain, meta, replace_meta)
src_version_id, dest_version_id, domain, meta, replace_meta,
update_statistics_ancestors_depth=1)
del_size = self._apply_versioning(account, container, pre_version_id)
del_size = self._apply_versioning(account, container, pre_version_id,
update_statistics_ancestors_depth=1)
size_delta = size - del_size
if size_delta > 0:
# Check account quota.
......@@ -1065,18 +1086,21 @@ class ModularBackend(BaseBackend):
hashes = []
size = 0
serials = []
h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
update_statistics_ancestors_depth=1)
hashes += h
size += s
serials += v
h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
update_statistics_ancestors_depth=1)
hashes += h
if not self.free_versioning:
size += s
serials += v
for h in hashes:
self.store.map_delete(h)
self.node.node_purge(node, until, CLUSTER_DELETED)
self.node.node_purge(node, until, CLUSTER_DELETED,
update_statistics_ancestors_depth=1)
try:
props = self._get_version(node)
except NameError:
......@@ -1091,8 +1115,11 @@ class ModularBackend(BaseBackend):
return
path, node = self._lookup_object(account, container, name)
src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
del_size = self._apply_versioning(account, container, src_version_id)
src_version_id, dest_version_id = self._put_version_duplicate(
user, node, size=0, type='', hash=None, checksum='',
cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
del_size = self._apply_versioning(account, container, src_version_id,
update_statistics_ancestors_depth=1)
self._report_size_change(user, account, -del_size,
{'action': 'object delete', 'path': path,
'versions': ','.join([str(dest_version_id)])})
......@@ -1107,9 +1134,13 @@ class ModularBackend(BaseBackend):
for t in src_names:
path = '/'.join((account, container, t[0]))
node = t[2]
src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
src_version_id, dest_version_id = self._put_version_duplicate(
user, node, size=0, type='', hash=None, checksum='',
cluster=CLUSTER_DELETED,
update_statistics_ancestors_depth=1)
del_size = self._apply_versioning(
account, container, src_version_id)
account, container, src_version_id,
update_statistics_ancestors_depth=1)
self._report_size_change(user, account, -del_size,
{'action': 'object delete',
'path': path,
......@@ -1199,20 +1230,24 @@ class ModularBackend(BaseBackend):
path = '/'.join((path, name))
node = self.node.node_lookup(path)
if node is None:
node = self.node.node_create(parent, path)
node = self.node.node_create(parent, path,
update_statistics_ancestors_depth=1)
return path, node
def _put_path(self, user, parent, path):
def _put_path(self, user, parent, path,
update_statistics_ancestors_depth=None):
node = self.node.node_create(parent, path)
self.node.version_create(node, None, 0, '', None, user,
self._generate_uuid(), '', CLUSTER_NORMAL)
self._generate_uuid(), '', CLUSTER_NORMAL,
update_statistics_ancestors_depth)
return node
def _lookup_account(self, account, create=True):
node = self.node.node_lookup(account)
if node is None and create:
node = self._put_path(
account, self.ROOTNODE, account) # User is account.
account, self.ROOTNODE, account,
update_statistics_ancestors_depth=-1) # User is account.
return account, node
def _lookup_container(self, account, container):
......@@ -1273,7 +1308,10 @@ class ModularBackend(BaseBackend):
def _get_versions(self, nodes):
return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
def _put_version_duplicate(self, user, node, src_node=None, size=None,
type=None, hash=None, checksum=None,
cluster=CLUSTER_NORMAL, is_copy=False,
update_statistics_ancestors_depth=None):
"""Create a new version of the node."""
props = self.node.version_lookup(
......@@ -1308,9 +1346,12 @@ class ModularBackend(BaseBackend):
if props is not None:
pre_version_id = props[self.SERIAL]
if pre_version_id is not None:
self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
update_statistics_ancestors_depth)
dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
dest_version_id, mtime = self.node.version_create(
node, hash, size, type, src_version_id, user, uuid, checksum,
cluster, update_statistics_ancestors_depth)
return pre_version_id, dest_version_id
def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
......@@ -1326,11 +1367,13 @@ class ModularBackend(BaseBackend):
self.node.attribute_set(dest_version_id, domain, ((
k, v) for k, v in meta.iteritems()))
def _put_metadata(self, user, node, domain, meta, replace=False):
def _put_metadata(self, user, node, domain, meta, replace=False,
update_statistics_ancestors_depth=None):
"""Create a new version and store metadata."""
src_version_id, dest_version_id = self._put_version_duplicate(
user, node)
user, node,
update_statistics_ancestors_depth=update_statistics_ancestors_depth)
self._put_metadata_duplicate(
src_version_id, dest_version_id, domain, meta, replace)
return src_version_id, dest_version_id
......@@ -1447,7 +1490,8 @@ class ModularBackend(BaseBackend):
policy.update(self.node.policy_get(node))
return policy
def _apply_versioning(self, account, container, version_id):
def _apply_versioning(self, account, container, version_id,
update_statistics_ancestors_depth=None):
"""Delete the provided version if such is the policy.
Return size of object removed.
"""
......@@ -1458,7 +1502,8 @@ class ModularBackend(BaseBackend):
versioning = self._get_policy(
node, is_account_policy=False)['versioning']
if versioning != 'auto':
hash, size = self.node.version_remove(version_id)
hash, size = self.node.version_remove(
version_id, update_statistics_ancestors_depth)
self.store.map_delete(hash)
return size
elif self.free_versioning:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment