Commit 31f48b1a authored by Sofia Papagiannaki's avatar Sofia Papagiannaki

pithos: Enable stale snapshot better handling

Extend Pithos backend functionality in order to
be able to set the state of the snapshot.
This commit introduces the appropriate modifications
proposed in #231. More specifically:
* Changes 'available' flag (denoting the existence of the object mapfile
  in the backend) to integer.
* The 'available' field can be assigned with the following values:
  -1 (MAP_ERROR), 0 (MAP_UNAVAILABLE) and 1 (MAP_AVAILABLE)
* Provides a respective database migration script.
* Adds 'update_object_status' method in PithosBackend that is used in
  order to update the state of a Pithos object.
* Returns 400 (BAD REQUEST) response status when one tries to download a
  broken snapshot.
parent a61e0639
......@@ -60,7 +60,7 @@ from pithos.api.settings import (BACKEND_DB_MODULE, BACKEND_DB_CONNECTION,
from pithos.backends import connect_backend
from pithos.backends.base import (NotAllowedError, QuotaError, ItemNotExists,
VersionNotExists, IllegalOperationError,
LimitExceeded)
LimitExceeded, BrokenSnapshot)
from synnefo.lib import join_urls
......@@ -1127,6 +1127,8 @@ def api_method(http_method=None, token_required=True, user_required=True,
return response
except LimitExceeded, le:
raise faults.BadRequest(le.args[0])
except BrokenSnapshot, bs:
raise faults.BadRequest(bs.args[0])
finally:
# Always close PithosBackend connection
if getattr(request, "backend", None) is not None:
......
......@@ -18,6 +18,7 @@ DEFAULT_ACCOUNT_QUOTA = 0 # No quota.
DEFAULT_CONTAINER_QUOTA = 0 # No quota.
DEFAULT_CONTAINER_VERSIONING = 'auto'
(MAP_ERROR, MAP_UNAVAILABLE, MAP_AVAILABLE) = range(-1, 2)
class NotAllowedError(Exception):
pass
......@@ -71,6 +72,9 @@ class LimitExceeded(Exception):
pass
class BrokenSnapshot(Exception):
pass
class BaseBackend(object):
"""Abstract backend class.
......
"""Alter versions available possible states
Revision ID: 5adc52055209
Revises: 7107eaecd8c
Create Date: 2014-09-12 18:42:27.307379
"""
# revision identifiers, used by Alembic.
revision = '5adc52055209'
down_revision = '7107eaecd8c'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.add_column('versions', sa.Column('temp', sa.INTEGER))
v = sa.sql.table(
'versions',
sa.sql.column('available', sa.Boolean),
sa.sql.column('temp', sa.Integer))
u = v.update().values({'temp': sa.case([(v.c.available ==
sa.sql.expression.true(), 1)],
else_=0)})
op.execute(u)
op.drop_column('versions', 'available')
op.add_column('versions', sa.Column('available', sa.INTEGER))
u = v.update().values({'available': v.c.temp})
op.execute(u)
op.drop_column('versions', 'temp')
def downgrade():
op.add_column('versions', sa.Column('temp', sa.Boolean))
v = sa.sql.table(
'versions',
sa.sql.column('available', sa.Boolean),
sa.sql.column('temp', sa.Integer))
u = v.update().values({'temp': sa.case([(v.c.available == 1,
sa.sql.expression.true())],
else_=False)})
op.execute(u)
op.drop_column('versions', 'available')
op.add_column('versions', sa.Column('available', sa.Boolean))
u = v.update().values({'available': v.c.temp})
op.execute(u)
op.drop_column('versions', 'temp')
......@@ -28,6 +28,7 @@ from sqlalchemy.exc import NoSuchTableError, IntegrityError
from dbworker import DBWorker, ESCAPE_CHAR
from pithos.backends.base import MAP_AVAILABLE
from pithos.backends.filter import parse_filters
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'
......@@ -137,7 +138,7 @@ def create_tables(engine):
columns.append(Column('uuid', String(64), nullable=False, default=''))
columns.append(Column('checksum', String(256), nullable=False, default=''))
columns.append(Column('cluster', Integer, nullable=False, default=0))
columns.append(Column('available', Boolean, nullable=False, default=True))
columns.append(Column('available', Integer, nullable=False, default=1))
columns.append(Column('map_check_timestamp', DECIMAL(precision=16,
scale=6)))
columns.append(Column('mapfile', String(256)))
......@@ -811,7 +812,7 @@ class Node(DBWorker):
def version_create(self, node, hash, size, type, source, muser, uuid,
checksum, cluster=0,
update_statistics_ancestors_depth=None,
available=True, map_check_timestamp=None,
available=MAP_AVAILABLE, map_check_timestamp=None,
mapfile=None, is_snapshot=False):
"""Create a new version from the given properties.
Return the (serial, mtime, mapfile) of the new version.
......
......@@ -19,6 +19,7 @@ from itertools import groupby
from dbworker import DBWorker
from pithos.backends.base import MAP_AVAILABLE
from pithos.backends.filter import parse_filters
......@@ -133,7 +134,7 @@ class Node(DBWorker):
uuid text not null default '',
checksum text not null default '',
cluster integer not null default 0,
available boolean not null default true,
available integer not null default 1,
map_check_timestamp integer,
mapfile text,
is_snapshot boolean not null default false,
......@@ -573,7 +574,7 @@ class Node(DBWorker):
def version_create(self, node, hash, size, type, source, muser, uuid,
checksum, cluster=0,
update_statistics_ancestors_depth=None,
available=True, map_check_timestamp=None,
available=MAP_AVAILABLE, map_check_timestamp=None,
mapfile=True, is_snapshot=False):
"""Create a new version from the given properties.
Return the (serial, mtime, mapfile) of the new version.
......
......@@ -40,7 +40,8 @@ from pithos.backends.base import (
BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
ContainerNotEmpty, ItemNotExists, VersionNotExists,
InvalidHash, IllegalOperationError, InconsistentContentSize,
LimitExceeded, InvalidPolicy)
LimitExceeded, InvalidPolicy, BrokenSnapshot,
MAP_ERROR, MAP_UNAVAILABLE, MAP_AVAILABLE)
class DisabledAstakosClient(object):
......@@ -1024,7 +1025,7 @@ class ModularBackend(BaseBackend):
path, node = self._lookup_object(account, container, name)
props = self._get_version(node, version)
if version is None:
if not props[self.AVAILABLE]:
if props[self.AVAILABLE] == MAP_UNAVAILABLE:
try:
self._update_available(props)
except IllegalOperationError:
......@@ -1196,7 +1197,10 @@ class ModularBackend(BaseBackend):
def _update_available(self, props):
"""Checks if the object map exists and updates the database"""
if not props[self.AVAILABLE]:
if props[self.AVAILABLE] == MAP_ERROR:
raise BrokenSnapshot('This Archipelago volume is broken.')
if props[self.AVAILABLE] == MAP_UNAVAILABLE:
if props[self.MAP_CHECK_TIMESTAMP]:
elapsed_time = time() - float(props[self.MAP_CHECK_TIMESTAMP])
if elapsed_time < self.map_check_interval:
......@@ -1217,7 +1221,7 @@ class ModularBackend(BaseBackend):
'Unable to retrieve Archipelago volume hashmap')
else: # map exists
self.node.version_put_property(props[self.SERIAL],
'available', True)
'available', MAP_AVAILABLE)
self.node.version_put_property(props[self.SERIAL],
'map_check_timestamp', time())
return hashmap
......@@ -1263,8 +1267,9 @@ class ModularBackend(BaseBackend):
hash, checksum, domain, meta, replace_meta,
permissions, src_node=None, src_version_id=None,
is_copy=False, report_size_change=True,
available=True, keep_available=False,
available=None, keep_available=False,
force_mapfile=None, is_snapshot=False):
available = available if available is not None else MAP_AVAILABLE
if permissions is not None and user != account:
raise NotAllowedError
self._can_write_object(user, account, container, name)
......@@ -1389,11 +1394,24 @@ class ModularBackend(BaseBackend):
self.lock_container_path = False
dest_version_id, _, mapfile = self._update_object_hash(
user, account, container, name, size, type, mapfile, checksum,
domain, meta, replace_meta, permissions, available=False,
domain, meta, replace_meta, permissions, available=MAP_UNAVAILABLE,
force_mapfile=mapfile, is_snapshot=True)
return self.node.version_get_properties(dest_version_id,
keys=('uuid',))[0]
@debug_method
@backend_method
def update_object_status(self, uuid, state):
assert state in (MAP_ERROR,
MAP_UNAVAILABLE,
MAP_AVAILABLE), 'Invalid mapfile state'
uuid_ = self._validate_uuid(uuid)
info = self.node.latest_uuid(uuid_, CLUSTER_NORMAL)
if info is None:
raise NameError('No object found for this UUID.')
_, serial = info
self.node.version_put_property(serial, 'available', state)
@debug_method
def update_object_hashmap(self, user, account, container, name, size, type,
hashmap, checksum, domain, meta=None,
......@@ -1525,7 +1543,7 @@ class ModularBackend(BaseBackend):
is_copy = not is_move and (src_account, src_container, src_name) != (
dest_account, dest_container, dest_name) # New uuid.
if is_copy and not props[self.AVAILABLE]:
if is_copy and props[self.AVAILABLE] != MAP_AVAILABLE:
raise NotAllowedError('Copying objects not available in the '
'storage backend is forbidden.')
......@@ -1585,7 +1603,7 @@ class ModularBackend(BaseBackend):
delimiter) else dest_name
vdest_name = path.replace(prefix, dest_prefix, 1)
if is_copy and not prop[self.AVAILABLE]:
if is_copy and prop[self.AVAILABLE] != MAP_AVAILABLE:
raise NotAllowedError('Copying objects not available in '
'the storage backend is forbidden.')
......@@ -2004,7 +2022,7 @@ class ModularBackend(BaseBackend):
type=None, hash=None, checksum=None,
cluster=CLUSTER_NORMAL, is_copy=False,
update_statistics_ancestors_depth=None,
available=True, keep_available=True,
available=None, keep_available=True,
keep_src_mapfile=False,
force_mapfile=None,
is_snapshot=False):
......@@ -2021,7 +2039,7 @@ class ModularBackend(BaseBackend):
:raises ValueError: if it failed to create the new version
"""
available = available if available is not None else MAP_AVAILABLE
props = self.node.version_lookup(
node if src_node is None else src_node, inf, CLUSTER_NORMAL,
keys=_propnames)
......@@ -2478,11 +2496,11 @@ class ModularBackend(BaseBackend):
def _build_metadata(self, props, user_defined=None,
include_user_defined=True):
if not props[self.AVAILABLE]:
if props[self.AVAILABLE] == MAP_UNAVAILABLE:
try:
self._update_available(props)
except IllegalOperationError:
available = False
available = MAP_UNAVAILABLE
else:
available = self.node.version_get_properties(
props[self.SERIAL], keys=('available',))[0]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment