Commit ee97bf57 authored by Sofia Papagiannaki's avatar Sofia Papagiannaki Committed by Christos Stavrakakis
Browse files

Pithos backend: Return objects in a specific domain

Refs: #3510
parent c117548d
......@@ -32,6 +32,9 @@
# or implied, of GRNET S.A.
from time import time
from operator import itemgetter
from itertools import groupby
from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
from sqlalchemy.types import Text
from sqlalchemy.schema import Index, Sequence
......@@ -178,6 +181,7 @@ def create_tables(engine):
columns.append(Column('key', String(128), primary_key=True))
columns.append(Column('value', String(256)))
attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
Index('idx_attributes_domain', attributes.c.domain)
metadata.create_all(engine)
return metadata.sorted_tables
......@@ -1104,3 +1108,32 @@ class Node(DBWorker):
l = r.fetchone()
r.close()
return l
def domain_object_list(self, domain, cluster=None):
"""Return a list of (path, property list, attribute dictionary)
for the objects in the specific domain and cluster.
"""
v = self.versions.alias('v')
n = self.nodes.alias('n')
a = self.attributes.alias('a')
s = select([n.c.path, v.c.serial, v.c.node, v.c.hash, v.c.size,
v.c.type, v.c.source, v.c.mtime, v.c.muser, v.c.uuid,
v.c.checksum, v.c.cluster, a.c.key, a.c.value])
s = s.where(n.c.node == v.c.node)
s = s.where(n.c.latest_version == v.c.serial)
if cluster:
s = s.where(v.c.cluster == cluster)
s = s.where(v.c.serial == a.c.serial)
s = s.where(a.c.domain == domain)
r = self.conn.execute(s)
rows = r.fetchall()
r.close()
group_by = itemgetter(slice(12))
rows.sort(key = group_by)
groups = groupby(rows, group_by)
return [(k[0], k[1:], dict([i[12:] for i in data])) \
for (k, data) in groups]
......@@ -32,6 +32,8 @@
# or implied, of GRNET S.A.
from time import time
from operator import itemgetter
from itertools import groupby
from dbworker import DBWorker
......@@ -180,6 +182,8 @@ class Node(DBWorker):
references versions(serial)
on update cascade
on delete cascade ) """)
execute(""" create index if not exists idx_attributes_domain
on attributes(domain) """)
wrapper = self.wrapper
wrapper.execute()
......@@ -284,7 +288,7 @@ class Node(DBWorker):
self.statistics_update(parent, -nr, -size, mtime, cluster)
self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
q = ("select hash from versions "
q = ("select hash, serial from versions "
"where node in (select node "
"from nodes "
"where parent = ?) "
......@@ -333,7 +337,7 @@ class Node(DBWorker):
mtime = time()
self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
q = ("select hash from versions "
q = ("select hash, serial from versions "
"where node = ? "
"and cluster = ? "
"and mtime <= ?")
......@@ -343,7 +347,7 @@ class Node(DBWorker):
for r in self.fetchall():
hashes += [r[0]]
serials += [r[1]]
q = ("delete from versions "
"where node = ? "
"and cluster = ? "
......@@ -1001,3 +1005,30 @@ class Node(DBWorker):
"and n.node = v.node") % cluster_where
self.execute(q, args)
return self.fetchone()
def domain_object_list(self, domain, cluster=None):
"""Return a list of (path, property list, attribute dictionary)
for the objects in the specific domain and cluster.
"""
q = ("select n.path, v.serial, v.node, v.hash, "
"v.size, v.type, v.source, v.mtime, v.muser, "
"v.uuid, v.checksum, v.cluster, a.key, a.value "
"from nodes n, versions v, attributes a "
"where n.node = v.node and "
"n.latest_version = v.serial and "
"v.serial = a.serial and "
"a.domain = ? ")
args = [domain]
if cluster != None:
q += "and v.cluster = ?"
args += [cluster]
self.execute(q, args)
rows = self.fetchall()
group_by = itemgetter(slice(12))
rows.sort(key = group_by)
groups = groupby(rows, group_by)
return [(k[0], k[1:], dict([i[12:] for i in data])) \
for (k, data) in groups]
......@@ -614,13 +614,13 @@ class ModularBackend(BaseBackend):
raise NotAllowedError
if shared and public:
# get shared first
shared = self._list_object_permissions(
shared_paths = self._list_object_permissions(
user, account, container, prefix, shared=True, public=False)
objects = set()
if shared:
if shared_paths:
path, node = self._lookup_container(account, container)
shared = self._get_formatted_paths(shared)
objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
shared_paths = self._get_formatted_paths(shared_paths)
objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared_paths, all_props))
# get public
objects |= set(self._list_public_object_properties(
......@@ -1516,3 +1516,46 @@ class ModularBackend(BaseBackend):
for path in self.permissions.access_list_paths(user, account):
allow.add(path.split('/', 2)[1])
return sorted(allow)
# Domain functions
def _get_domain_objects(self, domain, user=None):
obj_list = self.node.domain_object_list(domain, CLUSTER_NORMAL)
if user != None:
obj_list = [t for t in obj_list \
if self._has_read_access(user, t[0])]
return [(path,
self._build_metadata(props, user_defined_meta),
self.permissions.access_get(path)) \
for path, props, user_defined_meta in obj_list]
# util functions
def _build_metadata(self, props, user_defined=None,
include_user_defined=True):
meta = {'bytes': props[self.SIZE],
'type': props[self.TYPE],
'hash': props[self.HASH],
'version': props[self.SERIAL],
'version_timestamp': props[self.MTIME],
'modified_by': props[self.MUSER],
'uuid': props[self.UUID],
'checksum': props[self.CHECKSUM]}
if include_user_defined and user_defined != None:
meta.update(user_defined)
return meta
def _has_read_access(self, user, path):
try:
account, container, object = path.split('/', 2)
except ValueError:
raise ValueError('Invalid object path')
assert isinstance(user, basestring), "Invalid user"
try:
self._can_read(user, account, container, object)
except NotAllowedError:
return False
else:
return True
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment