Commit 0b752cff authored by Sofia Papagiannaki's avatar Sofia Papagiannaki
Browse files

Fix pithos purging accounting

Refs: #3442
Refs: #1733

Pithos has two different version accounting
policies:
* free: do not include object history version size 
to the total account usage
* debit: include object history version size to 
the total account usage
This configurable by
PITHOS_BACKEND_FREE_VERSIONING setting.

Pithos delete container function purges 
the history versions of the including objects.
and should send to the quota holder component
the released disk space *only*  
in the case of debit version accounting policy.

Also the purging function used to return 
the negative of the computed released disk space 
but to the quotaholder component was sent 
its negative.
So, erroneously, the released disk space was
added to the total quota usage.

Provide tests for the following distinct cases:
* free version accounting policy (PITHOS_BACKEND_FREE_VERSIONING=True) 
and `auto` container versioning policy
* free version accounting policy
(PITHOS_BACKEND_FREE_VERSIONING=True) 
and `none` container versioning policy
* debit version accounting policy
(PITHOS_BACKEND_FREE_VERSIONING=False) 
and `auto` container versioning policy
* debit version accounting policy(PITHOS_BACKEND_FREE_VERSIONING=False) 
and `none` container versioning policy
parent 467d5b8f
......@@ -309,7 +309,7 @@ class Node(DBWorker):
def node_purge_children(self, parent, before=inf, cluster=0):
"""Delete all versions with the specified
parent and cluster, and return
the hashes and size of versions deleted.
the hashes, the total size and the serials of versions deleted.
Clears out nodes with no remaining versions.
"""
#update statistics
......@@ -317,20 +317,21 @@ class Node(DBWorker):
self.nodes.c.parent == parent)
where_clause = and_(self.versions.c.node.in_(c1),
self.versions.c.cluster == cluster)
if before != inf:
where_clause = and_(where_clause,
self.versions.c.mtime <= before)
s = select([func.count(self.versions.c.serial),
func.sum(self.versions.c.size)])
s = s.where(where_clause)
if before != inf:
s = s.where(self.versions.c.mtime <= before)
r = self.conn.execute(s)
row = r.fetchone()
r.close()
if not row:
return (), 0, ()
nr, size = row[0], -row[1] if row[1] else 0
nr, size = row[0], row[1] if row[1] else 0
mtime = time()
self.statistics_update(parent, -nr, size, mtime, cluster)
self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
self.statistics_update(parent, -nr, -size, mtime, cluster)
self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
s = select([self.versions.c.hash, self.versions.c.serial])
s = s.where(where_clause)
......@@ -373,9 +374,10 @@ class Node(DBWorker):
func.sum(self.versions.c.size)])
where_clause = and_(self.versions.c.node == node,
self.versions.c.cluster == cluster)
s = s.where(where_clause)
if before != inf:
s = s.where(self.versions.c.mtime <= before)
where_clause = and_(where_clause,
self.versions.c.mtime <= before)
s = s.where(where_clause)
r = self.conn.execute(s)
row = r.fetchone()
nr, size = row[0], row[1]
......@@ -405,9 +407,9 @@ class Node(DBWorker):
and_(self.nodes.c.node == node,
select([func.count(self.versions.c.serial)],
self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
r = self.conn.execute(s)
nodes = r.fetchall()
r.close()
rp= self.conn.execute(s)
nodes = [r[0] for r in rp.fetchall()]
rp.close()
if nodes:
s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
self.conn.execute(s).close()
......
......@@ -259,7 +259,7 @@ class Node(DBWorker):
def node_purge_children(self, parent, before=inf, cluster=0):
"""Delete all versions with the specified
parent and cluster, and return
the hashes and size of versions deleted.
the hashes, the size and the serials of versions deleted.
Clears out nodes with no remaining versions.
"""
......@@ -311,7 +311,7 @@ class Node(DBWorker):
def node_purge(self, node, before=inf, cluster=0):
"""Delete all versions with the specified
node and cluster, and return
the hashes and size of versions deleted.
the hashes, the size and the serials of versions deleted.
Clears out the node if it has no remaining versions.
"""
......
......@@ -538,9 +538,14 @@ class ModularBackend(BaseBackend):
for h in hashes:
self.store.map_delete(h)
self.node.node_purge_children(node, until, CLUSTER_DELETED)
self._report_size_change(user, account, -size,
{'action':'container purge', 'path': path,
'versions': ','.join(str(i) for i in serials)})
if not self.free_versioning:
self._report_size_change(
user, account, -size, {
'action':'container purge',
'path': path,
'versions': ','.join(str(i) for i in serials)
}
)
return
if not delimiter:
......@@ -552,10 +557,14 @@ class ModularBackend(BaseBackend):
self.store.map_delete(h)
self.node.node_purge_children(node, inf, CLUSTER_DELETED)
self.node.node_remove(node)
self._report_size_change(user, account, -size,
{'action': 'container delete',
'path': path,
'versions': ','.join(str(i) for i in serials)})
if not self.free_versioning:
self._report_size_change(
user, account, -size, {
'action':'container purge',
'path': path,
'versions': ','.join(str(i) for i in serials)
}
)
else:
# remove only contents
src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
......@@ -995,7 +1004,8 @@ class ModularBackend(BaseBackend):
serials += v
h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
hashes += h
size += s
if not self.free_versioning:
size += s
serials += v
for h in hashes:
self.store.map_delete(h)
......@@ -1004,9 +1014,13 @@ class ModularBackend(BaseBackend):
props = self._get_version(node)
except NameError:
self.permissions.access_clear(path)
self._report_size_change(user, account, -size,
{'action': 'object purge', 'path': path,
'versions': ','.join(str(i) for i in serials)})
self._report_size_change(
user, account, -size, {
'action': 'object purge',
'path': path,
'versions': ','.join(str(i) for i in serials)
}
)
return
path, node = self._lookup_object(account, container, name)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment