Commit f415fc33 authored by Sofia Papagiannaki's avatar Sofia Papagiannaki
Browse files

pithos: Fix reconcile-resource-pithos command

* Improved performance (reduce interaction with the database)
* Fix command when `userid` option is provided

Conflicts:
	snf-pithos-app/pithos/api/management/commands/reconcile-resources-pithos.py
	snf-pithos-backend/pithos/backends/lib/sqlite/node.py
parent 5566c005
......@@ -31,17 +31,17 @@
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from django.core.management.base import NoArgsCommand, CommandError
from django.core.management.base import NoArgsCommand
from optparse import make_option
from pithos.api.util import get_backend
from pithos.api.resources import resources
from pithos.backends.modular import CLUSTER_NORMAL, DEFAULT_SOURCE
from pithos.backends.modular import DEFAULT_SOURCE
from synnefo.webproject.management import utils
from astakosclient.errors import QuotaLimit
from astakosclient.errors import QuotaLimit, NotFound
backend = get_backend()
......@@ -70,21 +70,26 @@ class Command(NoArgsCommand):
def handle_noargs(self, **options):
try:
qh_result = backend.astakosclient.service_get_quotas(
backend.service_token)
userid = options['userid']
users = (options['userid'],) if options['userid'] else None
account_nodes = backend.node.node_accounts(users)
if not account_nodes:
raise CommandError('No users found.')
# Get holding from Pithos DB
db_usage = backend.node.node_account_usage(userid)
db_usage = {}
for path, node in account_nodes:
size = backend.node.node_account_usage(node, CLUSTER_NORMAL)
db_usage[path] = size or 0
users = set(db_usage.keys())
if userid and userid not in users:
self.stdout.write("User '%s' does not exist in DB!\n" % userid)
return
users = set(qh_result.keys())
users.update(db_usage.keys())
# Get holding from Quotaholder
try:
qh_result = backend.astakosclient.service_get_quotas(
backend.service_token, userid)
except NotFound:
self.stdout.write(
"User '%s' does not exist in Quotaholder!\n" % userid)
return
users.update(qh_result.keys())
pending_exists = False
unknown_user_exists = False
......@@ -95,8 +100,7 @@ class Command(NoArgsCommand):
qh_all = qh_result[uuid]
except KeyError:
self.stdout.write(
"User '%s' does not exist in Quotaholder!\n" % uuid
)
"User '%s' does not exist in Quotaholder!\n" % uuid)
unknown_user_exists = True
continue
else:
......@@ -114,8 +118,7 @@ class Command(NoArgsCommand):
self.stdout.write(
"Pending commission. "
"User '%s', resource '%s'.\n" %
(uuid, resource)
)
(uuid, resource))
pending_exists = True
continue
......@@ -146,8 +149,7 @@ class Command(NoArgsCommand):
if pending_exists:
self.stdout.write(
"Found pending commissions. Run 'snf-manage"
" reconcile-commissions-pithos'\n"
)
" reconcile-commissions-pithos'\n")
elif not (unsynced or unknown_user_exists):
self.stdout.write("Everything in sync.\n")
finally:
......
......@@ -480,20 +480,32 @@ class Node(DBWorker):
r.close()
return dict(rows)
def node_account_usage(self, account_node, cluster):
select_children = select(
[self.nodes.c.node]).where(self.nodes.c.parent == account_node)
select_descendants = select([self.nodes.c.node]).where(
or_(self.nodes.c.parent.in_(select_children),
self.nodes.c.node.in_(select_children)))
s = select([func.sum(self.versions.c.size)])
s = s.where(self.nodes.c.node == self.versions.c.node)
s = s.where(self.nodes.c.node.in_(select_descendants))
def node_account_usage(self, account=None, cluster=0):
"""Return usage for a specific account.
Keyword arguments:
account -- (default None: list usage for all the accounts)
cluster -- list current, history or deleted usage (default 0: normal)
"""
n1 = self.nodes.alias('n1')
n2 = self.nodes.alias('n2')
n3 = self.nodes.alias('n3')
s = select([n3.c.path, func.sum(self.versions.c.size)])
s = s.where(n1.c.node == self.versions.c.node)
s = s.where(self.versions.c.cluster == cluster)
s = s.where(n1.c.parent == n2.c.node)
s = s.where(n2.c.parent == n3.c.node)
s = s.where(n3.c.parent == 0)
s = s.where(n3.c.node != 0)
if account:
s = s.where(n3.c.path == account)
s = s.group_by(n3.c.path)
r = self.conn.execute(s)
usage = r.fetchone()[0]
usage = r.fetchall()
r.close()
return usage
return dict(usage)
def policy_get(self, node):
s = select([self.policy.c.key, self.policy.c.value],
......
......@@ -302,7 +302,7 @@ class Node(DBWorker):
for r in self.fetchall():
hashes += [r[0]]
serials += [r[1]]
q = ("delete from versions "
"where node in (select node "
"from nodes "
......@@ -402,20 +402,31 @@ class Node(DBWorker):
)
return dict(self.execute(q).fetchall())
def node_account_usage(self, account_node, cluster):
select_children = ("select node from nodes where parent = ?")
select_descedents = ("select node from nodes "
"where parent in (%s) "
"or node in (%s) ") % ((select_children,)*2)
args = [account_node]*2
q = ("select sum(v.size) from versions v, nodes n "
"where v.node = n.node "
"and n.node in (%s) "
"and v.cluster = ?") % select_descedents
args += [cluster]
def node_account_usage(self, account=None, cluster=0):
"""Return usage for a specific account.
Keyword arguments:
account -- (default None: list usage for all the accounts)
cluster -- list current, history or deleted usage (default 0: normal)
"""
q = ("select n3.path, sum(v.size) from "
"versions v, nodes n1, nodes n2, nodes n3 "
"where v.node = n1.node "
"and v.cluster = ? "
"and n1.parent = n2.node "
"and n2.parent = n3.node "
"and n3.parent = 0 "
"and n3.node != 0 ")
args = [cluster]
if account:
q += ("and n3.path = ? ")
args += [account]
q += ("group by n3.path")
print '###', q, args
self.execute(q, args)
return self.fetchone()[0]
return dict(self.fetchall())
def policy_get(self, node):
q = "select key, value from policy where node = ?"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment