Commit f209d164 authored by Nanakos Chrysostomos

Merge branch 'feature-pithos-performance-optimizations' into develop

parents 2c7388af af68dfe1
......@@ -702,17 +702,20 @@ def object_list(request, v_account, v_container):
     if until is None:
         name = '/'.join((v_account, v_container, ''))
         name_idx = len(name)
+        objects_bulk = []
         for x in request.backend.list_object_permissions(
                 request.user_uniq, v_account, v_container, prefix):
             # filter out objects which are not under the container
             if name != x[:name_idx]:
                 continue
-            object = x[name_idx:]
-            object_permissions[object] = \
-                request.backend.get_object_permissions(
-                    request.user_uniq, v_account, v_container, object)
+            objects_bulk.append(x[name_idx:])
+
+        if len(objects_bulk) > 0:
+            object_permissions = \
+                request.backend.get_object_permissions_bulk(
+                    request.user_uniq, v_account, v_container,
+                    objects_bulk)

     if request.user_uniq == v_account:
         # Bring public information only if the request user
......
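The hunk above trades one backend round-trip per object for a single bulk call. A minimal sketch of the pattern, with a stub backend standing in for the real Pithos backend (only the two method names come from the commit; everything else is illustrative):

    class StubBackend(object):
        def get_object_permissions(self, user, account, container, name):
            return {'read': [user]}                  # one round-trip per call

        def get_object_permissions_bulk(self, user, account, container, names):
            # one round-trip for the whole batch
            return dict((n, {'read': [user]}) for n in names)

    def permissions_old(backend, user, account, container, names):
        # O(len(names)) backend calls, as in the removed code
        perms = {}
        for name in names:
            perms[name] = backend.get_object_permissions(
                user, account, container, name)
        return perms

    def permissions_new(backend, user, account, container, names):
        # a single call, as in the merged code
        if not names:
            return {}
        return backend.get_object_permissions_bulk(
            user, account, container, names)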
......@@ -42,6 +42,7 @@ from synnefo.lib import join_urls
import time as _time
import datetime
import django.utils.simplejson as json
class AccountHead(PithosAPITest):
def test_get_account_meta(self):
......@@ -81,6 +82,17 @@ class AccountHead(PithosAPITest):
usage)
def test_get_account_meta_until(self):
cnames = ['apples', 'bananas', 'kiwis']
# create containers
uploaded_bytes = 0
for cname in cnames:
self.create_container(cname)
# upload object
name, data, resp = self.upload_object(cname)
uploaded_bytes += len(data)
self.update_account_meta({'foo': 'bar'})
account_info = self.get_account_info()
......@@ -90,6 +102,15 @@ class AccountHead(PithosAPITest):
until = int(_time.mktime(t1.timetuple()))
_time.sleep(2)
# add containers
cnames = ['oranges', 'pears']
for cname in cnames:
self.create_container(cname)
# upload object
self.upload_object(cname)
self.update_account_meta({'quality': 'AAA'})
account_info = self.get_account_info()
......@@ -108,6 +129,9 @@ class AccountHead(PithosAPITest):
t = datetime.datetime.strptime(
account_info['X-Account-Until-Timestamp'], DATE_FORMATS[2])
self.assertTrue(int(_time.mktime(t.timetuple())) <= until)
self.assertTrue('X-Account-Container-Count' in account_info)
self.assertEqual(int(account_info['X-Account-Container-Count']), 3)
self.assertTrue('X-Account-Bytes-Used' in account_info)
def test_get_account_meta_until_invalid_date(self):
self.update_account_meta({'quality': 'AAA'})
......@@ -135,6 +159,36 @@ class AccountGet(PithosAPITest):
self.assertEquals(containers,
['apples', 'bananas', 'kiwis', 'oranges', 'pears'])
def test_list_until(self):
account_info = self.get_account_info()
t = datetime.datetime.strptime(account_info['Last-Modified'],
DATE_FORMATS[2])
t1 = t + datetime.timedelta(seconds=1)
until = int(_time.mktime(t1.timetuple()))
_time.sleep(2)
self.create_container()
url = join_urls(self.pithos_path, self.user)
r = self.get('%s?until=%s' % (url, until))
self.assertEqual(r.status_code, 200)
containers = r.content.split('\n')
if '' in containers:
containers.remove('')
self.assertEqual(containers,
['apples', 'bananas', 'kiwis', 'oranges', 'pears'])
r = self.get('%s?until=%s&format=json' % (url, until))
self.assertEqual(r.status_code, 200)
try:
containers = json.loads(r.content)
except ValueError:
self.fail('json format expected')
self.assertEqual([c['name'] for c in containers],
['apples', 'bananas', 'kiwis', 'oranges', 'pears'])
def test_list_shared(self):
# upload and publish object
oname, data, resp = self.upload_object('apples')
......
......@@ -83,6 +83,65 @@ class ContainerHead(PithosAPITest):
for i in range(len(objects)):
    self.assertTrue('foo%s' % i in r['X-Container-Object-Meta'])
def test_get_container_meta_until(self):
self.create_container('apples')
# populate with objects
objects = {}
metalist = []
for i in range(random.randint(1, 100)):
# upload object
metakey = 'Foo%s' % i
meta = {metakey: 'bar'}
name, data, resp = self.upload_object('apples', **meta)
objects[name] = data
metalist.append(metakey)
self.update_container_meta('apples', {'foo': 'bar'})
container_info = self.get_container_info('apples')
t = datetime.datetime.strptime(container_info['Last-Modified'],
DATE_FORMATS[2])
t1 = t + datetime.timedelta(seconds=1)
until = int(_time.mktime(t1.timetuple()))
_time.sleep(2)
for i in range(random.randint(1, 100)):
# upload object
meta = {'foo%s' % i: 'bar'}
self.upload_object('apples', **meta)
self.update_container_meta('apples', {'quality': 'AAA'})
container_info = self.get_container_info('apples')
self.assertTrue('X-Container-Meta-Quality' in container_info)
self.assertTrue('X-Container-Meta-Foo' in container_info)
self.assertTrue('X-Container-Object-Count' in container_info)
self.assertTrue(int(container_info['X-Container-Object-Count']) > len(objects))
self.assertTrue('X-Container-Bytes-Used' in container_info)
t = datetime.datetime.strptime(container_info['Last-Modified'],
DATE_FORMATS[-1])
last_modified = int(_time.mktime(t.timetuple()))
assert until < last_modified
container_info = self.get_container_info('apples', until=until)
self.assertTrue('X-Container-Meta-Quality' not in container_info)
self.assertTrue('X-Container-Meta-Foo' in container_info)
self.assertTrue('X-Container-Until-Timestamp' in container_info)
t = datetime.datetime.strptime(
container_info['X-Container-Until-Timestamp'], DATE_FORMATS[2])
self.assertTrue(int(_time.mktime(t.timetuple())) <= until)
self.assertTrue('X-Container-Object-Count' in container_info)
self.assertEqual(int(container_info['X-Container-Object-Count']), len(objects))
self.assertTrue('X-Container-Bytes-Used' in container_info)
self.assertEqual(int(container_info['X-Container-Bytes-Used']),
sum([len(data) for data in objects.values()]))
self.assertTrue('X-Container-Object-Meta' in container_info)
self.assertEqual(container_info['X-Container-Object-Meta'],
','.join(sorted(metalist)))
class ContainerGet(PithosAPITest):
def setUp(self):
......@@ -102,6 +161,36 @@ class ContainerGet(PithosAPITest):
name, data, resp = self.upload_object('apples', o)
self.objects['apples'][name] = data
def test_list_until(self):
account_info = self.get_account_info()
t = datetime.datetime.strptime(account_info['Last-Modified'],
DATE_FORMATS[2])
t1 = t + datetime.timedelta(seconds=1)
until = int(_time.mktime(t1.timetuple()))
_time.sleep(2)
cname = self.cnames[0]
self.upload_object(cname)
url = join_urls(self.pithos_path, self.user, cname)
r = self.get('%s?until=%s' % (url, until))
self.assertEqual(r.status_code, 200)
objects = r.content.split('\n')
if '' in objects:
objects.remove('')
self.assertEqual(objects,
sorted(self.objects[cname].keys()))
r = self.get('%s?until=%s&format=json' % (url, until))
self.assertEqual(r.status_code, 200)
try:
objects = json.loads(r.content)
except ValueError:
self.fail('json format expected')
self.assertEqual([o['name'] for o in objects],
sorted(self.objects[cname].keys()))
def test_list_shared(self):
# share an object
cname = self.cnames[0]
......
......@@ -106,9 +106,12 @@ def printable_header_dict(d):
     Format 'last_modified' timestamp.
     """
-    if 'last_modified' in d and d['last_modified']:
-        d['last_modified'] = utils.isoformat(
-            datetime.fromtimestamp(d['last_modified']))
+    timestamps = ('last_modified', 'x_container_until_timestamp',
+                  'x_account_until_timestamp')
+    for timestamp in timestamps:
+        if timestamp in d and d[timestamp]:
+            d[timestamp] = utils.isoformat(
+                datetime.fromtimestamp(d[timestamp]))

     return dict([(k.lower().replace('-', '_'), v) for k, v in d.iteritems()])
......
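For reference, a self-contained approximation of the generalized helper; datetime.isoformat() stands in for the project's utils.isoformat purely so the sketch runs standalone:

    from datetime import datetime

    def printable_header_dict(d):
        # format every known POSIX-timestamp field as an ISO-8601 string,
        # then normalize header names to snake_case
        timestamps = ('last_modified', 'x_container_until_timestamp',
                      'x_account_until_timestamp')
        for timestamp in timestamps:
            if timestamp in d and d[timestamp]:
                d[timestamp] = datetime.fromtimestamp(d[timestamp]).isoformat()
        return dict((k.lower().replace('-', '_'), v) for k, v in d.items())

Note that a falsy timestamp (e.g. 0) is left untouched, mirroring the guard in the merged code.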
......@@ -102,7 +102,7 @@ _propnames = {
     'muser': 7,
     'uuid': 8,
     'checksum': 9,
-    'cluster': 10
+    'cluster': 10,
 }
......@@ -650,9 +650,9 @@ class Node(DBWorker):
                     func.sum(v.c.size),
                     func.max(v.c.mtime)])
         if before != inf:
-            c1 = select([func.max(self.versions.c.serial)])
-            c1 = c1.where(self.versions.c.mtime < before)
-            c1.where(self.versions.c.node == v.c.node)
+            c1 = select([func.max(self.versions.c.serial)],
+                        and_(self.versions.c.mtime < before,
+                             self.versions.c.node == v.c.node))
         else:
             c1 = select([self.nodes.c.latest_version])
             c1 = c1.where(self.nodes.c.node == v.c.node)
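The key fix in the hunk above: the removed code's third line called .where() but discarded the result, so the node correlation was never applied. A standalone sketch of both patterns, assuming the legacy select(columns, whereclause) call style used throughout this module (stand-in tables):

    from sqlalchemy import MetaData, Table, Column, Integer, select, func
    from sqlalchemy.sql import and_

    metadata = MetaData()
    versions = Table('versions', metadata,
                     Column('serial', Integer),
                     Column('node', Integer),
                     Column('mtime', Integer))

    before, node_id = 100, 1

    # old: the last .where() is a no-op because its return value is dropped
    c1 = select([func.max(versions.c.serial)])
    c1 = c1.where(versions.c.mtime < before)
    c1.where(versions.c.node == node_id)   # BUG: result discarded

    # new: both predicates applied in one whereclause via and_()
    c1 = select([func.max(versions.c.serial)],
                and_(versions.c.mtime < before,
                     versions.c.node == node_id))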
......@@ -672,28 +672,38 @@ class Node(DBWorker):
         # All children (get size and mtime).
         # This is why the full path is stored.
-        s = select([func.count(v.c.serial),
-                    func.sum(v.c.size),
-                    func.max(v.c.mtime)])
         if before != inf:
+            s = select([func.count(v.c.serial),
+                        func.sum(v.c.size),
+                        func.max(v.c.mtime)])
             c1 = select([func.max(self.versions.c.serial)],
-                        self.versions.c.node == v.c.node)
-            c1 = c1.where(self.versions.c.mtime < before)
+                        and_(self.versions.c.mtime < before,
+                             self.versions.c.node == v.c.node))
         else:
-            c1 = select([self.nodes.c.latest_version],
-                        self.nodes.c.node == v.c.node)
+            inner_join = \
+                self.versions.join(self.nodes,
+                                   onclause=self.versions.c.serial ==
+                                   self.nodes.c.latest_version)
+            s = select([func.count(self.versions.c.serial),
+                        func.sum(self.versions.c.size),
+                        func.max(self.versions.c.mtime)],
+                       from_obj=[inner_join])
         c2 = select([self.nodes.c.node],
                     self.nodes.c.path.like(self.escape_like(path) + '%',
                                            escape=ESCAPE_CHAR))
-        s = s.where(and_(v.c.serial == c1,
+        if before != inf:
+            s = s.where(and_(v.c.serial == c1,
                              v.c.cluster != except_cluster,
                              v.c.node.in_(c2)))
+        else:
+            s = s.where(and_(self.versions.c.cluster != except_cluster,
+                             self.versions.c.node.in_(c2)))
         rp = self.conn.execute(s)
         r = rp.fetchone()
         rp.close()
         if not r:
             return None
-        size = r[1] - props[SIZE]
+        size = long(r[1] - props[SIZE])
         mtime = max(mtime, r[2])
         return (count, size, mtime)
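When no until is requested (before == inf), the merged code joins versions directly to nodes.latest_version instead of evaluating a correlated max(serial) subquery per candidate row. A minimal sketch of the join shape, with stand-in tables:

    from sqlalchemy import MetaData, Table, Column, Integer, select, func

    metadata = MetaData()
    nodes = Table('nodes', metadata,
                  Column('node', Integer),
                  Column('latest_version', Integer))
    versions = Table('versions', metadata,
                     Column('serial', Integer),
                     Column('node', Integer),
                     Column('size', Integer),
                     Column('mtime', Integer))

    # one join instead of a correlated SELECT max(serial) per row
    inner_join = versions.join(
        nodes, onclause=versions.c.serial == nodes.c.latest_version)
    s = select([func.count(versions.c.serial),
                func.sum(versions.c.size),
                func.max(versions.c.mtime)], from_obj=[inner_join])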
......@@ -976,33 +986,34 @@ class Node(DBWorker):
         pathq = pathq or []

         # TODO: Use another table to store before=inf results.
-        a = self.attributes.alias('a')
         v = self.versions.alias('v')
-        n = self.nodes.alias('n')
-        s = select([a.c.key]).distinct()
+        s = select([self.attributes.c.key]).distinct()
         if before != inf:
-            filtered = select([func.max(self.versions.c.serial)])
-            filtered = filtered.where(self.versions.c.mtime < before)
-            filtered = filtered.where(self.versions.c.node == v.c.node)
+            filtered = select([func.max(v.c.serial)],
+                              and_(v.c.mtime < before,
+                                   v.c.node == self.versions.c.node))
         else:
             filtered = select([self.nodes.c.latest_version])
-            filtered = filtered.where(self.nodes.c.node == v.c.node)
-        s = s.where(v.c.serial == filtered)
-        s = s.where(v.c.cluster != except_cluster)
-        s = s.where(v.c.node.in_(select([self.nodes.c.node],
+            filtered = filtered.where(self.nodes.c.node ==
+                self.versions.c.node).correlate(self.versions)
+        s = s.where(self.versions.c.serial == filtered)
+        s = s.where(self.versions.c.cluster != except_cluster)
+        s = s.where(self.versions.c.node.in_(select([self.nodes.c.node],
                                              self.nodes.c.parent == parent)))
-        s = s.where(a.c.serial == v.c.serial)
-        s = s.where(a.c.domain == domain)
-        s = s.where(n.c.node == v.c.node)
-        conj = []
+        s = s.where(self.attributes.c.serial == self.versions.c.serial)
+        s = s.where(self.attributes.c.domain == domain)
+        s = s.where(self.nodes.c.node == self.versions.c.node)
+        s = s.order_by(self.attributes.c.key)
+        conja = []
+        conjb = []
         for path, match in pathq:
             if match == MATCH_PREFIX:
-                conj.append(n.c.path.like(self.escape_like(path) + '%',
-                                          escape=ESCAPE_CHAR))
+                conja.append(self.nodes.c.path.like(
+                    self.escape_like(path) + '%', escape=ESCAPE_CHAR))
             elif match == MATCH_EXACT:
-                conj.append(n.c.path == path)
-        if conj:
-            s = s.where(or_(*conj))
+                conjb.append(path)
+        if conja or conjb:
+            s = s.where(or_(self.nodes.c.path.in_(conjb), *conja))
         rp = self.conn.execute(s)
         rows = rp.fetchall()
         rp.close()
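The conj list is split so that exact path matches collapse into a single IN (...) predicate while prefix matches remain individual LIKE clauses, all OR-ed together. A condensed sketch of the split (stand-in table and match markers; the project's escape_like helper is omitted):

    from sqlalchemy import MetaData, Table, Column, String, select
    from sqlalchemy.sql import or_

    metadata = MetaData()
    nodes = Table('nodes', metadata, Column('path', String))

    MATCH_PREFIX, MATCH_EXACT = 'prefix', 'exact'  # stand-in markers
    pathq = [('a/b', MATCH_EXACT), ('a/c', MATCH_EXACT), ('d/', MATCH_PREFIX)]

    conja, conjb = [], []          # LIKE clauses vs. exact paths
    for path, match in pathq:
        if match == MATCH_PREFIX:
            conja.append(nodes.c.path.like(path + '%'))
        elif match == MATCH_EXACT:
            conjb.append(path)

    s = select([nodes.c.path])
    if conja or conjb:
        # both exact paths become one IN(); the prefix stays a LIKE
        s = s.where(or_(nodes.c.path.in_(conjb), *conja))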
......@@ -1069,68 +1080,83 @@ class Node(DBWorker):
         v = self.versions.alias('v')
-        n = self.nodes.alias('n')
-        if not all_props:
-            s = select([n.c.path, v.c.serial]).distinct()
-        else:
-            s = select([n.c.path,
-                        v.c.serial, v.c.node, v.c.hash,
-                        v.c.size, v.c.type, v.c.source,
-                        v.c.mtime, v.c.muser, v.c.uuid,
-                        v.c.checksum, v.c.cluster]).distinct()
         if before != inf:
-            filtered = select([func.max(self.versions.c.serial)])
-            filtered = filtered.where(self.versions.c.mtime < before)
+            filtered = select([func.max(v.c.serial)],
+                              and_(v.c.mtime < before,
+                                   v.c.node == self.versions.c.node))
+            inner_join = \
+                self.nodes.join(self.versions,
+                                onclause=self.versions.c.serial == filtered)
         else:
             filtered = select([self.nodes.c.latest_version])
-        s = s.where(
-            v.c.serial == filtered.where(self.nodes.c.node == v.c.node))
-        s = s.where(v.c.cluster != except_cluster)
-        s = s.where(v.c.node.in_(select([self.nodes.c.node],
-                                        self.nodes.c.parent == parent)))
-        s = s.where(n.c.node == v.c.node)
-        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
-        conj = []
+            filtered = filtered.where(self.nodes.c.node ==
+                self.versions.c.node).correlate(self.versions)
+            inner_join = \
+                self.nodes.join(self.versions,
+                                onclause=self.versions.c.serial == filtered)
+        if not all_props:
+            s = select([self.nodes.c.path,
+                        self.versions.c.serial],
+                       from_obj=[inner_join]).distinct()
+        else:
+            s = select([self.nodes.c.path,
+                        self.versions.c.serial, self.versions.c.node,
+                        self.versions.c.hash,
+                        self.versions.c.size, self.versions.c.type,
+                        self.versions.c.source,
+                        self.versions.c.mtime, self.versions.c.muser,
+                        self.versions.c.uuid,
+                        self.versions.c.checksum,
+                        self.versions.c.cluster],
+                       from_obj=[inner_join]).distinct()
+        s = s.where(self.versions.c.cluster != except_cluster)
+        s = s.where(self.versions.c.node.in_(select([self.nodes.c.node],
+                                             self.nodes.c.parent == parent)))
+        s = s.where(self.versions.c.node == self.nodes.c.node)
+        s = s.where(and_(self.nodes.c.path > bindparam('start'),
+                         self.nodes.c.path < nextling))
+        conja = []
+        conjb = []
         for path, match in pathq:
             if match == MATCH_PREFIX:
-                conj.append(n.c.path.like(self.escape_like(path) + '%',
-                                          escape=ESCAPE_CHAR))
+                conja.append(self.nodes.c.path.like(
+                    self.escape_like(path) + '%', escape=ESCAPE_CHAR))
             elif match == MATCH_EXACT:
-                conj.append(n.c.path == path)
-        if conj:
-            s = s.where(or_(*conj))
+                conjb.append(path)
+        if conja or conjb:
+            s = s.where(or_(self.nodes.c.path.in_(conjb), *conja))
         if sizeq and len(sizeq) == 2:
             if sizeq[0]:
-                s = s.where(v.c.size >= sizeq[0])
+                s = s.where(self.versions.c.size >= sizeq[0])
             if sizeq[1]:
-                s = s.where(v.c.size < sizeq[1])
+                s = s.where(self.versions.c.size < sizeq[1])
         if domain and filterq:
-            a = self.attributes.alias('a')
             included, excluded, opers = parse_filters(filterq)
             if included:
                 subs = select([1])
-                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
-                subs = subs.where(a.c.domain == domain)
-                subs = subs.where(or_(*[a.c.key.op('=')(x)
-                                        for x in included]))
+                subs = subs.where(self.attributes.c.serial ==
+                    self.versions.c.serial).correlate(self.versions)
+                subs = subs.where(self.attributes.c.domain == domain)
+                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x)
+                                        for x in included]))
                 s = s.where(exists(subs))
             if excluded:
                 subs = select([1])
-                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
-                subs = subs.where(a.c.domain == domain)
-                subs = subs.where(or_(*[a.c.key.op('=')(x)
-                                        for x in excluded]))
+                subs = subs.where(self.attributes.c.serial ==
+                    self.versions.c.serial).correlate(self.versions)
+                subs = subs.where(self.attributes.c.domain == domain)
+                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x)
+                                        for x in excluded]))
                 s = s.where(not_(exists(subs)))
             if opers:
                 for k, o, val in opers:
                     subs = select([1])
-                    subs = subs.where(a.c.serial == v.c.serial).correlate(v)
-                    subs = subs.where(a.c.domain == domain)
+                    subs = subs.where(self.attributes.c.serial ==
+                        self.versions.c.serial).correlate(self.versions)
+                    subs = subs.where(self.attributes.c.domain == domain)
                     subs = subs.where(
-                        and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
+                        and_(self.attributes.c.key.op('=')(k),
+                             self.attributes.c.value.op(o)(val)))
                     s = s.where(exists(subs))
-        s = s.order_by(n.c.path)
+        s = s.order_by(self.nodes.c.path)
         if not delimiter:
             s = s.limit(limit)
......@@ -1229,3 +1255,17 @@ class Node(DBWorker):
groups = groupby(rows, group_by)
return [(k[0], k[1:], dict([i[12:] for i in data])) for
(k, data) in groups]
def get_props(self, paths):
inner_join = \
self.nodes.join(self.versions,
onclause=self.versions.c.serial == self.nodes.c.latest_version)
cc = self.nodes.c.path.in_(paths)
s = select([self.nodes.c.path, self.versions.c.type],
from_obj=[inner_join]).where(cc).distinct()
r = self.conn.execute(s)
rows = r.fetchall()
r.close()
if rows:
return rows
return None
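A hedged usage sketch for the new helper: one query resolves the latest-version type for a whole batch of paths instead of one query per path. 'node' below is an instantiated, connected Node worker, which is an assumption of the sketch:

    # hypothetical caller; 'node' is a connected Node instance (assumption)
    rows = node.get_props(['acct/cont/a.txt', 'acct/cont/b.txt'])
    if rows is not None:
        types_by_path = dict(rows)   # {path: type}, one round-trip per batch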
......@@ -31,13 +31,14 @@
 # interpreted as representing official policies, either expressed
 # or implied, of GRNET S.A.

-from sqlalchemy.sql import select, literal, or_
+from sqlalchemy.sql import select, literal, or_, and_
 from sqlalchemy.sql.expression import join, union

 from xfeatures import XFeatures
 from groups import Groups
 from public import Public
 from node import Node
+from collections import defaultdict

 from dbworker import ESCAPE_CHAR
......@@ -81,6 +82,23 @@ class Permissions(XFeatures, Groups, Public, Node):
if w:
self.feature_setmany(feature, WRITE, w)
def access_get_for_bulk(self, perms):
"""Get permissions for path."""
allowed = None
d = defaultdict(list)
for value, feature_id, key in perms:
d[key].append(value)
permissions = d
if READ in permissions:
allowed = 0
permissions['read'] = permissions[READ]
del(permissions[READ])
if WRITE in permissions:
allowed = 1
permissions['write'] = permissions[WRITE]
del(permissions[WRITE])
return (permissions, allowed)
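A self-contained sketch of the grouping access_get_for_bulk performs, assuming READ == 0 and WRITE == 1, which matches the allowed codes the method returns:

    from collections import defaultdict

    READ, WRITE = 0, 1   # assumption, mirroring the allowed codes above

    def group_permissions(perms):
        # perms: iterable of (value, feature_id, key) rows, fetched in bulk
        d = defaultdict(list)
        for value, feature_id, key in perms:
            d[key].append(value)
        allowed = None
        if READ in d:
            allowed = 0
            d['read'] = d.pop(READ)
        if WRITE in d:
            allowed = 1
            d['write'] = d.pop(WRITE)
        return dict(d), allowed

    # group_permissions([('alice', 7, 0), ('bob', 7, 1)])
    # -> ({'read': ['alice'], 'write': ['bob']}, 1)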
def access_get(self, path):
"""Get permissions for path."""
......@@ -139,6 +157,29 @@ class Permissions(XFeatures, Groups, Public, Node):
return True
return False
def access_check_bulk(self, paths, member):
        xfeatures_xfeaturevals = self.xfeaturevals.join(
            self.xfeatures,
            onclause=and_(
                self.xfeatures.c.feature_id == self.xfeaturevals.c.feature_id,
                self.xfeatures.c.path.in_(paths)))
        s = select([self.xfeatures.c.path,
                    self.xfeaturevals.c.value,
                    self.xfeaturevals.c.feature_id,
                    self.xfeaturevals.c.key],
                   from_obj=[xfeatures_xfeaturevals])
r = self.conn.execute(s)
rows = r.fetchall()
r.close()
if rows:
access_check_paths = {}
for path, value, feature_id, key in rows:
try:
access_check_paths[path].append((value, feature_id, key))
except KeyError:
access_check_paths[path] = [(value, feature_id, key)]
access_check_paths['group_parents'] = self.group_parents(member)
return access_check_paths
return None
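How a caller might consume the bulk result (hypothetical helper; the 'group_parents' sentinel key is how the method above piggybacks the member's group memberships onto the per-path rows):

    def check_batch(permissions, paths, member):
        # hypothetical caller; 'permissions' is a Permissions instance
        acp = permissions.access_check_bulk(paths, member)
        if acp is None:
            return {}, []
        # the member's group parents ride along under a sentinel key
        group_parents = acp.pop('group_parents')
        rows_by_path = dict((p, acp.get(p, [])) for p in paths)
        # rows are (value, feature_id, key) tuples, ready for
        # access_get_for_bulk-style grouping against group_parents
        return rows_by_path, group_parents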