Skip to content
Snippets Groups Projects
  • Georgios D. Tsoukalas's avatar
    Fix UUID bug breaking image list and VM creation · 2e4ad3b1
    Georgios D. Tsoukalas authored
    latest_uuid(uuid) returned the latest version with the given uuid.
    This is called from pithos backend method get_uuid(),
    which is called from plankton backend method get_image().
    
    For the use in get_image() the uuid alone is not enough,
    since we need the latest version that is accessible,
    i.e. not deleted or trashed (in pithos parlance, in CLUSTER_NORMAL)
    
The problematic scenario is caused by MOVEing (renaming)
a plankton image to a different name.
    Internally, pithos handles a 'move src dest' as a
    'copy src dest' + 'delete src'.
    Both copy and delete create a new version of the source,
    with the same UUID.
    We need the 'copy' version but because the delete is executed
    after the copy, the 'deleted' version becomes the latest,
    thus the one latest_uuid() would return.
    
This has been fixed by adding a mandatory cluster argument:
latest_uuid(uuid, cluster), to which get_uuid() now specifies
CLUSTER_NORMAL.
    The previous cluster-unaware behavior can be obtained by
    specifying cluster as None.
    
    Refs #3391
    2e4ad3b1
modular.py 62.78 KiB
# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

import sys
import os
import time
import uuid as uuidlib
import logging
import hashlib
import binascii

from kamaki.clients.quotaholder import QuotaholderClient

from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists

# Stripped-down version of the HashMap class found in tools.


class HashMap(list):
    """A list of block hashes that folds into a single map hash.

    The top-level hash is computed over a full binary tree: the list is
    padded with zero-filled entries up to the next power of two, then
    adjacent pairs are hashed together repeatedly until one digest is left.
    """

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize  # size of each data block in bytes
        self.blockhash = blockhash  # hashlib algorithm name, e.g. 'sha256'

    def _hash_raw(self, v):
        """Digest the raw value with the configured algorithm."""
        hasher = hashlib.new(self.blockhash)
        hasher.update(v)
        return hasher.digest()

    def hash(self):
        """Return the single top-level hash of all contained block hashes."""
        if not self:
            return self._hash_raw('')
        if len(self) == 1:
            return self[0]

        level = list(self)
        # Pad up to the next power of two with zero-filled entries.
        width = 2
        while width < len(level):
            width *= 2
        level.extend([('\x00' * len(level[0]))] * (width - len(level)))
        # Hash adjacent pairs until a single digest remains.
        while len(level) > 1:
            level = [self._hash_raw(level[i] + level[i + 1])
                     for i in range(0, len(level), 2)]
        return level[0]

# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
DEFAULT_BLOCK_PARAMS = { 'mappool': None, 'blockpool': None }
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'

# Identification used when publishing messages to the queue.
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

# Version clusters: NORMAL holds live versions, HISTORY superseded ones,
# DELETED the versions of removed objects.
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)

# Sentinel "no upper bound" timestamp used in version lookups.
inf = float('inf')

# Fixed offset applied to public ids before exposure (see list_object_public).
ULTIMATE_ANSWER = 42


logger = logging.getLogger(__name__)


def backend_method(func=None, autocommit=1):
    """Decorator wrapping a backend method in a database transaction.

    On entry the database wrapper begins execution; on success any queued
    messages are sent, pending quotaholder commissions are accepted and
    the transaction is committed.  On any exception the commissions are
    rejected, the transaction is rolled back and the exception propagates.

    May be used bare (``@backend_method``) or with arguments
    (``@backend_method(autocommit=0)``); when ``autocommit`` is false the
    function is returned undecorated.
    """
    if func is None:
        # Called with arguments only: return the actual decorator.
        def fn(func):
            return backend_method(func, autocommit)
        return fn

    if not autocommit:
        return func

    from functools import wraps

    @wraps(func)  # preserve the wrapped method's name/docstring
    def fn(self, *args, **kw):
        self.wrapper.execute()
        serials = []
        self.serials = serials
        self.messages = []

        try:
            ret = func(self, *args, **kw)
            for m in self.messages:
                self.queue.send(*m)
            if serials:
                self.quotaholder.accept_commission(
                            context     =   {},
                            clientkey   =   'pithos',
                            serials     =   serials)
            self.wrapper.commit()
            return ret
        except:  # intentionally bare: roll back even on BaseException
            if serials:
                self.quotaholder.reject_commission(
                            context     =   {},
                            clientkey   =   'pithos',
                            serials     =   serials)
            self.wrapper.rollback()
            raise
    return fn


