Commit f29c6c9e authored by Antony Chazapis's avatar Antony Chazapis

Send size deltas to the queue.

Refs #1688
parent 71a5aa45
......@@ -267,7 +267,7 @@ class Node(DBWorker):
def node_purge_children(self, parent, before=inf, cluster=0):
"""Delete all versions with the specified
parent and cluster, and return
the hashes of versions deleted.
the hashes and size of versions deleted.
Clears out nodes with no remaining versions.
"""
#update statistics
......@@ -284,7 +284,7 @@ class Node(DBWorker):
row = r.fetchone()
r.close()
if not row:
return ()
return (), 0
nr, size = row[0], -row[1] if row[1] else 0
mtime = time()
self.statistics_update(parent, -nr, size, mtime, cluster)
......@@ -312,12 +312,12 @@ class Node(DBWorker):
s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
self.conn.execute(s).close()
return hashes
return hashes, size
def node_purge(self, node, before=inf, cluster=0):
"""Delete all versions with the specified
node and cluster, and return
the hashes of versions deleted.
the hashes and size of versions deleted.
Clears out the node if it has no remaining versions.
"""
......@@ -334,7 +334,7 @@ class Node(DBWorker):
nr, size = row[0], row[1]
r.close()
if not nr:
return ()
return (), 0
mtime = time()
self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
......@@ -360,7 +360,7 @@ class Node(DBWorker):
s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
self.conn.execute(s).close()
return hashes
return hashes, size
def node_remove(self, node):
"""Remove the node specified.
......@@ -644,7 +644,7 @@ class Node(DBWorker):
s = self.versions.delete().where(self.versions.c.serial == serial)
self.conn.execute(s).close()
return hash
return hash, size
def attribute_get(self, serial, domain, keys=()):
"""Return a list of (key, value) pairs of the version specified by serial.
......
......@@ -235,7 +235,7 @@ class Node(DBWorker):
def node_purge_children(self, parent, before=inf, cluster=0):
"""Delete all versions with the specified
parent and cluster, and return
the hashes of versions deleted.
the hashes and size of versions deleted.
Clears out nodes with no remaining versions.
"""
......@@ -250,7 +250,7 @@ class Node(DBWorker):
execute(q, args)
nr, size = self.fetchone()
if not nr:
return ()
return (), 0
mtime = time()
self.statistics_update(parent, -nr, -size, mtime, cluster)
self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
......@@ -277,12 +277,12 @@ class Node(DBWorker):
"where node = n.node) = 0 "
"and parent = ?)")
execute(q, (parent,))
return hashes
return hashes, size
def node_purge(self, node, before=inf, cluster=0):
"""Delete all versions with the specified
node and cluster, and return
the hashes of versions deleted.
the hashes and size of versions deleted.
Clears out the node if it has no remaining versions.
"""
......@@ -295,7 +295,7 @@ class Node(DBWorker):
execute(q, args)
nr, size = self.fetchone()
if not nr:
return ()
return (), 0
mtime = time()
self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
......@@ -317,7 +317,7 @@ class Node(DBWorker):
"where node = n.node) = 0 "
"and node = ?)")
execute(q, (node,))
return hashes
return hashes, size
def node_remove(self, node):
"""Remove the node specified.
......@@ -549,7 +549,7 @@ class Node(DBWorker):
q = "delete from versions where serial = ?"
self.execute(q, (serial,))
return hash
return hash, size
def attribute_get(self, serial, domain, keys=()):
"""Return a list of (key, value) pairs of the version specified by serial.
......
......@@ -385,21 +385,21 @@ class ModularBackend(BaseBackend):
path, node = self._lookup_container(account, container)
if until is not None:
hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
for h in hashes:
self.store.map_delete(h)
self.node.node_purge_children(node, until, CLUSTER_DELETED)
self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
self._report_size_change(user, account, -size, {'action': 'container purge'})
return
if self._get_statistics(node)[0] > 0:
raise IndexError('Container is not empty')
hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
for h in hashes:
self.store.map_delete(h)
self.node.node_purge_children(node, inf, CLUSTER_DELETED)
self.node.node_remove(node)
self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
self._report_size_change(user, account, -size, {'action': 'container delete'})
@backend_method
def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
......@@ -552,11 +552,8 @@ class ModularBackend(BaseBackend):
pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, hash=hash, is_copy=is_copy)
# Check quota.
versioning = self._get_policy(container_node)['versioning']
if versioning != 'auto':
size_delta = size - 0 # TODO: Get previous size.
else:
size_delta = size
del_size = self._apply_versioning(account, container, pre_version_id)
size_delta = size - del_size
if size_delta > 0:
account_quota = long(self._get_policy(account_node)['quota'])
container_quota = long(self._get_policy(container_node)['quota'])
......@@ -564,10 +561,10 @@ class ModularBackend(BaseBackend):
(container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
# This must be executed in a transaction, so the version is never created if it fails.
raise QuotaError
self._report_size_change(user, account, size_delta, {'action': 'object update'})
if permissions is not None:
self.permissions.access_set(path, permissions)
self._apply_versioning(account, container, pre_version_id)
return pre_version_id, dest_version_id
@backend_method
......@@ -589,7 +586,6 @@ class ModularBackend(BaseBackend):
pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
self.store.map_put(hash, map)
self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
return dest_version_id
def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
......@@ -612,7 +608,6 @@ class ModularBackend(BaseBackend):
logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
return dest_version_id
@backend_method
......@@ -625,7 +620,6 @@ class ModularBackend(BaseBackend):
dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True)
if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
self._delete_object(user, src_account, src_container, src_name)
self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
return dest_version_id
def _delete_object(self, user, account, container, name, until=None):
......@@ -637,8 +631,14 @@ class ModularBackend(BaseBackend):
node = self.node.node_lookup(path)
if node is None:
return
hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
hashes = []
size = 0
h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
hashes += h
size += s
h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
hashes += h
size += s
for h in hashes:
self.store.map_delete(h)
self.node.node_purge(node, until, CLUSTER_DELETED)
......@@ -646,12 +646,14 @@ class ModularBackend(BaseBackend):
props = self._get_version(node)
except NameError:
self.permissions.access_clear(path)
self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
self._report_size_change(user, account, -size, {'action': 'object purge'})
return
path, node = self._lookup_object(account, container, name)
src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, hash=None, cluster=CLUSTER_DELETED)
self._apply_versioning(account, container, src_version_id)
del_size = self._apply_versioning(account, container, src_version_id)
if del_size:
self._report_size_change(user, account, -del_size, {'action': 'object delete'})
self.permissions.access_clear(path)
@backend_method
......@@ -873,6 +875,15 @@ class ModularBackend(BaseBackend):
start, limit = self._list_limits([x[0] for x in objects], marker, limit)
return objects[start:start + limit]
# Reporting functions.
def _report_size_change(self, user, account, size, details={}):
logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
account_node = self._lookup_account(account, True)[1]
total = self._get_statistics(account_node)[1]
details.update({'user': user, 'total': total})
self.queue.send(account, 'diskspace', size, details)
# Policy functions.
def _check_policy(self, policy):
......@@ -903,14 +914,19 @@ class ModularBackend(BaseBackend):
return policy
def _apply_versioning(self, account, container, version_id):
"""Delete the provided version if such is the policy.
Return size of object removed.
"""
if version_id is None:
return
return 0
path, node = self._lookup_container(account, container)
versioning = self._get_policy(node)['versioning']
if versioning != 'auto':
hash = self.node.version_remove(version_id)
hash, size = self.node.version_remove(version_id)
self.store.map_delete(hash)
self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
return size
return 0
# Access control functions.
......
......@@ -93,7 +93,7 @@ def queue_start(conn):
channel.start_consuming()
class Receipt(object):
def __init__(self, client, user, resource, value, details=None):
def __init__(self, client, user, resource, value, details={}):
self.eventVersion = 1
self.id = str(uuid.uuid4())
self.timestamp = int(time() * 1000)
......@@ -101,8 +101,7 @@ class Receipt(object):
self.userId = user
self.resource = resource
self.value = value
if details:
self.details = details
self.details = details
def format(self):
return self.__dict__
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment