Commit 751bf999 authored by Sofia Papagiannaki's avatar Sofia Papagiannaki
Browse files

pithos: Update get objects in a specific domain

Improve performance

Refs: #3510
parent eed8b9e2
"""add attributes node column
Revision ID: 3b62b3f1bf6c
Revises: 4c8ccdc58192
Create Date: 2013-07-04 13:11:01.842706
"""
# revision identifiers, used by Alembic.
revision = '3b62b3f1bf6c'
down_revision = '4c8ccdc58192'
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column, and_
def upgrade():
op.add_column('attributes',
sa.Column('node', sa.Integer, default=0))
op.add_column('attributes',
sa.Column('is_latest', sa.Boolean, default=True))
n = table('nodes',
column('node', sa.Integer),
column('latest_version', sa.Integer))
v = table('versions',
column('node', sa.Integer),
column('serial', sa.Integer))
a = table('attributes',
column('serial', sa.Integer),
column('node', sa.Integer),
column('is_latest', sa.Boolean))
s = sa.select([v.c.node]).where(v.c.serial == a.c.serial)
u = a.update().values({'node': s})
op.execute(u)
s = sa.select([v.c.serial == n.c.latest_version],
and_(a.c.node == n.c.node, a.c.serial == v.c.serial))
u = a.update().values({'is_latest': s})
op.execute(u)
op.alter_column('attributes', 'node', nullable=False)
op.alter_column('attributes', 'is_latest', nullable=False)
op.create_index('idx_attributes_serial_node', 'attributes',
['serial', 'node'])
def downgrade():
op.drop_index('idx_attributes_serial_node')
op.drop_column('attributes', 'is_latest')
op.drop_column('attributes', 'node')
......@@ -35,7 +35,8 @@ from time import time
from operator import itemgetter
from itertools import groupby
from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
from sqlalchemy import (Table, Integer, BigInteger, DECIMAL, Boolean,
Column, String, MetaData, ForeignKey)
from sqlalchemy.types import Text
from sqlalchemy.schema import Index, Sequence
from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
......@@ -180,8 +181,11 @@ def create_tables(engine):
columns.append(Column('domain', String(256), primary_key=True))
columns.append(Column('key', String(128), primary_key=True))
columns.append(Column('value', String(256)))
columns.append(Column('node', Integer, nullable=False, default=0))
columns.append(Column('is_latest', Boolean, nullable=False, default=True))
attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
Index('idx_attributes_domain', attributes.c.domain)
Index('idx_attributes_serial_node', attributes.c.serial, attributes.c.node)
metadata.create_all(engine)
return metadata.sorted_tables
......@@ -865,7 +869,7 @@ class Node(DBWorker):
r.close()
return l
def attribute_set(self, serial, domain, items):
def attribute_set(self, serial, domain, node, items, is_latest=True):
"""Set the attributes of the version specified by serial.
Receive attributes as an iterable of (key, value) pairs.
"""
......@@ -881,7 +885,8 @@ class Node(DBWorker):
rp.close()
if rp.rowcount == 0:
s = self.attributes.insert()
s = s.values(serial=serial, domain=domain, key=k, value=v)
s = s.values(serial=serial, domain=domain, node=node,
is_latest=is_latest, key=k, value=v)
self.conn.execute(s).close()
def attribute_del(self, serial, domain, keys=()):
......@@ -906,25 +911,33 @@ class Node(DBWorker):
def attribute_copy(self, source, dest):
s = select(
[dest, self.attributes.c.domain,
self.attributes.c.key, self.attributes.c.value],
[dest, self.attributes.c.domain, self.attributes.c.node,
self.attributes.c.key, self.attributes.c.value],
self.attributes.c.serial == source)
rp = self.conn.execute(s)
attributes = rp.fetchall()
rp.close()
for dest, domain, k, v in attributes:
#insert or replace
for dest, domain, node, k, v in attributes:
# insert or replace
s = self.attributes.update().where(and_(
self.attributes.c.serial == dest,
self.attributes.c.domain == domain,
self.attributes.c.key == k))
rp = self.conn.execute(s, value=v)
s = s.values(value=v)
rp = self.conn.execute(s)
rp.close()
if rp.rowcount == 0:
s = self.attributes.insert()
values = {'serial': dest, 'domain': domain,
'key': k, 'value': v}
self.conn.execute(s, values).close()
s = s.values(serial=dest, domain=domain, node=node,
is_latest=True, key=k, value=v)
self.conn.execute(s).close()
def attribute_unset_is_latest(self, node, exclude):
u = self.attributes.update().where(and_(
self.attributes.c.node == node,
self.attributes.c.serial != exclude)).values(
{'is_latest': False})
self.conn.execute(u)
def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=None):
"""Return a list with all keys pairs defined
......@@ -1178,12 +1191,12 @@ class Node(DBWorker):
s = select([n.c.path, v.c.serial, v.c.node, v.c.hash, v.c.size,
v.c.type, v.c.source, v.c.mtime, v.c.muser, v.c.uuid,
v.c.checksum, v.c.cluster, a.c.key, a.c.value])
s = s.where(n.c.node == v.c.node)
s = s.where(n.c.latest_version == v.c.serial)
if cluster:
s = s.where(v.c.cluster == cluster)
s = s.where(v.c.serial == a.c.serial)
s = s.where(a.c.domain == domain)
s = s.where(a.c.node == n.c.node)
s = s.where(a.c.is_latest == True)
if paths:
s = s.where(n.c.path.in_(paths))
......
......@@ -930,7 +930,7 @@ class ModularBackend(BaseBackend):
if src_version_id is None:
src_version_id = pre_version_id
self._put_metadata_duplicate(
src_version_id, dest_version_id, domain, meta, replace_meta)
src_version_id, dest_version_id, domain, node, meta, replace_meta)
del_size = self._apply_versioning(account, container, pre_version_id,
update_statistics_ancestors_depth=1)
......@@ -1352,19 +1352,23 @@ class ModularBackend(BaseBackend):
dest_version_id, mtime = self.node.version_create(
node, hash, size, type, src_version_id, user, uuid, checksum,
cluster, update_statistics_ancestors_depth)
self.node.attribute_unset_is_latest(node, dest_version_id)
return pre_version_id, dest_version_id
def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
node, meta, replace=False):
if src_version_id is not None:
self.node.attribute_copy(src_version_id, dest_version_id)
if not replace:
self.node.attribute_del(dest_version_id, domain, (
k for k, v in meta.iteritems() if v == ''))
self.node.attribute_set(dest_version_id, domain, (
self.node.attribute_set(dest_version_id, domain, node, (
(k, v) for k, v in meta.iteritems() if v != ''))
else:
self.node.attribute_del(dest_version_id, domain)
self.node.attribute_set(dest_version_id, domain, ((
self.node.attribute_set(dest_version_id, domain, node, ((
k, v) for k, v in meta.iteritems()))
def _put_metadata(self, user, node, domain, meta, replace=False,
......@@ -1375,7 +1379,7 @@ class ModularBackend(BaseBackend):
user, node,
update_statistics_ancestors_depth=update_statistics_ancestors_depth)
self._put_metadata_duplicate(
src_version_id, dest_version_id, domain, meta, replace)
src_version_id, dest_version_id, domain, node, meta, replace)
return src_version_id, dest_version_id
def _list_limits(self, listing, marker, limit):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment