Commit 13d49ad6 authored by Sofia Papagiannaki, committed by Chrysostomos Nanakos

pithos: Differentiate object hashmap from mapfile

Object mapfiles on the storage backend will no longer be identified by the
object's hashmap (which is content-dependent).
Upon creation, each object will be assigned a unique tag by which it will be
referenced on the storage backend.
This commit contains the necessary modifications to the pithos database.
parent 7855a8f9
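
In short, the name of an object's map on the storage backend is now derived
from a configurable prefix plus a database sequence instead of the content
hash. A minimal sketch of the naming scheme (plain Python, names illustrative;
the actual code below builds the name inside the INSERT using the mapfile_seq
sequence):

    import itertools

    _seq = itertools.count(1)  # stand-in for the mapfile_seq database sequence

    def next_mapfile(prefix='snf_file_'):
        # Content-independent and unique per created version.
        return '%s%d' % (prefix, next(_seq))

    # Old scheme: the backend map was named after the object's content hash.
    # New scheme: maps are named e.g. 'snf_file_1', 'snf_file_2', ...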
@@ -196,3 +196,8 @@ BACKEND_XSEG_POOL_SIZE = getattr(settings, 'PITHOS_BACKEND_XSEG_POOL_SIZE', 8)
# The maximum interval (in seconds) for consequent backend object map checks
BACKEND_MAP_CHECK_INTERVAL = getattr(settings,
'PITHOS_BACKEND_MAP_CHECK_INTERVAL', 5)
# The archipelago mapfile prefix (it should not exceed 15 characters)
# Once set it should not be changed
BACKEND_MAPFILE_PREFIX = getattr(settings,
'PITHOS_BACKEND_MAPFILE_PREFIX', 'snf_file_')
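
Deployments that need a different prefix can override it in the Pithos API
settings module before any objects are created. A sketch (the conf file path
is an assumption; any loaded Django settings module works):

    # e.g. in the pithos settings conf file (path depends on the installation)
    PITHOS_BACKEND_MAPFILE_PREFIX = 'mycluster_'  # at most 15 characters
    # Per the comment above, the prefix should not be changed once set.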
@@ -49,6 +49,7 @@ from pithos.api.settings import (BACKEND_DB_MODULE, BACKEND_DB_CONNECTION,
BACKEND_ARCHIPELAGO_CONF,
BACKEND_XSEG_POOL_SIZE,
BACKEND_MAP_CHECK_INTERVAL,
BACKEND_MAPFILE_PREFIX,
RADOS_STORAGE, RADOS_POOL_BLOCKS,
RADOS_POOL_MAPS, TRANSLATE_UUIDS,
PUBLIC_URL_SECURITY, PUBLIC_URL_ALPHABET,
@@ -1038,7 +1039,8 @@ BACKEND_KWARGS = dict(
container_versioning_policy=BACKEND_VERSIONING,
archipelago_conf_file=BACKEND_ARCHIPELAGO_CONF,
xseg_pool_size=BACKEND_XSEG_POOL_SIZE,
map_check_interval=BACKEND_MAP_CHECK_INTERVAL)
map_check_interval=BACKEND_MAP_CHECK_INTERVAL,
mapfile_prefix=BACKEND_MAPFILE_PREFIX)
_pithos_backend_pool = PithosBackendPool(size=BACKEND_POOL_SIZE,
**BACKEND_KWARGS)
@@ -14,16 +14,11 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from dbwrapper import DBWrapper
from node import (Node, ROOTNODE, SERIAL, NODE, HASH, SIZE, TYPE, MTIME, MUSER,
UUID, CHECKSUM, CLUSTER, MATCH_PREFIX, MATCH_EXACT,
AVAILABLE, MAP_CHECK_TIMESTAMP)
from node import (Node, ROOTNODE, MATCH_PREFIX, MATCH_EXACT)
from permissions import Permissions, READ, WRITE
from config import Config
from quotaholder_serials import QuotaholderSerial
__all__ = ["DBWrapper",
"Node", "ROOTNODE", "NODE", "SERIAL", "HASH", "SIZE", "TYPE",
"MTIME", "MUSER", "UUID", "CHECKSUM", "CLUSTER", "MATCH_PREFIX",
"MATCH_EXACT", "AVAILABLE", "MAP_CHECK_TIMESTAMP",
"Permissions", "READ", "WRITE", "Config",
"QuotaholderSerial"]
"Node", "ROOTNODE", "MATCH_PREFIX", "MATCH_EXACT", "Permissions",
"READ", "WRITE", "Config", "QuotaholderSerial"]
"""Differentiate hashmap from mapfile
Revision ID: 2efddde15abf
Revises: e6edec1b499
Create Date: 2014-06-11 10:46:04.116321
"""
# revision identifiers, used by Alembic.
revision = '2efddde15abf'
down_revision = 'e6edec1b499'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.execute(sa.schema.CreateSequence(sa.schema.Sequence("mapfile_seq")))
op.add_column('versions', sa.Column('mapfile', sa.String(256)))
op.add_column('versions', sa.Column('is_snapshot', sa.Boolean,
nullable=False, default=False,
server_default='False'))
v = sa.sql.table(
'versions',
sa.sql.column('hash', sa.String),
sa.sql.column('mapfile', sa.String),
sa.sql.column('is_snapshot', sa.Boolean))
u = v.update().values({'mapfile': v.c.hash,
'is_snapshot': sa.case([(v.c.hash.like('archip:%'),
True)], else_=False)})
op.execute(u)
def downgrade():
op.drop_column('versions', 'is_snapshot')
op.drop_column('versions', 'mapfile')
op.execute(sa.schema.DropSequence(sa.schema.Sequence("mapfile_seq")))
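
The upgrade backfills existing rows: each version's content hash becomes its
mapfile name, and Archipelago-backed objects (hash prefixed with 'archip:')
are flagged as snapshots. A hedged way to spot-check the result after
migrating (the connection URL is an assumption):

    from sqlalchemy import create_engine, text

    engine = create_engine('postgresql:///pithos')  # URL is illustrative
    with engine.connect() as conn:
        rows = conn.execute(text('SELECT hash, mapfile, is_snapshot FROM versions'))
        for hash_, mapfile, is_snapshot in rows:
            # Pre-existing versions keep their content hash as the mapfile name.
            assert mapfile == hash_
            assert is_snapshot == hash_.startswith('archip:')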
@@ -20,9 +20,10 @@ from collections import defaultdict
from sqlalchemy import (Table, Integer, BigInteger, DECIMAL, Boolean,
Column, String, MetaData, ForeignKey)
from sqlalchemy.schema import Index
from sqlalchemy.sql import func, and_, or_, not_, select, bindparam, exists
from sqlalchemy.sql.expression import true
from sqlalchemy.schema import Index, Sequence
from sqlalchemy.sql import (func, and_, or_, not_, select, bindparam, exists,
functions)
from sqlalchemy.sql.expression import true, literal
from sqlalchemy.exc import NoSuchTableError
from dbworker import DBWorker, ESCAPE_CHAR
@@ -32,9 +33,6 @@ from pithos.backends.filter import parse_filters
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'
ROOTNODE = 0
(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
CLUSTER, AVAILABLE, MAP_CHECK_TIMESTAMP) = range(13)
(MATCH_PREFIX, MATCH_EXACT) = range(2)
inf = float('inf')
@@ -75,22 +73,6 @@ def strprevling(prefix):
s += unichr(c - 1) + unichr(0xffff)
return s
_propnames = {
'serial': 0,
'node': 1,
'hash': 2,
'size': 3,
'type': 4,
'source': 5,
'mtime': 6,
'muser': 7,
'uuid': 8,
'checksum': 9,
'cluster': 10,
'available':11,
'map_check_timestamp':12
}
def create_tables(engine):
metadata = MetaData()
@@ -154,6 +136,10 @@ def create_tables(engine):
columns.append(Column('available', Boolean, nullable=False, default=True))
columns.append(Column('map_check_timestamp', DECIMAL(precision=16,
scale=6)))
columns.append(Column('mapfile', String(256)))
columns.append(Column('is_snapshot', Boolean, nullable=False,
default=False))
versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
Index('idx_versions_node_uuid', versions.c.uuid)
@@ -174,8 +160,12 @@ def create_tables(engine):
Index('idx_attributes_domain', attributes.c.domain)
Index('idx_attributes_serial_node', attributes.c.serial, attributes.c.node)
# TODO: handle backends not supporting sequences
mapfile_seq = Sequence('mapfile_seq', metadata=metadata)
metadata.create_all(engine)
return metadata.sorted_tables
return metadata.sorted_tables + [mapfile_seq]
class Node(DBWorker):
@@ -187,6 +177,8 @@ class Node(DBWorker):
# TODO: Provide an interface for included and excluded clusters.
def __init__(self, **params):
self._props = params.pop('props')
self.mapfile_prefix = params.pop('mapfile_prefix', 'snf_file_')
DBWorker.__init__(self, **params)
try:
metadata = MetaData(self.engine)
@@ -195,6 +187,7 @@
self.statistics = Table('statistics', metadata, autoload=True)
self.versions = Table('versions', metadata, autoload=True)
self.attributes = Table('attributes', metadata, autoload=True)
self.mapfile_seq = Sequence('mapfile_seq', metadata)
except NoSuchTableError:
tables = create_tables(self.engine)
map(lambda t: self.__setattr__(t.name, t), tables)
@@ -284,13 +277,14 @@ class Node(DBWorker):
r.close()
return l[0] if l is not None else None
def node_get_versions(self, node, keys=(), propnames=_propnames):
def node_get_versions(self, node, keys=(), props=None):
"""Return the properties of all versions at node.
If keys is empty, return all properties in the order
(serial, node, hash, size, type, source, mtime, muser, uuid,
checksum, cluster, available, map_check_timestamp).
"""
props = props or self._props
s = select([self.versions.c.serial,
self.versions.c.node,
self.versions.c.hash,
@@ -315,7 +309,7 @@ class Node(DBWorker):
if not keys:
return rows
return [[p[propnames[k]] for k in keys if k in propnames] for
return [[p[props[k]] for k in keys if k in props] for
p in rows]
def node_count_children(self, node):
@@ -695,7 +689,7 @@ class Node(DBWorker):
r.close()
if not props:
return None
mtime = props[MTIME]
mtime = props.mtime
# First level, just under node (get population).
v = self.versions.alias('v')
@@ -758,7 +752,7 @@ class Node(DBWorker):
rp.close()
if not r:
return None
size = long(r[1] - props[SIZE])
size = long(r[1] - props.size)
mtime = max(mtime, r[2])
return (count, size, mtime)
@@ -770,42 +764,70 @@ class Node(DBWorker):
def version_create(self, node, hash, size, type, source, muser, uuid,
checksum, cluster=0,
update_statistics_ancestors_depth=None,
available=True, map_check_timestamp=None):
available=True, map_check_timestamp=None,
mapfile=None, is_snapshot=False):
"""Create a new version from the given properties.
Return the (serial, mtime) of the new version.
If mapfile is not None, set mapfile to this value.
Otherwise, assign to the mapfile a new unique identifier.
:raises DatabaseError
"""
mtime = time()
s = self.versions.insert().values(
s = self.versions.insert().returning(self.versions.c.serial,
self.versions.c.mtime,
self.versions.c.mapfile)
s = s.values(
node=node, hash=hash, size=size, type=type, source=source,
mtime=mtime, muser=muser, uuid=uuid, checksum=checksum,
cluster=cluster, available=available,
map_check_timestamp=map_check_timestamp)
serial = self.conn.execute(s).inserted_primary_key[0]
map_check_timestamp=map_check_timestamp,
mapfile=(mapfile or
functions.concat(literal(self.mapfile_prefix),
functions.next_value(self.mapfile_seq))),
is_snapshot=is_snapshot)
r = self.conn.execute(s)
serial, mtime, mapfile = r.fetchone()
r.close()
self.statistics_update_ancestors(node, 1, size, mtime, cluster,
update_statistics_ancestors_depth)
self.nodes_set_latest_version(node, serial)
return serial, mtime
return serial, mtime, mapfile
def version_lookup(self, node, before=inf, cluster=0, all_props=True):
def version_lookup(self, node, before=inf, cluster=0, all_props=True,
keys=()):
"""Lookup the current version of the given node.
Return a list with its properties:
(serial, node, hash, size, type, source, mtime,
muser, uuid, checksum, cluster, available, map_check_timestamp)
or None if the current version is not found in the given cluster.
If the current version is not found in the given cluster,
return None.
If all_props is False, return the version's serial.
Otherwise:
If keys is not empty, return only the specific properties
(by filtering out the invalid ones).
If keys is empty, return all properties in the order
(serial, node, hash, size, type, source, mtime, muser, uuid,
checksum, cluster, available, map_check_timestamp)
This is a bad tactic, since it may have a considerable
impact on performance.
"""
v = self.versions.alias('v')
if not all_props:
s = select([v.c.serial])
else:
s = select([v.c.serial, v.c.node, v.c.hash,
if keys:
cols = [getattr(v.c, col) for col in keys if hasattr(v.c, col)]
else:
cols = [v.c.serial, v.c.node, v.c.hash,
v.c.size, v.c.type, v.c.source,
v.c.mtime, v.c.muser, v.c.uuid,
v.c.checksum, v.c.cluster,
v.c.available, v.c.map_check_timestamp])
v.c.available, v.c.map_check_timestamp]
s = select(cols)
if before != inf:
c = select([func.max(self.versions.c.serial)],
self.versions.c.node == node)
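
version_create() now also returns the assigned mapfile: when the caller
passes none, the name is generated inside the INSERT from the configured
prefix and the mapfile_seq sequence. A hypothetical call site (argument
values are illustrative only):

    serial, mtime, mapfile = node.version_create(
        node=node_id, hash=hexlified_hash, size=1024, type='text/plain',
        source=None, muser='user@example.org', uuid=object_uuid, checksum='',
        cluster=0, update_statistics_ancestors_depth=1)
    # mapfile is e.g. 'snf_file_42'; it is the content-independent name the
    # storage backend will use for this object's map.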
@@ -823,7 +845,8 @@ class Node(DBWorker):
return None
def version_lookup_bulk(self, nodes, before=inf, cluster=0,
all_props=True, order_by_path=False):
all_props=True, order_by_path=False,
keys=()):
"""Lookup the current versions of the given nodes.
Return a list with their properties:
(serial, node, hash, size, type, source, mtime, muser, uuid,
@@ -836,11 +859,15 @@ class Node(DBWorker):
if not all_props:
s = select([v.c.serial])
else:
s = select([v.c.serial, v.c.node, v.c.hash,
if keys:
cols = [getattr(v.c, col) for col in keys if hasattr(v.c, col)]
else:
cols = [v.c.serial, v.c.node, v.c.hash,
v.c.size, v.c.type, v.c.source,
v.c.mtime, v.c.muser, v.c.uuid,
v.c.checksum, v.c.cluster,
v.c.available, v.c.map_check_timestamp])
v.c.available, v.c.map_check_timestamp]
s = select(cols)
if before != inf:
c = select([func.max(self.versions.c.serial)],
self.versions.c.node.in_(nodes))
@@ -861,7 +888,7 @@ class Node(DBWorker):
r.close()
return (tuple(row.values()) for row in rproxy)
def version_get_properties(self, serial, keys=(), propnames=_propnames,
def version_get_properties(self, serial, keys=(), props=None,
node=None):
"""Return a sequence of values for the properties of
the version specified by serial and the keys, in the order given.
@@ -870,29 +897,23 @@ class Node(DBWorker):
checksum, cluster, available, map_check_timestamp).
"""
props = props or self._props
keys = keys or props.keys()
v = self.versions.alias()
s = select([v.c.serial, v.c.node, v.c.hash,
v.c.size, v.c.type, v.c.source,
v.c.mtime, v.c.muser, v.c.uuid,
v.c.checksum, v.c.cluster,
v.c.available, v.c.map_check_timestamp],
v.c.serial == serial)
cols = [getattr(v.c, p) for p in keys if hasattr(v.c, p)]
s = select(cols, v.c.serial == serial)
if node is not None:
s = s.where(v.c.node == node)
rp = self.conn.execute(s)
r = rp.fetchone()
rp.close()
if r is None:
return r
if not keys:
return r
return [r[propnames[k]] for k in keys if k in propnames]
return r
def version_put_property(self, serial, key, value):
def version_put_property(self, serial, key, value, props=None):
"""Set value for the property of version specified by key."""
if key not in _propnames:
props = props or self._props
if key not in props:
return
s = self.versions.update()
s = s.where(self.versions.c.serial == serial)
@@ -906,9 +927,9 @@ class Node(DBWorker):
props = self.version_get_properties(serial)
if not props:
return
node = props[NODE]
size = props[SIZE]
oldcluster = props[CLUSTER]
node = props.node
size = props.size
oldcluster = props.cluster
if cluster == oldcluster:
return
@@ -929,10 +950,10 @@ class Node(DBWorker):
props = self.version_get_properties(serial)
if not props:
return
node = props[NODE]
hash = props[HASH]
size = props[SIZE]
cluster = props[CLUSTER]
node = props.node
hash = props.hash
size = props.size
cluster = props.cluster
mtime = time()
self.statistics_update_ancestors(node, -1, -size, mtime, cluster,
@@ -19,7 +19,7 @@ import logging
import hashlib
import binascii
from collections import defaultdict
from collections import defaultdict, OrderedDict
from functools import wraps, partial
from traceback import format_exc
from time import time
@@ -118,8 +118,16 @@ DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'
DEFAULT_MAP_CHECK_INTERVAL = 5 # set to 5 secs
DEFAULT_MAPFILE_PREFIX = 'snf_file_'
logger = logging.getLogger(__name__)
_propnames = ('serial', 'node', 'hash', 'size', 'type', 'source', 'mtime',
'muser', 'uuid', 'checksum', 'cluster', 'available',
'map_check_timestamp', 'mapfile', 'is_snapshot')
_api_propnames = _propnames[:-2]
_props = lambda props: OrderedDict((props[i], i) for i in range(len(props)))
def backend_method(func):
@wraps(func)
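
The _props helper turns the ordered property names into a name-to-column-index
mapping, while _api_propnames leaves out the two new storage-level fields that
the API layer does not expose. A small worked example:

    props = _props(_propnames)
    assert props['serial'] == 0 and props['mapfile'] == 13
    assert 'mapfile' not in _props(_api_propnames)
    assert 'is_snapshot' not in _props(_api_propnames)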
@@ -227,7 +235,8 @@ class ModularBackend(BaseBackend):
container_versioning_policy=None,
archipelago_conf_file=None,
xseg_pool_size=8,
map_check_interval=None):
map_check_interval=None,
mapfile_prefix=None):
db_module = db_module or DEFAULT_DB_MODULE
db_connection = db_connection or DEFAULT_DB_CONNECTION
block_module = block_module or DEFAULT_BLOCK_MODULE
@@ -246,6 +255,8 @@
or DEFAULT_ARCHIPELAGO_CONF_FILE
map_check_interval = map_check_interval \
or DEFAULT_MAP_CHECK_INTERVAL
mapfile_prefix = mapfile_prefix \
or DEFAULT_MAPFILE_PREFIX
self.default_account_policy = {QUOTA_POLICY: account_quota_policy}
self.default_container_policy = {
@@ -265,6 +276,7 @@
self.block_size = block_size
self.free_versioning = free_versioning
self.map_check_interval = map_check_interval
self.mapfile_prefix = mapfile_prefix
def load_module(m):
__import__(m)
@@ -273,17 +285,18 @@
self.db_module = load_module(db_module)
self.wrapper = self.db_module.DBWrapper(db_connection)
params = {'wrapper': self.wrapper}
self.permissions = self.db_module.Permissions(**params)
self.config = self.db_module.Config(**params)
self.commission_serials = self.db_module.QuotaholderSerial(**params)
for x in ['READ', 'WRITE']:
setattr(self, x, getattr(self.db_module, x))
params.update({'mapfile_prefix': self.mapfile_prefix,
'props': _props(_api_propnames)})
self.permissions = self.db_module.Permissions(**params)
self.node = self.db_module.Node(**params)
for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
'MATCH_PREFIX', 'MATCH_EXACT',
'AVAILABLE', 'MAP_CHECK_TIMESTAMP']:
for x in ['ROOTNODE', 'MATCH_PREFIX', 'MATCH_EXACT']:
setattr(self, x, getattr(self.db_module, x))
for p in _propnames:
setattr(self, p.upper(), _props(_propnames)[p])
self.ALLOWED = ['read', 'write']
@@ -768,10 +781,15 @@ class ModularBackend(BaseBackend):
node = t[2]
if not self._exists(node):
continue
src_version_id, dest_version_id = self._put_version_duplicate(
user, node, size=0, type='', hash=None, checksum='',
cluster=CLUSTER_DELETED,
update_statistics_ancestors_depth=1)
# Keep a reference to the mapfiles
# in case we want to delete them in the future
src_version_id, dest_version_id, _ = \
self._put_version_duplicate(
user, node, size=0, type='', hash=None, checksum='',
cluster=CLUSTER_DELETED,
update_statistics_ancestors_depth=1,
keep_src_mapfile=True)
dest_versions.append(dest_version_id)
del_size = self._apply_versioning(
account, container, src_version_id,
@@ -1169,7 +1187,8 @@ class ModularBackend(BaseBackend):
hash, checksum, domain, meta, replace_meta,
permissions, src_node=None, src_version_id=None,
is_copy=False, report_size_change=True,
available=True, keep_available=False):
available=True, keep_available=False,
force_mapfile=None, is_snapshot=False):
if permissions is not None and user != account:
raise NotAllowedError
self._can_write_object(user, account, container, name)
@@ -1184,11 +1203,12 @@
path, node = self._put_object_node(
container_path, container_node, name)
pre_version_id, dest_version_id = self._put_version_duplicate(
pre_version_id, dest_version_id, mapfile = self._put_version_duplicate(
user, node, src_node=src_node, size=size, type=type, hash=hash,
checksum=checksum, is_copy=is_copy,
update_statistics_ancestors_depth=1,
available=available, keep_available=keep_available)
available=available, keep_available=keep_available,
force_mapfile=force_mapfile, is_snapshot=is_snapshot)
# Handle meta.
if src_version_id is None:
@@ -1238,7 +1258,7 @@ class ModularBackend(BaseBackend):
self._report_object_change(
user, account, path,
details={'version': dest_version_id, 'action': 'object update'})
return dest_version_id, size_delta
return dest_version_id, size_delta, mapfile
@debug_method
@backend_method
@@ -1288,9 +1308,10 @@
pass
finally:
self.lock_container_path = False
dest_version_id, _ = self._update_object_hash(
dest_version_id, _, mapfile = self._update_object_hash(
user, account, container, name, size, type, mapfile, checksum,
domain, meta, replace_meta, permissions, available=False)
domain, meta, replace_meta, permissions, available=False,
force_mapfile=mapfile, is_snapshot=True)
return self.node.version_get_properties(dest_version_id,
keys=('uuid',))[0]
@@ -1307,21 +1328,21 @@
meta = meta or {}
if size == 0: # No such thing as an empty hashmap.
hashmap = [self.put_block('')]
map = HashMap(self.block_size, self.hash_algorithm)
map.extend([self._unhexlify_hash(x) for x in hashmap])
missing = self.store.block_search(map)
map_ = HashMap(self.block_size, self.hash_algorithm)
map_.extend([self._unhexlify_hash(x) for x in hashmap])
missing = self.store.block_search(map_)
if missing:
ie = IndexError()
ie.data = [binascii.hexlify(x) for x in missing]
raise ie
hash = map.hash()
hexlified = binascii.hexlify(hash)
hash_ = map_.hash()
hexlified = binascii.hexlify(hash_)
# _update_object_hash() locks destination path
dest_version_id, _ = self._update_object_hash(
dest_version_id, _, mapfile = self._update_object_hash(
user, account, container, name, size, type, hexlified, checksum,
domain, meta, replace_meta, permissions)
self.store.map_put(hash, map, size, self.block_size)
domain, meta, replace_meta, permissions, is_snapshot=False)
self.store.map_put(mapfile, map_, size, self.block_size)
return dest_version_id, hexlified
@debug_method
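
Note that the hashmap is now stored under the version's generated mapfile
name instead of its content hash, so the backend map name no longer depends
on the object's data. Roughly (a before/after sketch, not the full method):

    # before: self.store.map_put(hash, map, size, self.block_size)
    # after:  self.store.map_put(mapfile, map_, size, self.block_size)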
@@ -1392,6 +1413,7 @@ class ModularBackend(BaseBackend):
src_version_id = props[self.SERIAL]
hash = props[self.HASH]
size = props[self.SIZE]
is_snapshot = props[self.IS_SNAPSHOT]
is_copy = not is_move and (src_account, src_container, src_name) != (
dest_account, dest_container, dest_name) # New uuid.
dest_version_id, size_delta = self._update_object_hash(
......@@ -1399,7 +1421,7 @@ class ModularBackend(BaseBackend):
None, dest_domain, dest_meta, replace_meta, permissions,
src_node=node, src_version_id=src_version_id, is_copy=is_copy,
report_size_change=(not bulk_report_size_change),