class ModularBackend(BaseBackend):
    """A modular backend.

    Uses modules for SQL functions and storage.
    """

    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 quotaholder_enabled=True,
                 quotaholder_url=None, quotaholder_token=None,
                 free_versioning=True, block_params=None):
        """Initialize the backend: load the DB, block-store and (optional)
        queue modules and set up the quotaholder client.

        Any argument left as None falls back to the matching DEFAULT_*
        module constant.
        """
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.hash_algorithm = 'sha256'
        self.block_size = 4 * 1024 * 1024  # 4MB
        self.free_versioning = free_versioning

        # Assigned once (was duplicated earlier in this constructor).
        self.default_policy = {'quota': DEFAULT_QUOTA,
                               'versioning': DEFAULT_VERSIONING}

        def load_module(m):
            # Import by dotted name and return the module object.
            __import__(m)
            return sys.modules[m]

        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
        # Re-export the DB module's permission and node-property constants
        # as attributes of the backend instance.
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
            setattr(self, x, getattr(self.db_module, x))

        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            # Stand-in that silently drops messages when no queue is set up.
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        self.quotaholder_enabled = quotaholder_enabled
        self.quotaholder_url = quotaholder_url
        self.quotaholder_token = quotaholder_token
        self.quotaholder = QuotaholderClient(quotaholder_url, quotaholder_token)
        self.serials = []
        self.messages = []

    def close(self):
        """Release the database connection and the message queue."""
        # Keep the original order: wrapper first, then queue.
        for resource in (self.wrapper, self.queue):
            resource.close()

    @property
    def using_external_quotaholder(self):
        # True when accounting (quota/usage) is delegated to the external
        # quotaholder service instead of the local policy values.
        return self.quotaholder_enabled

    @backend_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access."""

        logger.debug("list_accounts: %s %s %s", user, marker, limit)
        accounts = self._allowed_accounts(user)
        # Page the result according to the marker/limit window.
        start, limit = self._list_limits(accounts, marker, limit)
        return accounts[start:start + limit]

    @backend_method
    def get_account_meta(
            self, user, account, domain, until=None, include_user_defined=True,
            external_quota=None):
        """Return a dictionary with the account metadata for the domain.

        Raises NotAllowedError if a foreign user requests history (until)
        or has not been granted access to the account.
        """

        logger.debug(
            "get_account_meta: %s %s %s %s", user, account, domain, until)
        path, node = self._lookup_account(account, user == account)
        if user != account:
            if until or node is None or account not in self._allowed_accounts(user):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            # No version found for the account node.
            props = None
            mtime = until
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            # Foreign users only get the account name.
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                # Usage is authoritative in the external quotaholder.
                external_quota = external_quota or {}
                meta['bytes'] = external_quota.get('currValue', 0)
        meta.update({'modified': modified})
        return meta

    @backend_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain."""

        logger.debug("update_account_meta: %s %s %s %s %s", user,
                     account, domain, meta, replace)
        if user != account:
            raise NotAllowedError
        account_node = self._lookup_account(account, True)[1]
        self._put_metadata(user, account_node, domain, meta, replace)

    @backend_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for this account."""

        logger.debug("get_account_groups: %s %s", user, account)
        if user == account:
            self._lookup_account(account, True)
            return self.permissions.group_dict(account)
        # Foreign users may know the account exists but see no groups.
        if account not in self._allowed_accounts(user):
            raise NotAllowedError
        return {}

    @backend_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account.

        With replace, all existing groups are dropped first; otherwise each
        named group is deleted and re-created with the given members.
        """

        logger.debug("update_account_groups: %s %s %s %s", user,
                     account, groups, replace)
        if user != account:
            raise NotAllowedError
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            self.permissions.group_destroy(account)
        # items() instead of iteritems(): identical behavior on Python 2,
        # and keeps the method Python 3 compatible.
        for k, v in groups.items():
            if not replace:  # If not already deleted.
                self.permissions.group_delete(account, k)
            if v:
                self.permissions.group_addmany(account, k, v)

    @backend_method
    def get_account_policy(self, user, account, external_quota=None):
        """Return a dictionary with the account policy."""

        logger.debug("get_account_policy: %s %s", user, account)
        if user != account:
            if account not in self._allowed_accounts(user):
                raise NotAllowedError
            # Foreign users get no policy details.
            return {}
        node = self._lookup_account(account, True)[1]
        policy = self._get_policy(node)
        if self.using_external_quotaholder:
            # The quota limit is authoritative in the external quotaholder.
            policy['quota'] = (external_quota or {}).get('maxValue', 0)
        return policy

    @backend_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account."""

        logger.debug("update_account_policy: %s %s %s %s", user,
                     account, policy, replace)
        if user != account:
            raise NotAllowedError
        node = self._lookup_account(account, True)[1]
        # Validate before persisting.
        self._check_policy(policy)
        self._put_policy(node, policy, replace)

    @backend_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name.

        Raises NotAllowedError if the user does not own the account and
        AccountExists if the account is already present.
        """

        logger.debug("put_account: %s %s %s", user, account, policy)
        # Avoid a mutable default argument; None means no explicit policy.
        if policy is None:
            policy = {}
        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy)
        node = self._put_path(user, self.ROOTNODE, account)
        self._put_policy(node, policy, True)

    @backend_method
    def delete_account(self, user, account):
        """Delete the account with the given name."""

        logger.debug("delete_account: %s %s", user, account)
        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is None:
            # Deleting a missing account is a no-op.
            return
        removed = self.node.node_remove(node)
        if not removed:
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)

    @backend_method
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
        """Return a list of containers existing under an account.

        Foreign users get the containers they were granted access to;
        shared/public restrict the listing to shared or public containers.
        """

        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
                     account, marker, limit, shared, until, public)
        if user != account:
            if until or account not in self._allowed_accounts(user):
                raise NotAllowedError
            allowed = self._allowed_containers(user, account)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        if shared or public:
            allowed = set()
            if shared:
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
            if public:
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
            allowed = sorted(allowed)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        node = self.node.node_lookup(account)
        containers = [x[0] for x in self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)]
        # FIX: 'containers' already holds the names; the previous
        # [x[0] for x in containers] passed only the first character of
        # each name to _list_limits, breaking marker-based paging.
        start, limit = self._list_limits(containers, marker, limit)
        return containers[start:start + limit]

    @backend_method
    def list_container_meta(self, user, account, container, domain, until=None):
        """Return a list with all the container's object meta keys for the domain."""

        logger.debug("list_container_meta: %s %s %s %s %s", user,
                     account, container, domain, until)
        allowed = []
        if user != account:
            # Foreign users may not inspect history and need explicit grants.
            if until:
                raise NotAllowedError
            container_path = '/'.join((account, container))
            allowed = self.permissions.access_list_paths(user, container_path)
            if not allowed:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        before = inf if until is None else until
        allowed = self._get_formatted_paths(allowed)
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)

    @backend_method
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
        """Return a dictionary with the container metadata for the domain.

        Raises NotAllowedError if a foreign user requests history (until)
        or has no access to the container.
        """

        logger.debug("get_container_meta: %s %s %s %s %s", user,
                     account, container, domain, until)
        if user != account:
            if until or container not in self._allowed_containers(user, account):
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            # Foreign users only get the container name.
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': container, 'count': count, 'bytes': bytes})
        meta.update({'modified': modified})
        return meta

    @backend_method
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
        """Update the metadata associated with the container for the domain."""

        logger.debug("update_container_meta: %s %s %s %s %s %s",
                     user, account, container, domain, meta, replace)
        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace)
        if src_version_id is None:
            return
        # Without automatic versioning, drop the superseded version.
        if self._get_policy(node)['versioning'] != 'auto':
            self.node.version_remove(src_version_id)

    @backend_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy."""

        logger.debug(
            "get_container_policy: %s %s %s", user, account, container)
        if user == account:
            node = self._lookup_container(account, container)[1]
            return self._get_policy(node)
        if container not in self._allowed_containers(user, account):
            raise NotAllowedError
        # Foreign users get no policy details.
        return {}

    @backend_method
    def update_container_policy(self, user, account, container, policy, replace=False):
        """Update the policy associated with the container."""

        logger.debug("update_container_policy: %s %s %s %s %s",
                     user, account, container, policy, replace)
        if user != account:
            raise NotAllowedError
        node = self._lookup_container(account, container)[1]
        # Validate before persisting.
        self._check_policy(policy)
        self._put_policy(node, policy, replace)

    @backend_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name.

        Raises NotAllowedError for foreign users and ContainerExists if the
        container is already present.
        """

        logger.debug(
            "put_container: %s %s %s %s", user, account, container, policy)
        # Avoid a mutable default argument; None means no explicit policy.
        if policy is None:
            policy = {}
        if user != account:
            raise NotAllowedError
        try:
            path, node = self._lookup_container(account, container)
        except NameError:
            # NameError means the container does not exist yet -- proceed.
            pass
        else:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy)
        path = '/'.join((account, container))
        node = self._put_path(
            user, self._lookup_account(account, True)[1], path)
        self._put_policy(node, policy, True)

    @backend_method
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
        """Delete/purge the container with the given name.

        With until, only purge history up to that timestamp (container
        survives).  With a delimiter, delete the contained objects but
        keep the container.  Otherwise delete the container, which must
        be empty.
        """

        logger.debug("delete_container: %s %s %s %s %s %s", user,
                     account, container, until, prefix, delimiter)
        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)

        if until is not None:
            # Purge only: drop history and deleted versions up to 'until'
            # and release their blocks; report the freed space.
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
            self._report_size_change(user, account, -size,
                                     {'action':'container purge', 'path': path,
                                      'versions': ','.join(str(i) for i in serials)})
            return

        if not delimiter:
            # Full delete: only allowed when the container holds no objects.
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
            self.node.node_remove(node)
            self._report_size_change(user, account, -size,
                                     {'action': 'container delete',
                                      'path': path,
                                      'versions': ','.join(str(i) for i in serials)})
        else:
            # remove only contents
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                # Mark the object deleted by writing a new version in the
                # DELETED cluster, then account for the freed space.
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
                del_size = self._apply_versioning(
                    account, container, src_version_id)
                self._report_size_change(
                        user, account, -del_size, {
                                'action': 'object delete',
                                'path': path,
                        'versions': ','.join([str(dest_version_id)])
                     }
                )
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
        # Core listing helper behind list_objects/list_object_meta.
        # Raises NotAllowedError when a foreign user requests history.
        if user != account and until:
            raise NotAllowedError
        if shared and public:
            # get shared first
            shared = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            objects = set()
            if shared:
                path, node = self._lookup_container(account, container)
                shared = self._get_formatted_paths(shared)
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))

            # get public
            objects |= set(self._list_public_object_properties(
                user, account, container, prefix, all_props))
            objects = list(objects)

            # Merge shared and public sorted by name, then page.
            objects.sort(key=lambda x: x[0])
            start, limit = self._list_limits(
                [x[0] for x in objects], marker, limit)
            return objects[start:start + limit]
        elif public:
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props)
            start, limit = self._list_limits(
                [x[0] for x in objects], marker, limit)
            return objects[start:start + limit]

        # Plain listing (optionally restricted to shared-only paths).
        allowed = self._list_object_permissions(
            user, account, container, prefix, shared, public)
        if shared and not allowed:
            return []
        path, node = self._lookup_container(account, container)
        allowed = self._get_formatted_paths(allowed)
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
        start, limit = self._list_limits(
            [x[0] for x in objects], marker, limit)
        return objects[start:start + limit]

    def _list_public_object_properties(self, user, account, container, prefix, all_props):
        """Return (name, props...) tuples for the public objects under prefix."""
        public_paths = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        paths, nodes = self._lookup_objects(public_paths)
        # Strip the "account/container/" prefix to get bare object names.
        cont_prefix = '/'.join((account, container)) + '/'
        names = [p[len(cont_prefix):] for p in paths]
        props_list = self.node.version_lookup_bulk(nodes, all_props=all_props)
        return [(name,) + props for name, props in zip(names, props_list)]

    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
        # Page through _list_objects until an incomplete page signals the end.
        objects = []
        while True:
            marker = objects[-1] if objects else None
            # NOTE(review): with all_props=True the accumulated entries are
            # property tuples, not names -- verify _list_objects accepts such
            # a marker when more than `limit` objects exist.
            limit = 10000
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
            objects.extend(l)
            if not l or len(l) < limit:
                break
        return objects

    def _list_object_permissions(self, user, account, container, prefix, shared, public):
        """Return the paths under prefix the user may list, honoring the
        shared/public filters; [] when the filters match nothing."""
        base_path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            # Foreign users need at least one explicit grant.
            granted = self.permissions.access_list_paths(user, base_path)
            if not granted:
                raise NotAllowedError
            return granted
        collected = set()
        if shared:
            collected.update(self.permissions.access_list_shared(base_path))
        if public:
            collected.update(
                x[0] for x in self.permissions.public_list(base_path))
        collected = sorted(collected)
        if not collected:
            return []
        return collected

    @backend_method
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
        """Return a list of object (name, version_id) tuples existing under a container."""

        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
        # all_props=False: only (name, version) pairs are returned.
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)

    @backend_method
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
        """Return a list of object metadata dicts existing under a container."""

        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
        objects = []
        for p in props:
            if len(p) == 2:
                # Two-element entries denote virtual subdirectories.
                objects.append({'subdir': p[0]})
                continue
            # Property tuples are offset by one: p[0] is the object name.
            objects.append({
                'name': p[0],
                'bytes': p[self.SIZE + 1],
                'type': p[self.TYPE + 1],
                'hash': p[self.HASH + 1],
                'version': p[self.SERIAL + 1],
                'version_timestamp': p[self.MTIME + 1],
                'modified': p[self.MTIME + 1] if until is None else None,
                'modified_by': p[self.MUSER + 1],
                'uuid': p[self.UUID + 1],
                'checksum': p[self.CHECKSUM + 1],
            })
        return objects

    @backend_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths that enforce permissions under a container."""

        logger.debug("list_object_permissions: %s %s %s %s", user,
                     account, container, prefix)
        # shared=True, public=False: only explicitly shared paths count.
        return self._list_object_permissions(user, account, container, prefix, True, False)

    @backend_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a dict mapping paths to public ids for objects that are public under a container."""

        logger.debug("list_object_public: %s %s %s %s", user,
                     account, container, prefix)
        lookup_path = '/'.join((account, container, prefix))
        # Public ids are exposed with a fixed offset applied.
        return dict((path, p + ULTIMATE_ANSWER)
                    for path, p in self.permissions.public_list(lookup_path))

    @backend_method
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain.

        Raises ItemNotExists when neither a live nor a deleted version of
        the object can be found.
        """

        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
                     account, container, name, domain, version)
        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            modified = props[self.MTIME]
        else:
            try:
                modified = self._get_version(
                    node)[self.MTIME]  # Overall last modification.
            except NameError:  # Object may be deleted.
                # Fall back to the latest version in the DELETED cluster.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM]})
        return meta

    @backend_method
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
        """Update the metadata associated with the object for the domain and return the new version."""

        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
                     user, account, container, name, domain, meta, replace)
        self._can_write(user, account, container, name)
        node = self._lookup_object(account, container, name)[1]
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace)
        # The superseded version may be removed, per container policy.
        self._apply_versioning(account, container, src_version_id)
        return dest_version_id

    @backend_method
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        logger.debug("get_object_permissions: %s %s %s %s", user,
                     account, container, name)
        permissions_path = self._get_permissions_path(account, container, name)
        if user == account:
            allowed = 'write'  # The owner may always write.
        elif self.permissions.access_check(permissions_path, self.WRITE, user):
            allowed = 'write'
        elif self.permissions.access_check(permissions_path, self.READ, user):
            allowed = 'read'
        else:
            raise NotAllowedError
        self._lookup_object(account, container, name)
        return (allowed, permissions_path,
                self.permissions.access_get(permissions_path))

    @backend_method
    def update_object_permissions(self, user, account, container, name, permissions):
        """Update the permissions associated with the object."""

        logger.debug("update_object_permissions: %s %s %s %s %s",
                     user, account, container, name, permissions)
        # Only the owner may change permissions.
        if user != account:
            raise NotAllowedError
        path = self._lookup_object(account, container, name)[0]
        self._check_permissions(path, permissions)
        self.permissions.access_set(path, permissions)
        members = self.permissions.access_members(path)
        self._report_sharing_change(user, account, path,
                                    {'members': members})

    @backend_method
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable."""

        logger.debug(
            "get_object_public: %s %s %s %s", user, account, container, name)
        self._can_read(user, account, container, name)
        path = self._lookup_object(account, container, name)[0]
        p = self.permissions.public_get(path)
        # Public ids are exposed offset by ULTIMATE_ANSWER.
        return p if p is None else p + ULTIMATE_ANSWER

    @backend_method
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        logger.debug("update_object_public: %s %s %s %s %s", user,
                     account, container, name, public)
        self._can_write(user, account, container, name)
        path = self._lookup_object(account, container, name)[0]
        if public:
            self.permissions.public_set(path)
        else:
            self.permissions.public_unset(path)

    @backend_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes."""

        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
                     account, container, name, version)
        self._can_read(user, account, container, name)
        node = self._lookup_object(account, container, name)[1]
        props = self._get_version(node, version)
        # Maps are stored keyed by the binary form of the hash.
        raw_map = self.store.map_get(binascii.unhexlify(props[self.HASH]))
        return props[self.SIZE], [binascii.hexlify(h) for h in raw_map]

    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
        """Create a new object version carrying the given hash.

        Performs permission checks, duplicates the version (and its
        metadata), applies the versioning policy, enforces account and
        container quotas, and queues size/sharing/object-change reports.
        Returns the new version id.
        Raises NotAllowedError or QuotaError.
        """
        # Only the owner may set permissions on the object.
        if permissions is not None and user != account:
            raise NotAllowedError
        self._can_write(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)
        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)

        # Handle meta.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, meta, replace_meta)

        # The versioning policy may remove the superseded version at once;
        # if so, its size is reclaimed and offsets the quota delta below.
        del_size = self._apply_versioning(account, container, pre_version_id)
        size_delta = size - del_size
        if not self.using_external_quotaholder: # Check account quota.
            if size_delta > 0:
                account_quota = long(self._get_policy(account_node)['quota'])
                account_usage = self._get_statistics(account_node)[1] + size_delta
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError('account quota exceeded: limit: %s, usage: %s' % (
                        account_quota, account_usage
                    ))

        # Check container quota.
        container_quota = long(self._get_policy(container_node)['quota'])
        container_usage = self._get_statistics(container_node)[1] + size_delta
        if (container_quota > 0 and container_usage > container_quota):
            # This must be executed in a transaction, so the version is
            # never created if it fails.
            raise QuotaError('container quota exceeded: limit: %s, usage: %s' % (
                container_quota, container_usage
            ))

        self._report_size_change(user, account, size_delta,
                                 {'action': 'object update', 'path': path,
                                  'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})

        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id

    @backend_method
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
        """Create/update an object with the specified size and partial hashes.

        If any referenced block is missing from the store, raises an
        IndexError whose .data attribute lists the missing hex hashes,
        so the client can upload them and retry.
        """

        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
                     account, container, name, size, type, hashmap, checksum)
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        map = HashMap(self.block_size, self.hash_algorithm)
        map.extend([binascii.unhexlify(x) for x in hashmap])
        missing = self.store.block_search(map)
        if missing:
            ie = IndexError()
            ie.data = [binascii.hexlify(x) for x in missing]
            raise ie

        hash = map.hash()
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
        # Store the map only after the version has been created.
        self.store.map_put(hash, map)
        return dest_version_id

    @backend_method
    def update_object_checksum(self, user, account, container, name, version, checksum):
        """Update an object's checksum."""

        logger.debug("update_object_checksum: %s %s %s %s %s %s",
                     user, account, container, name, version, checksum)
        # Update objects with greater version and same hashmap and size (fix metadata updates).
        self._can_write(user, account, container, name)
        node = self._lookup_object(account, container, name)[1]
        props = self._get_version(node, version)
        ref_hash = props[self.HASH]
        ref_size = props[self.SIZE]
        serial = int(version)
        for v in self.node.node_get_versions(node):
            if (v[self.SERIAL] >= serial and
                    v[self.HASH] == ref_hash and
                    v[self.SIZE] == ref_size):
                self.node.version_put_property(
                    v[self.SERIAL], 'checksum', checksum)

    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
        """Copy (or move) an object, optionally recursing with a delimiter.

        A move to a different path is implemented as copy + delete of the
        source; a plain copy to a different path gets a fresh UUID
        (is_copy=True), while a move keeps the source UUID.
        Returns the new version id, or a list of ids when delimiter
        recursion produced several copies.
        """
        dest_version_ids = []
        self._can_read(user, src_account, src_container, src_name)
        path, node = self._lookup_object(src_account, src_container, src_name)
        # TODO: Will do another fetch of the properties in duplicate version...
        props = self._get_version(
            node, src_version)  # Check to see if source exists.
        src_version_id = props[self.SERIAL]
        hash = props[self.HASH]
        size = props[self.SIZE]
        is_copy = not is_move and (src_account, src_container, src_name) != (
            dest_account, dest_container, dest_name)  # New uuid.
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
            # Deleting after copying creates a 'deleted' version of the
            # source that shares the copied version's UUID.
            self._delete_object(user, src_account, src_container, src_name)

        if delimiter:
            # Recurse over every object sharing the pseudo-folder prefix.
            prefix = src_name + \
                delimiter if not src_name.endswith(delimiter) else src_name
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
            src_names.sort(key=lambda x: x[2])  # order by nodes
            paths = [elem[0] for elem in src_names]
            nodes = [elem[2] for elem in src_names]
            # TODO: Will do another fetch of the properties in duplicate version...
            props = self._get_versions(nodes)  # Check to see if source exists.

            for prop, path, node in zip(props, paths, nodes):
                src_version_id = prop[self.SERIAL]
                hash = prop[self.HASH]
                vtype = prop[self.TYPE]
                size = prop[self.SIZE]
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
                    delimiter) else dest_name
                vdest_name = path.replace(prefix, dest_prefix, 1)
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
                    self._delete_object(user, src_account, src_container, path)
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids

    @backend_method
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
        """Copy an object's data and metadata."""

        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
        return self._copy_object(
            user, src_account, src_container, src_name,
            dest_account, dest_container, dest_name, type, domain,
            meta, replace_meta, permissions, src_version, False, delimiter)

    @backend_method
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
        """Move an object's data and metadata."""

        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
        # Only the source owner may move.
        if user != src_account:
            raise NotAllowedError
        return self._copy_object(
            user, src_account, src_container, src_name,
            dest_account, dest_container, dest_name, type, domain,
            meta, replace_meta, permissions, None, True, delimiter)

    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
        """Delete an object, or purge its history up to a timestamp.

        With until set, versions up to that time are physically purged
        (normal and history clusters) and their blocks removed from the
        store.  Without it, a new 'deleted' version is created and the
        versioning policy applied.  With delimiter, the deletion recurses
        over the pseudo-folder's contents.
        Raises NotAllowedError unless user is the account owner.
        """
        if user != account:
            raise NotAllowedError

        if until is not None:
            # Purge mode: physically remove old versions and their blocks.
            path = '/'.join((account, container, name))
            node = self.node.node_lookup(path)
            if node is None:
                return
            hashes = []
            size = 0
            serials = []
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
            hashes += h
            size += s
            serials += v
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
            hashes += h
            size += s
            serials += v
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge(node, until, CLUSTER_DELETED)
            try:
                props = self._get_version(node)
            except NameError:
                # No version left at all: drop the permissions too.
                self.permissions.access_clear(path)
            self._report_size_change(user, account, -size,
                                    {'action': 'object purge', 'path': path,
                                     'versions': ','.join(str(i) for i in serials)})
            return

        # Normal delete: record a zero-sized version in CLUSTER_DELETED.
        path, node = self._lookup_object(account, container, name)
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
        del_size = self._apply_versioning(account, container, src_version_id)
        self._report_size_change(user, account, -del_size,
                                 {'action': 'object delete', 'path': path,
                                  'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

        if delimiter:
            # Recurse over the pseudo-folder's contents.
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
                del_size = self._apply_versioning(
                    account, container, src_version_id)
                self._report_size_change(user, account, -del_size,
                                         {'action': 'object delete',
                                          'path': path,
                                          'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

    @backend_method
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
        """Delete/purge an object."""

        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
                     account, container, name, until, prefix, delimiter)
        return self._delete_object(
            user, account, container, name, until, delimiter)

    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all (version, version_timestamp) tuples for an object."""

        logger.debug(
            "list_versions: %s %s %s %s", user, account, container, name)
        self._can_read(user, account, container, name)
        node = self._lookup_object(account, container, name)[1]
        result = []
        for v in self.node.node_get_versions(node):
            # Deleted versions are not part of the visible history.
            if v[self.CLUSTER] != CLUSTER_DELETED:
                result.append([v[self.SERIAL], v[self.MTIME]])
        return result

    @backend_method
    def get_uuid(self, user, uuid):
        """Return the (account, container, name) for the UUID given.

        Raises NameError if no accessible (CLUSTER_NORMAL) version
        carries this UUID, and NotAllowedError if the user may not
        read the object.
        """

        logger.debug("get_uuid: %s %s", user, uuid)
        # Restrict the lookup to CLUSTER_NORMAL: a move (copy + delete)
        # leaves a newer 'deleted' version with the same UUID, so the
        # cluster-unaware latest version could be the deleted one.
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        path, serial = info
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)

    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        logger.debug("get_public: %s %s", user, public)
        # Public ids are exposed offset by ULTIMATE_ANSWER.
        if public is None or public < ULTIMATE_ANSWER:
            raise NameError
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
        if path is None:
            raise NameError
        parts = path.split('/', 2)
        account, container, name = parts
        self._can_read(user, account, container, name)
        return (account, container, name)

    @backend_method(autocommit=0)
    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        data = self.store.block_get(binascii.unhexlify(hash))
        if not data:
            raise ItemNotExists('Block does not exist')
        return data

    @backend_method(autocommit=0)
    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        digest = self.store.block_put(data)
        return binascii.hexlify(digest)

    @backend_method(autocommit=0)
    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        # A full-block write at offset 0 is just a fresh block.
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        new_hash = self.store.block_update(
            binascii.unhexlify(hash), offset, data)
        return binascii.hexlify(new_hash)

    # Path functions.

    def _generate_uuid(self):
        return str(uuidlib.uuid4())

    def _put_object_node(self, path, parent, name):
        """Return (path, node) for name under path, creating the node if missing."""
        full_path = '/'.join((path, name))
        node = self.node.node_lookup(full_path)
        if node is None:
            node = self.node.node_create(parent, full_path)
        return full_path, node

    def _put_path(self, user, parent, path):
        """Create a node at path with an initial empty version owned by user."""
        node = self.node.node_create(parent, path)
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
        return node

    def _lookup_account(self, account, create=True):
        """Return (path, node) for an account, optionally creating it."""
        node = self.node.node_lookup(account)
        if create and node is None:
            node = self._put_path(
                account, self.ROOTNODE, account)  # User is account.
        return account, node

    def _lookup_container(self, account, container):
        """Return (path, node) for a container; raise if it does not exist."""
        path = '/'.join((account, container))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node

    def _lookup_object(self, account, container, name):
        """Return (path, node) for an object; raise if it does not exist."""
        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return path, node

    def _lookup_objects(self, paths):
        """Resolve many paths to nodes in a single bulk lookup."""
        return paths, self.node.node_lookup_bulk(paths)

    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = inf if until is None else until
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            # Fall back to history for point-in-time queries.
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props

    def _get_statistics(self, node, until=None):
        """Return count, sum of size and latest timestamp of everything under node."""

        if until is None:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        else:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        return stats if stats is not None else (0, 0, 0)

    def _get_version(self, node, version=None):
        """Return version properties; the latest normal version if version is None."""
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
            return props
        try:
            serial = int(version)
        except ValueError:
            raise VersionNotExists('Version does not exist')
        props = self.node.version_get_properties(serial)
        if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
            raise VersionNotExists('Version does not exist')
        return props

    def _get_versions(self, nodes):
        """Return the latest normal-cluster versions for many nodes at once."""
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)

    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
        """Create a new version of the node.

        Missing size/type/checksum are inherited from the source's
        latest normal version.  A fresh UUID is generated only for
        copies or brand-new objects; otherwise the source UUID is kept.
        The superseded version (if any) is moved to CLUSTER_HISTORY.
        Returns (pre_version_id, dest_version_id).
        """

        # Inherit properties from src_node when copying, else from node itself.
        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        # Determine which version of *node* is being superseded.
        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)

        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
        return pre_version_id, dest_version_id

    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
        """Copy attributes from the source version, then apply meta to the destination."""
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if replace:
            # Start from a clean slate for the domain.
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain,
                                    ((k, v) for k, v in meta.iteritems()))
        else:
            # Empty values mean deletion; the rest are upserts.
            self.node.attribute_del(dest_version_id, domain,
                                    (k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain,
                                    ((k, v) for k, v in meta.iteritems() if v != ''))

    def _put_metadata(self, user, node, domain, meta, replace=False):
        """Create a new version and store metadata."""

        version_ids = self._put_version_duplicate(user, node)
        self._put_metadata_duplicate(
            version_ids[0], version_ids[1], domain, meta, replace)
        return version_ids

    def _list_limits(self, listing, marker, limit):
        start = 0
        if marker:
            try:
                start = listing.index(marker) + 1
            except ValueError:
                pass
        if not limit or limit > 10000:
            limit = 10000
        return start, limit

    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
        """List object properties under a container path.

        Delegates filtering to the node layer and returns entries with
        the container prefix stripped, sorted by name.  When virtual is
        set, pseudo-folder prefixes are merged in as (name, None) pairs.
        """
        cont_prefix = path + '/'
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        # Attribute filters only make sense within a domain.
        filterq = keys if domain else []
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        # Strip the 'account/container/' prefix from each returned name.
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects

    # Reporting functions.

    def _report_size_change(self, user, account, size, details=None):
        """Queue an account diskspace-change message and, when an external
        quotaholder is configured, issue a quota commission for it.

        details: extra key/values for the queue message; not mutated.
        Raises QuotaError if the commission cannot be issued.
        """
        if size == 0:
            return

        # Copy to avoid mutating the caller's dict — and never use a shared
        # mutable default, which would accumulate state across calls.
        details = dict(details) if details else {}
        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node)[1]
        details.update({'user': user, 'total': total})
        logger.debug(
            "_report_size_change: %s %s %s %s", user, account, size, details)
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
                              account, QUEUE_INSTANCE_ID, 'diskspace',
                              float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            serial = self.quotaholder.issue_commission(
                context={},
                target=account,
                key='1',
                clientkey='pithos',
                ownerkey='',
                name=details.get('path', ''),
                provisions=(('pithos+', 'pithos+.diskspace', size),))
        except BaseException as e:
            raise QuotaError(e)
        else:
            # Remember the commission serial for later accept/reject.
            self.serials.append(serial)

    def _report_object_change(self, user, account, path, details=None):
        """Queue an object-change notification message.

        details: extra key/values for the message; not mutated.
        """
        # Copy to avoid mutating the caller's dict — and never use a shared
        # mutable default, which would accumulate state across calls.
        details = dict(details) if details else {}
        details.update({'user': user})
        logger.debug("_report_object_change: %s %s %s %s", user,
                     account, path, details)
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                              account, QUEUE_INSTANCE_ID, 'object', path, details))

    def _report_sharing_change(self, user, account, path, details=None):
        """Queue a sharing/permissions-change notification message.

        details: extra key/values for the message; not mutated.
        """
        logger.debug("_report_permissions_change: %s %s %s %s",
                     user, account, path, details)
        # Copy to avoid mutating the caller's dict — and never use a shared
        # mutable default, which would accumulate state across calls.
        details = dict(details) if details else {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))

    # Policy functions.

    def _check_policy(self, policy):
        """Validate policy in place; empty values are replaced by defaults.

        Raises ValueError on unknown keys or invalid values.
        """
        for key in policy.keys():
            if policy[key] == '':
                policy[key] = self.default_policy.get(key)
        for key, value in policy.iteritems():
            if key == 'quota':
                if int(value) < 0:  # int() may raise ValueError.
                    raise ValueError
            elif key == 'versioning':
                if value not in ('auto', 'none'):
                    raise ValueError
            else:
                raise ValueError

    def _put_policy(self, node, policy, replace):
        """Store policy on node; if replace, fill missing keys with defaults."""
        if replace:
            for key, value in self.default_policy.iteritems():
                policy.setdefault(key, value)
        self.node.policy_set(node, policy)

    def _get_policy(self, node):
        """Return the node's policy merged over the backend defaults."""
        merged = self.default_policy.copy()
        merged.update(self.node.policy_get(node))
        return merged

    def _apply_versioning(self, account, container, version_id):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        node = self._lookup_container(account, container)[1]
        versioning = self._get_policy(node)['versioning']
        if versioning != 'auto':
            # No history kept: drop the version and its map right away.
            map_hash, size = self.node.version_remove(version_id)
            self.store.map_delete(map_hash)
            return size
        if self.free_versioning:
            # History kept for free: report the size as reclaimed.
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0

    # Access control functions.

    def _check_groups(self, groups):
        # raise ValueError('Bad characters in groups')
        pass

    def _check_permissions(self, path, permissions):
        # raise ValueError('Bad characters in permissions')
        pass

    def _get_formatted_paths(self, paths):
        """Return (path, match-mode) pairs for existing paths; folder-typed
        objects additionally match as prefixes."""
        formatted = []
        for path in paths:
            node = self.node.node_lookup(path)
            if node is None:
                continue
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                continue
            mimetype = props[self.TYPE].split(';', 1)[0].strip()
            if mimetype in ('application/directory', 'application/folder'):
                formatted.append((path.rstrip('/') + '/', self.MATCH_PREFIX))
            formatted.append((path, self.MATCH_EXACT))
        return formatted

    def _get_permissions_path(self, account, container, name):
        """Return the path from which the object inherits permissions.

        An exact match wins; otherwise the longest folder-typed ancestor
        that carries permissions is used.  Returns None when no
        applicable permissions path exists.
        """
        path = '/'.join((account, container, name))
        permission_paths = self.permissions.access_inherit(path)
        # Longest (most specific) candidate paths first.
        permission_paths.sort()
        permission_paths.reverse()
        for p in permission_paths:
            if p == path:
                return p
            else:
                # Skip account- and container-level paths.
                if p.count('/') < 2:
                    continue
                node = self.node.node_lookup(p)
                props = None
                if node is not None:
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
                if props is not None:
                    # Only folder-typed objects propagate permissions.
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
                        return p
        return None

    def _can_read(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        if self.permissions.public_get(path) is not None:
            return True
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError

    def _can_write(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError

    def _allowed_accounts(self, user):
        """Return the sorted account names that share anything with user."""
        return sorted(set(path.split('/', 1)[0]
                          for path in self.permissions.access_list_paths(user)))

    def _allowed_containers(self, user, account):
        """Return the sorted container names under account shared with user."""
        return sorted(set(path.split('/', 2)[1]
                          for path in self.permissions.access_list_paths(user, account)))