Commit 58a80472 authored by Sofia Papagiannaki's avatar Sofia Papagiannaki Committed by Christos Stavrakakis
Browse files

pithos: Lazy evaluation of "available" attribute

Whenever the backend tries to access the object's content, it checks its
database records to determine whether the snapshot map is available.
If it is not, it pings Archipelago to check the status of the map.
If the map exists, the backend updates the snapshot record in the Pithos
database and sets the "available" attribute to true.

Consecutive Archipelago pings are rate-limited.
The minimum time interval between consecutive Archipelago pings
is configured by the PITHOS_BACKEND_MAP_CHECK_INTERVAL setting.
parent b05ce287
......@@ -60,3 +60,9 @@
#
#Archipelago Configuration File
#PITHOS_BACKEND_ARCHIPELAGO_CONF = '/etc/archipelago/archipelago.conf'
#
# Archipelago xseg pool size
#PITHOS_BACKEND_XSEG_POOL_SIZE = 8
#
# The minimum interval (in seconds) between consecutive backend object map checks
#PITHOS_BACKEND_MAP_CHECK_INTERVAL = 300
......@@ -957,6 +957,8 @@ def _object_read(request, v_account, v_container, v_object):
raise faults.ItemNotFound('Object does not exist')
except VersionNotExists:
raise faults.ItemNotFound('Version does not exist')
except IllegalOperationError, e:
raise faults.Forbidden(str(e))
else:
try:
s, h = request.backend.get_object_hashmap(
......@@ -970,6 +972,8 @@ def _object_read(request, v_account, v_container, v_object):
raise faults.ItemNotFound('Object does not exist')
except VersionNotExists:
raise faults.ItemNotFound('Version does not exist')
except IllegalOperationError, e:
raise faults.Forbidden(str(e))
# Reply with the hashmap.
if hashmap_reply:
......@@ -1386,6 +1390,8 @@ def object_update(request, v_account, v_container, v_object):
raise faults.Forbidden('Not allowed')
except ItemNotExists:
raise faults.ItemNotFound('Object does not exist')
except IllegalOperationError, e:
raise faults.Forbidden(str(e))
offset, length, total = ranges
if offset is None:
......@@ -1411,6 +1417,8 @@ def object_update(request, v_account, v_container, v_object):
raise faults.Forbidden('Not allowed')
except ItemNotExists:
raise faults.ItemNotFound('Source object does not exist')
except IllegalOperationError, e:
raise faults.Forbidden(str(e))
if length is None:
length = src_size
......
......@@ -137,11 +137,6 @@ BACKEND_BLOCK_PATH = getattr(
settings, 'PITHOS_BACKEND_BLOCK_PATH', '/tmp/pithos-data/')
BACKEND_BLOCK_UMASK = getattr(settings, 'PITHOS_BACKEND_BLOCK_UMASK', 0o022)
# Archipelago Configuration File
BACKEND_ARCHIPELAGO_CONF = getattr(
settings, 'PITHOS_BACKEND_ARCHIPELAGO_CONF',
'/etc/archipelago/archipelago.conf')
# Queue for billing.
BACKEND_QUEUE_MODULE = getattr(settings, 'PITHOS_BACKEND_QUEUE_MODULE', None)
# Example: 'pithos.backends.lib.rabbitmq'
......@@ -208,3 +203,15 @@ OAUTH2_CLIENT_CREDENTIALS = getattr(settings,
# Set domain to restrict requests of pithos object contents serve endpoint or
# None for no domain restriction.
UNSAFE_DOMAIN = getattr(settings, 'PITHOS_UNSAFE_DOMAIN', None)
# Archipelago configuration file.
BACKEND_ARCHIPELAGO_CONF = getattr(settings, 'PITHOS_BACKEND_ARCHIPELAGO_CONF',
'/etc/archipelago/archipelago.conf')
# Archipelago xseg pool size.
BACKEND_XSEG_POOL_SIZE = getattr(settings, 'PITHOS_BACKEND_XSEG_POOL_SIZE', 8)
# Minimum interval (in seconds) that must elapse between consecutive
# Archipelago map checks for the same backend object.
BACKEND_MAP_CHECK_INTERVAL = getattr(settings,
'PITHOS_BACKEND_MAP_CHECK_INTERVAL',
300)
......@@ -64,6 +64,9 @@ from pithos.api.settings import (BACKEND_DB_MODULE, BACKEND_DB_CONNECTION,
BACKEND_VERSIONING, BACKEND_FREE_VERSIONING,
BACKEND_POOL_ENABLED, BACKEND_POOL_SIZE,
BACKEND_BLOCK_SIZE, BACKEND_HASH_ALGORITHM,
BACKEND_ARCHIPELAGO_CONF,
BACKEND_XSEG_POOL_SIZE,
BACKEND_MAP_CHECK_INTERVAL,
RADOS_STORAGE, RADOS_POOL_BLOCKS,
RADOS_POOL_MAPS, TRANSLATE_UUIDS,
PUBLIC_URL_SECURITY, PUBLIC_URL_ALPHABET,
......@@ -240,6 +243,10 @@ def put_object_headers(response, meta, restricted=False, token=None,
response.override_serialization = True
response['Content-Type'] = meta.get('type', 'application/octet-stream')
response['Last-Modified'] = http_date(int(meta['modified']))
response['Map-Exists'] = meta['available']
response['Map-Checked-At'] = (
http_date(int(meta['map_check_timestamp'])) if
meta['map_check_timestamp'] is not None else '')
if not restricted:
response['X-Object-Hash'] = meta['hash']
response['X-Object-UUID'] = meta['uuid']
......@@ -1044,7 +1051,10 @@ BACKEND_KWARGS = dict(
public_url_alphabet=PUBLIC_URL_ALPHABET,
account_quota_policy=BACKEND_ACCOUNT_QUOTA,
container_quota_policy=BACKEND_CONTAINER_QUOTA,
container_versioning_policy=BACKEND_VERSIONING)
container_versioning_policy=BACKEND_VERSIONING,
archipelago_conf_file=BACKEND_ARCHIPELAGO_CONF,
xseg_pool_size=BACKEND_XSEG_POOL_SIZE,
map_check_interval=BACKEND_MAP_CHECK_INTERVAL)
_pithos_backend_pool = PithosBackendPool(size=BACKEND_POOL_SIZE,
**BACKEND_KWARGS)
......
......@@ -33,7 +33,8 @@
from dbwrapper import DBWrapper
from node import (Node, ROOTNODE, SERIAL, NODE, HASH, SIZE, TYPE, MTIME, MUSER,
UUID, CHECKSUM, CLUSTER, MATCH_PREFIX, MATCH_EXACT)
UUID, CHECKSUM, CLUSTER, MATCH_PREFIX, MATCH_EXACT,
AVAILABLE, MAP_CHECK_TIMESTAMP)
from permissions import Permissions, READ, WRITE
from config import Config
from quotaholder_serials import QuotaholderSerial
......@@ -41,5 +42,6 @@ from quotaholder_serials import QuotaholderSerial
# Public API of the sqlalchemy backend-lib package: the DB wrapper, the
# node accessor with its row-index constants, permissions, config and
# quotaholder-serial helpers.  (The stale pre-image diff line that
# duplicated "MATCH_EXACT" has been dropped.)
__all__ = ["DBWrapper",
           "Node", "ROOTNODE", "NODE", "SERIAL", "HASH", "SIZE", "TYPE",
           "MTIME", "MUSER", "UUID", "CHECKSUM", "CLUSTER", "MATCH_PREFIX",
           "MATCH_EXACT", "AVAILABLE", "MAP_CHECK_TIMESTAMP",
           "Permissions", "READ", "WRITE", "Config",
           "QuotaholderSerial"]
......@@ -51,7 +51,7 @@ DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'
# Serial of the (implicit) root node of the hierarchy.
ROOTNODE = 0

# Indices into a version-properties row; the order matches the columns
# selected from the ``versions`` table.  The diff's stale pre-image line
# ``CLUSTER) = range(11)`` (which made this span a syntax error) has been
# removed; the post-commit 13-column tuple is kept.
(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
 CLUSTER, AVAILABLE, MAP_CHECK_TIMESTAMP) = range(13)

# Path-matching modes used by listing queries.
(MATCH_PREFIX, MATCH_EXACT) = range(2)
......@@ -105,6 +105,8 @@ _propnames = {
'uuid': 8,
'checksum': 9,
'cluster': 10,
'available':11,
'map_check_timestamp':12
}
......@@ -167,6 +169,9 @@ def create_tables(engine):
columns.append(Column('uuid', String(64), nullable=False, default=''))
columns.append(Column('checksum', String(256), nullable=False, default=''))
columns.append(Column('cluster', Integer, nullable=False, default=0))
columns.append(Column('available', Boolean, nullable=False, default=True))
columns.append(Column('map_check_timestamp', DECIMAL(precision=16,
scale=6)))
versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
Index('idx_versions_node_uuid', versions.c.uuid)
......@@ -301,7 +306,7 @@ class Node(DBWorker):
"""Return the properties of all versions at node.
If keys is empty, return all properties in the order
(serial, node, hash, size, type, source, mtime, muser, uuid,
checksum, cluster).
checksum, cluster, available, map_check_timestamp).
"""
s = select([self.versions.c.serial,
......@@ -314,7 +319,10 @@ class Node(DBWorker):
self.versions.c.muser,
self.versions.c.uuid,
self.versions.c.checksum,
self.versions.c.cluster], self.versions.c.node == node)
self.versions.c.cluster,
self.versions.c.available,
self.versions.c.map_check_timestamp],
self.versions.c.node == node)
s = s.order_by(self.versions.c.serial)
r = self.conn.execute(s)
rows = r.fetchall()
......@@ -777,7 +785,8 @@ class Node(DBWorker):
def version_create(self, node, hash, size, type, source, muser, uuid,
checksum, cluster=0,
update_statistics_ancestors_depth=None):
update_statistics_ancestors_depth=None,
available=True):
"""Create a new version from the given properties.
Return the (serial, mtime) of the new version.
"""
......@@ -786,7 +795,7 @@ class Node(DBWorker):
s = self.versions.insert().values(
node=node, hash=hash, size=size, type=type, source=source,
mtime=mtime, muser=muser, uuid=uuid, checksum=checksum,
cluster=cluster)
cluster=cluster, available=available)
serial = self.conn.execute(s).inserted_primary_key[0]
self.statistics_update_ancestors(node, 1, size, mtime, cluster,
update_statistics_ancestors_depth)
......@@ -799,7 +808,7 @@ class Node(DBWorker):
"""Lookup the current version of the given node.
Return a list with its properties:
(serial, node, hash, size, type, source, mtime,
muser, uuid, checksum, cluster)
muser, uuid, checksum, cluster, available, map_check_timestamp)
or None if the current version is not found in the given cluster.
"""
......@@ -810,7 +819,8 @@ class Node(DBWorker):
s = select([v.c.serial, v.c.node, v.c.hash,
v.c.size, v.c.type, v.c.source,
v.c.mtime, v.c.muser, v.c.uuid,
v.c.checksum, v.c.cluster])
v.c.checksum, v.c.cluster,
v.c.available, v.c.map_check_timestamp])
if before != inf:
c = select([func.max(self.versions.c.serial)],
self.versions.c.node == node)
......@@ -832,7 +842,7 @@ class Node(DBWorker):
"""Lookup the current versions of the given nodes.
Return a list with their properties:
(serial, node, hash, size, type, source, mtime, muser, uuid,
checksum, cluster).
checksum, cluster, available, map_check_timestamp).
"""
if not nodes:
return ()
......@@ -844,7 +854,8 @@ class Node(DBWorker):
s = select([v.c.serial, v.c.node, v.c.hash,
v.c.size, v.c.type, v.c.source,
v.c.mtime, v.c.muser, v.c.uuid,
v.c.checksum, v.c.cluster])
v.c.checksum, v.c.cluster,
v.c.available, v.c.map_check_timestamp])
if before != inf:
c = select([func.max(self.versions.c.serial)],
self.versions.c.node.in_(nodes))
......@@ -871,14 +882,16 @@ class Node(DBWorker):
the version specified by serial and the keys, in the order given.
If keys is empty, return all properties in the order
(serial, node, hash, size, type, source, mtime, muser, uuid,
checksum, cluster).
checksum, cluster, available, map_check_timestamp).
"""
v = self.versions.alias()
s = select([v.c.serial, v.c.node, v.c.hash,
v.c.size, v.c.type, v.c.source,
v.c.mtime, v.c.muser, v.c.uuid,
v.c.checksum, v.c.cluster], v.c.serial == serial)
v.c.checksum, v.c.cluster,
v.c.available, v.c.map_check_timestamp],
v.c.serial == serial)
if node is not None:
s = s.where(v.c.node == node)
rp = self.conn.execute(s)
......
......@@ -40,6 +40,7 @@ import binascii
from collections import defaultdict
from functools import wraps, partial
from traceback import format_exc
from time import time
from pithos.workers import glue
from archipelago.common import Segment, Xseg_ctx
......@@ -133,6 +134,8 @@ ULTIMATE_ANSWER = 42
# Resource name used by default for diskspace accounting.
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'
# Default minimum interval (seconds) between consecutive Archipelago map
# checks for an unavailable object; overridden by the backend's
# ``map_check_interval`` constructor argument.
DEFAULT_MAP_CHECK_INTERVAL = 300  # 300 secs

logger = logging.getLogger(__name__)
......@@ -241,7 +244,8 @@ class ModularBackend(BaseBackend):
container_quota_policy=None,
container_versioning_policy=None,
archipelago_conf_file=None,
xseg_pool_size=8):
xseg_pool_size=8,
map_check_interval=None):
db_module = db_module or DEFAULT_DB_MODULE
db_connection = db_connection or DEFAULT_DB_CONNECTION
block_module = block_module or DEFAULT_BLOCK_MODULE
......@@ -258,6 +262,8 @@ class ModularBackend(BaseBackend):
or DEFAULT_CONTAINER_VERSIONING
archipelago_conf_file = archipelago_conf_file \
or DEFAULT_ARCHIPELAGO_CONF_FILE
map_check_interval = map_check_interval \
or DEFAULT_MAP_CHECK_INTERVAL
self.default_account_policy = {}
self.default_container_policy = {
......@@ -276,6 +282,7 @@ class ModularBackend(BaseBackend):
self.hash_algorithm = hash_algorithm
self.block_size = block_size
self.free_versioning = free_versioning
self.map_check_interval = map_check_interval
def load_module(m):
__import__(m)
......@@ -292,7 +299,8 @@ class ModularBackend(BaseBackend):
self.node = self.db_module.Node(**params)
for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
'MATCH_PREFIX', 'MATCH_EXACT']:
'MATCH_PREFIX', 'MATCH_EXACT',
'AVAILABLE', 'MAP_CHECK_TIMESTAMP']:
setattr(self, x, getattr(self.db_module, x))
self.ALLOWED = ['read', 'write']
......@@ -988,7 +996,9 @@ class ModularBackend(BaseBackend):
'modified': modified,
'modified_by': props[self.MUSER],
'uuid': props[self.UUID],
'checksum': props[self.CHECKSUM]})
'checksum': props[self.CHECKSUM],
'available': props[self.AVAILABLE],
'map_check_timestamp': props[self.MAP_CHECK_TIMESTAMP]})
return meta
@debug_method
......@@ -1108,6 +1118,36 @@ class ModularBackend(BaseBackend):
self.permissions.public_set(
path, self.public_url_security, self.public_url_alphabet)
def _update_available(self, props):
    """Check that the object's Archipelago map exists and return its hashmap.

    If the version is marked unavailable, re-checks are rate-limited:
    a check attempted less than ``self.map_check_interval`` seconds after
    the previous one raises NotAllowedError.  On a failed map lookup the
    check timestamp is persisted (in its own transaction, since raising
    rolls the current one back) and IllegalOperationError is raised; on
    success the version is marked available and stamped.
    """
    if not props[self.AVAILABLE]:
        # Throttle Archipelago pings: refuse to re-check before the
        # configured interval has elapsed since the last attempt.
        if props[self.MAP_CHECK_TIMESTAMP]:
            elapsed_time = time() - float(props[self.MAP_CHECK_TIMESTAMP])
            if elapsed_time < self.map_check_interval:
                raise NotAllowedError(
                    'Consequent map checks are limited: retry later.')
    try:
        hashmap = self.store.map_get_archipelago(props[self.HASH],
                                                 props[self.SIZE])
    except Exception:  # map does not exist
        # NOTE(review): narrowed from a bare ``except`` so that
        # KeyboardInterrupt/SystemExit are not swallowed; assumes
        # map_get_archipelago signals a missing map via an exception —
        # TODO confirm its exact exception type and catch only that.
        # Raising an exception results in db transaction rollback.
        # However we have to force the update of the database.
        self.wrapper.rollback()  # rollback existing transaction
        self.wrapper.execute()  # start new transaction
        self.node.version_put_property(props[self.SERIAL],
                                       'map_check_timestamp', time())
        self.wrapper.commit()  # commit transaction
        self.wrapper.execute()  # start new transaction
        raise IllegalOperationError(
            'Unable to retrieve Archipelago Volume hashmap.')
    else:  # map exists: mark the version available and stamp the check
        self.node.version_put_property(props[self.SERIAL],
                                       'available', True)
        self.node.version_put_property(props[self.SERIAL],
                                       'map_check_timestamp', time())
        return hashmap
@debug_method
@backend_method
def get_object_hashmap(self, user, account, container, name, version=None):
......@@ -1119,8 +1159,7 @@ class ModularBackend(BaseBackend):
if props[self.HASH] is None:
return 0, ()
if props[self.HASH].startswith('archip:'):
hashmap = self.store.map_get_archipelago(props[self.HASH],
props[self.SIZE])
hashmap = self._update_available(props)
return props[self.SIZE], [x for x in hashmap]
else:
hashmap = self.store.map_get(self._unhexlify_hash(
......@@ -1130,7 +1169,8 @@ class ModularBackend(BaseBackend):
def _update_object_hash(self, user, account, container, name, size, type,
hash, checksum, domain, meta, replace_meta,
permissions, src_node=None, src_version_id=None,
is_copy=False, report_size_change=True):
is_copy=False, report_size_change=True,
available=True):
if permissions is not None and user != account:
raise NotAllowedError
self._can_write_object(user, account, container, name)
......@@ -1148,7 +1188,7 @@ class ModularBackend(BaseBackend):
pre_version_id, dest_version_id = self._put_version_duplicate(
user, node, src_node=src_node, size=size, type=type, hash=hash,
checksum=checksum, is_copy=is_copy,
update_statistics_ancestors_depth=1)
update_statistics_ancestors_depth=1, available=available)
# Handle meta.
if src_version_id is None:
......@@ -1250,7 +1290,7 @@ class ModularBackend(BaseBackend):
self.lock_container_path = False
dest_version_id = self._update_object_hash(
user, account, container, name, size, type, mapfile, checksum,
domain, meta, replace_meta, permissions)
domain, meta, replace_meta, permissions, available=False)
return self.node.version_get_properties(dest_version_id,
keys=('uuid',))[0]
......@@ -1699,7 +1739,8 @@ class ModularBackend(BaseBackend):
def _put_version_duplicate(self, user, node, src_node=None, size=None,
type=None, hash=None, checksum=None,
cluster=CLUSTER_NORMAL, is_copy=False,
update_statistics_ancestors_depth=None):
update_statistics_ancestors_depth=None,
available=True):
"""Create a new version of the node."""
props = self.node.version_lookup(
......@@ -1740,7 +1781,8 @@ class ModularBackend(BaseBackend):
dest_version_id, mtime = self.node.version_create(
node, hash, size, type, src_version_id, user, uuid, checksum,
cluster, update_statistics_ancestors_depth)
cluster, update_statistics_ancestors_depth,
available=available)
self.node.attribute_unset_is_latest(node, dest_version_id)
......
......@@ -53,7 +53,10 @@ class PithosBackendPool(ObjectPool):
public_url_alphabet=None,
account_quota_policy=None,
container_quota_policy=None,
container_versioning_policy=None):
container_versioning_policy=None,
archipelago_conf_file=None,
xseg_pool_size=8,
map_check_interval=None):
super(PithosBackendPool, self).__init__(size=size)
self.db_module = db_module
self.db_connection = db_connection
......@@ -75,6 +78,9 @@ class PithosBackendPool(ObjectPool):
self.account_quota_policy = account_quota_policy
self.container_quota_policy = container_quota_policy
self.container_versioning_policy = container_versioning_policy
self.archipelago_conf_file = archipelago_conf_file
self.xseg_pool_size = xseg_pool_size
self.map_check_interval = map_check_interval
def _pool_create(self):
backend = connect_backend(
......@@ -97,7 +103,10 @@ class PithosBackendPool(ObjectPool):
public_url_alphabet=self.public_url_alphabet,
account_quota_policy=self.account_quota_policy,
container_quota_policy=self.container_quota_policy,
container_versioning_policy=self.container_versioning_policy)
container_versioning_policy=self.container_versioning_policy,
archipelago_conf_file=self.archipelago_conf_file,
xseg_pool_size=self.xseg_pool_size,
map_check_interval=self.map_check_interval)
backend._real_close = backend.close
backend.close = instancemethod(_pooled_backend_close, backend,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment