Commit ad88bfaa authored by Sofia Papagiannaki's avatar Sofia Papagiannaki
Browse files

pithos: Utilize astakosclient for quota handling

Refs: #3625, #3650
parent 97d01204
......@@ -69,23 +69,22 @@ class Command(NoArgsCommand):
if options['fix']:
to_accept = b.quotaholder_serials.lookup(pending_commissions)
to_reject = list(set(pending_commissions) - set(to_accept))
response = b.quotaholder.resolve_commissions(
token=b.quotaholder_token,
accept_serials=to_accept,
reject_serials=[])
reject_serials=to_reject
)
accepted = response['accepted']
self.stdout.write("Accepted commissions: %s\n" % accepted)
b.quotaholder_serials.delete_many(to_accept)
self.stdout.write("Deleted serials: %s\n" % to_accept)
to_reject = list(set(pending_commissions) - set(to_accept))
response = b.quotaholder.resolve_commissions(
token=b.quotaholder_token,
accept_serials=[],
reject_serials=to_reject)
rejected = response['rejected']
failed = response['failed']
self.stdout.write("Accepted commissions: %s\n" % accepted)
self.stdout.write("Rejected commissions: %s\n" % rejected)
self.stdout.write("Failed commissions:\n")
for i in failed:
self.stdout.write('%s\n' % i)
b.quotaholder_serials.delete_many(accepted)
except Exception, e:
logger.exception(e)
raise CommandError(e)
......
......@@ -33,116 +33,16 @@
from django.core.management.base import NoArgsCommand, CommandError
from collections import namedtuple
from optparse import make_option
from sqlalchemy import func
from sqlalchemy.sql import select, and_, or_
from pithos.api.util import get_backend
from pithos.backends.modular import (
CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED
)
clusters = (CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED)
Usage = namedtuple('Usage', ('node', 'path', 'size', 'cluster'))
GetQuota = namedtuple('GetQuota', ('entity', 'resource', 'key'))
class ResetHoldingPayload(namedtuple('ResetHoldingPayload', (
'entity', 'resource', 'key', 'imported', 'exported', 'returned', 'released'
))):
__slots__ = ()
def __str__(self):
return '%s: %s' % (self.entity, self.imported)
from pithos.backends.modular import CLUSTER_NORMAL, DEFAULT_SOURCE
from synnefo.webproject.management import utils
from astakosclient.errors import AstakosClientException
ENTITY_KEY = '1'
backend = get_backend()
table = {}
table['nodes'] = backend.node.nodes
table['versions'] = backend.node.versions
table['policy'] = backend.node.policy
conn = backend.node.conn
def _retrieve_user_nodes(users=()):
s = select([table['nodes'].c.path, table['nodes'].c.node])
s = s.where(and_(table['nodes'].c.node != 0,
table['nodes'].c.parent == 0))
if users:
s = s.where(table['nodes'].c.path.in_(users))
return conn.execute(s).fetchall()
def _compute_usage(nodes):
usage = []
append = usage.append
for path, node in nodes:
select_children = select(
[table['nodes'].c.node]).where(table['nodes'].c.parent == node)
select_descendants = select([table['nodes'].c.node]).where(
or_(table['nodes'].c.parent.in_(select_children),
table['nodes'].c.node.in_(select_children)))
s = select([table['versions'].c.cluster,
func.sum(table['versions'].c.size)])
s = s.group_by(table['versions'].c.cluster)
s = s.where(table['nodes'].c.node == table['versions'].c.node)
s = s.where(table['nodes'].c.node.in_(select_descendants))
s = s.where(table['versions'].c.cluster == CLUSTER_NORMAL)
d2 = dict(conn.execute(s).fetchall())
try:
size = d2[CLUSTER_NORMAL]
except KeyError:
size = 0
append(Usage(
node=node,
path=path,
size=size,
cluster=CLUSTER_NORMAL))
return usage
def _get_quotaholder_usage(usage):
payload = []
append = payload.append
[append(GetQuota(
entity=item.path,
resource='pithos+.diskspace',
key=ENTITY_KEY
)) for item in usage]
result = backend.quotaholder.get_quota(
context={}, clientkey='pithos', get_quota=payload
)
return dict((entity, imported - exported + returned - released) for (
entity, resource, quantity, capacity, import_limit, export_limit,
imported, exported, returned, released, flags
) in result)
def _prepare_reset_holding(usage, only_diverging=False):
"""Verify usage and set quotaholder user usage"""
reset_holding = []
append = reset_holding.append
quotaholder_usage = {}
if only_diverging:
quotaholder_usage = _get_quotaholder_usage(usage)
for item in(usage):
if only_diverging and quotaholder_usage.get(item.path) == item.size:
continue
if item.cluster == CLUSTER_NORMAL:
append(ResetHoldingPayload(
entity=item.path,
resource='pithos+.diskspace',
key=ENTITY_KEY,
imported=item.size,
exported=0,
returned=0,
released=0))
return reset_holding
class Command(NoArgsCommand):
help = "List and reset pithos usage"
......@@ -169,49 +69,97 @@ class Command(NoArgsCommand):
metavar='USER_UUID',
help=("Specify which users --list or --reset applies."
"This option can be repeated several times."
"If no user is specified --list or --reset will be applied globally.")),
"If no user is specified --list or --reset "
"will be applied globally.")),
make_option(
"--no-headers",
dest="headers",
action="store_false",
default=True,
help="Do not display headers"),
make_option(
"--output-format",
dest="output_format",
metavar="[pretty, csv, json]",
default="pretty",
choices=["pretty", "csv", "json"],
help="Select the output format: pretty [the default], tabs"
" [tab-separated output], csv [comma-separated output]"),
)
def handle_noargs(self, **options):
try:
user_nodes = _retrieve_user_nodes(options['users'])
if not user_nodes:
account_nodes = backend.node.node_accounts(options['users'])
if not account_nodes:
raise CommandError('No users found.')
usage = _compute_usage(user_nodes)
reset_holding = _prepare_reset_holding(
usage, only_diverging=options['diverging']
db_usage = {}
for path, node in account_nodes:
size = backend.node.node_account_usage(node, CLUSTER_NORMAL)
db_usage[path] = size or 0
result = backend.quotaholder.service_get_quotas(
backend.quotaholder_token,
)
if options['list']:
print '\n'.join([str(i) for i in reset_holding])
qh_usage = {}
resource = 'pithos.diskspace'
pending_list = []
for uuid, d in result.iteritems():
pithos_dict = d.get(DEFAULT_SOURCE, {}).get(resource, {})
pending = pithos_dict.get('pending', 0)
if pending != 0:
pending_list.append(pending)
continue
qh_usage[uuid] = pithos_dict.get('usage', 0)
if pending_list:
self.stdout.write((
"There are pending commissions for: %s.\n"
"Reconcile commissions and retry"
"in order to list/reset their quota.\n"
) % pending_list)
headers = ['uuid', 'usage']
table = []
provisions = []
for uuid in db_usage.keys():
try:
delta = db_usage[uuid] - qh_usage[uuid]
except KeyError:
self.stdout.write('Unknown holder: %s\n' % uuid)
continue
else:
if options['diverging'] and delta == 0:
continue
table.append((uuid, db_usage[uuid]))
provisions.append({"holder": uuid,
"source": DEFAULT_SOURCE,
"resource": resource,
"quantity": delta
})
if options['reset']:
if not backend.quotaholder_enabled:
raise CommandError('Quotaholder component is not enabled')
if not backend.quotaholder_url:
raise CommandError('Quotaholder url is not set')
if not backend.quotaholder_token:
raise CommandError('Quotaholder token is not set')
while True:
result = backend.quotaholder.reset_holding(
context={},
clientkey='pithos',
reset_holding=reset_holding)
if not result:
break
missing_entities = [reset_holding[x].entity for x in result]
self.stdout.write(
'Unknown quotaholder users: %s\n' %
', '.join(missing_entities))
m = 'Retrying sending quota usage for the rest...\n'
self.stdout.write(m)
missing_indexes = set(result)
reset_holding = [x for i, x in enumerate(reset_holding)
if i not in missing_indexes]
if not provisions:
raise CommandError('Nothing to reset')
request = {}
request['force'] = True
request['auto_accept'] = True
request['provisions'] = provisions
try:
serial = backend.quotaholder.issue_commission(
backend.quotaholder_token, request
)
except AstakosClientException, e:
self.stdout.write(e.details)
else:
backend.quotaholder_serials.insert_many([serial])
elif options['list'] and table:
output_format = options["output_format"]
if output_format != "json" and not options["headers"]:
headers = None
utils.pprint_table(self.stdout, table, headers, output_format)
finally:
backend.close()
......@@ -56,7 +56,7 @@ class SwissArmy():
self.backend.close()
def existing_accounts(self):
return self.backend.node.node_accounts()
return sorted([path for path, _ in self.backend.node.node_accounts()])
def duplicate_accounts(self):
accounts = self.existing_accounts()
......
......@@ -449,11 +449,32 @@ class Node(DBWorker):
self.conn.execute(s).close()
return True
def node_accounts(self):
s = select([self.nodes.c.path])
s = s.where(and_(self.nodes.c.node != 0, self.nodes.c.parent == 0))
account_nodes = self.conn.execute(s).fetchall()
return sorted(i[0] for i in account_nodes)
def node_accounts(self, accounts=()):
s = select([self.nodes.c.path, self.nodes.c.node])
s = s.where(and_(self.nodes.c.node != 0,
self.nodes.c.parent == 0))
if accounts:
s = s.where(self.nodes.c.path.in_(accounts))
r = self.conn.execute(s)
rows = r.fetchall()
r.close()
return rows
def node_account_usage(self, account_node, cluster):
select_children = select(
[self.nodes.c.node]).where(self.nodes.c.parent == account_node)
select_descendants = select([self.nodes.c.node]).where(
or_(self.nodes.c.parent.in_(select_children),
self.nodes.c.node.in_(select_children)))
s = select([func.sum(self.versions.c.size)])
s = s.group_by(self.versions.c.cluster)
s = s.where(self.nodes.c.node == self.versions.c.node)
s = s.where(self.nodes.c.node.in_(select_descendants))
s = s.where(self.versions.c.cluster == cluster)
r = self.conn.execute(s)
usage = r.fetchone()[0]
r.close()
return usage
def policy_get(self, node):
s = select([self.policy.c.key, self.policy.c.value],
......
......@@ -384,6 +384,30 @@ class Node(DBWorker):
self.execute(q, (node,))
return True
def node_accounts(self, accounts=()):
q = ("select path, node from nodes where node != 0 and parent == 0 ")
args = []
if accounts:
placeholders = ','.join('?' for a in accounts)
q += ("and path in (%s)" % placeholders)
args += accounts
return self.execute(q, args).fetchall()
def node_account_usage(self, account_node, cluster):
select_children = ("select node from nodes where parent = ?")
select_descedents = ("select node from nodes "
"where parent in (%s) "
"or node in (%s) ") % ((select_children,)*2)
args = [account_node]*2
q = ("select sum(v.size) from versions v, nodes n "
"where v.node = n.node "
"and n.node in (%s) "
"and v.cluster = ?") % select_descedents
args += [cluster]
self.execute(q, args)
return self.fetchone()[0]
def policy_get(self, node):
q = "select key, value from policy where node = ?"
self.execute(q, (node,))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment