modular.py 67.4 KB
Newer Older
Antony Chazapis's avatar
Antony Chazapis committed
1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
4
5
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
6
#
7
8
9
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
10
#
11
12
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
13
14
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
15
#
16
17
18
19
20
21
22
23
24
25
26
27
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
30
31
32
33
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

34
import sys
35
import uuid as uuidlib
36
import logging
Antony Chazapis's avatar
Antony Chazapis committed
37
import hashlib
38
39
import binascii

40
from synnefo.lib.quotaholder import QuotaholderClient
41

42
43
44
45
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
                  ContainerNotEmpty, ItemNotExists, VersionNotExists)
46

Antony Chazapis's avatar
Antony Chazapis committed
47
# Stripped-down version of the HashMap class found in tools.
48
49


Antony Chazapis's avatar
Antony Chazapis committed
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class HashMap(list):
    """List of block hashes that can be folded into a single top hash.

    The items are the raw digests of consecutive blocks.  ``blocksize`` and
    ``blockhash`` (a ``hashlib`` algorithm name) are stored on the instance;
    only ``blockhash`` is used by the methods defined here.
    """

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        """Return the raw digest of the byte string ``v``."""
        h = hashlib.new(self.blockhash)
        h.update(v)
        return h.digest()

    def hash(self):
        """Return the top hash of the map.

        * empty map: the digest of the empty byte string;
        * single item: that item, unchanged;
        * otherwise: the list is padded with all-zero entries (as long as
          the first item) up to the next power of two and adjacent pairs
          are hashed repeatedly until one digest remains.
        """
        # Bytes literals are used throughout: they are identical to the
        # plain literals on Python 2 and are what hashlib requires on
        # Python 3.
        if not self:
            return self._hash_raw(b'')
        if len(self) == 1:
            return self[0]

        level = list(self)
        width = 2
        while width < len(level):
            width *= 2
        padding = b'\x00' * len(level[0])
        level += [padding] * (width - len(level))
        while len(level) > 1:
            level = [self._hash_raw(level[i] + level[i + 1])
                     for i in range(0, len(level), 2)]
        return level[0]
76

Antony Chazapis's avatar
Antony Chazapis committed
77
78
79
80
81
# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
Antony Chazapis's avatar
Antony Chazapis committed
82
DEFAULT_BLOCK_UMASK = 0o022
Antony Chazapis's avatar
Antony Chazapis committed
83
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84
DEFAULT_BLOCK_PARAMS = { 'mappool': None, 'blockpool': None }
85
86
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
87
88
89
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
90
DEFAULT_PUBLIC_URL_SECURITY = 16
Antony Chazapis's avatar
Antony Chazapis committed
91

92
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
93
QUEUE_CLIENT_ID = 'pithos'
94
QUEUE_INSTANCE_ID = '1'
Antony Chazapis's avatar
Antony Chazapis committed
95

96
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
Antony Chazapis's avatar
Antony Chazapis committed
97
98
99

inf = float('inf')

Antony Chazapis's avatar
Antony Chazapis committed
100
101
ULTIMATE_ANSWER = 42

102
103
104

logger = logging.getLogger(__name__)

105

106
107
108
109
110
111
112
113
def backend_method(func=None, autocommit=1):
    """Decorate a backend method so that it runs inside a transaction.

    Usable both as ``@backend_method`` and ``@backend_method(autocommit=0)``.
    With a false ``autocommit`` the function is returned undecorated.

    The wrapper:

    * calls ``self.wrapper.execute()`` and resets ``self.serials`` and
      ``self.messages`` before running the method;
    * on success sends every queued message, and, if the method collected
      any serials, records them, commits, accepts the commission in the
      quotaholder and removes the recorded serials; finally commits;
    * on any exception rejects the commission for collected serials,
      rolls back and re-raises.
    """
    # Imported locally so the decorator does not depend on a module-level
    # import being added elsewhere in the file.
    import functools

    if func is None:
        def decorator(wrapped):
            return backend_method(wrapped, autocommit)
        return decorator

    if not autocommit:
        return func

    @functools.wraps(func)  # keep the method's name and docstring
    def fn(self, *args, **kw):
        self.wrapper.execute()
        serials = []
        self.serials = serials
        self.messages = []

        try:
            ret = func(self, *args, **kw)
            for m in self.messages:
                self.queue.send(*m)
            if serials:
                self.quotaholder_serials.insert_many(serials)

                # commit to ensure that the serials are registered
                # even if accept commission fails
                self.wrapper.commit()
                self.wrapper.execute()

                self.quotaholder.accept_commission(
                    context={},
                    clientkey='pithos',
                    serials=serials)

                self.quotaholder_serials.delete_many(serials)

            self.wrapper.commit()
            return ret
        except:
            # Bare except is deliberate: everything is re-raised below.
            try:
                if serials:
                    self.quotaholder.reject_commission(
                        context={},
                        clientkey='pithos',
                        serials=serials)
            finally:
                # Roll back even when rejecting the commission itself fails,
                # so the transaction is never left open.
                self.wrapper.rollback()
            raise
    return fn


class ModularBackend(BaseBackend):
    """A modular backend.
155

156
    Uses modules for SQL functions and storage.
157
    """
158

159
    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 quotaholder_enabled=False,
                 quotaholder_url=None, quotaholder_token=None,
                 quotaholder_client_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None):
        """Load the database and block modules and wire up the backend.

        Falsy arguments fall back to the matching ``DEFAULT_*`` value,
        except ``block_umask`` where only ``None`` does (0 is a valid
        umask).  ``queue_module`` and ``queue_hosts`` have no defaults:
        unless both are given a no-op queue is installed.  A
        ``QuotaholderClient`` is created only when ``quotaholder_enabled``
        is true.
        """
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        # ``block_umask or DEFAULT`` would silently discard an explicit 0.
        if block_umask is None:
            block_umask = DEFAULT_BLOCK_UMASK
        # Copy the default so the instance never aliases the module-level
        # dictionary.
        block_params = block_params or dict(DEFAULT_BLOCK_PARAMS)

        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING

        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = 'sha256'
        self.block_size = 4 * 1024 * 1024  # 4MB
        self.free_versioning = free_versioning

        def load_module(m):
            # Import by dotted name and return the module object itself
            # (``__import__`` alone returns the top-level package).
            __import__(m)
            return sys.modules[m]

        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
        # Re-export the database module's constants on the instance.
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME',
                  'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX',
                  'MATCH_EXACT']:
            setattr(self, x, getattr(self.db_module, x))

        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            # Stand-in that accepts and drops every message.
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        self.quotaholder_enabled = quotaholder_enabled
        if quotaholder_enabled:
            self.quotaholder_url = quotaholder_url
            self.quotaholder_token = quotaholder_token
            self.quotaholder = QuotaholderClient(
                quotaholder_url,
                token=quotaholder_token,
                poolsize=quotaholder_client_poolsize)

        self.serials = []
        self.messages = []
251

Antony Chazapis's avatar
Antony Chazapis committed
252
253
    def close(self):
        """Close the database wrapper and the message queue.

        The queue is closed even when closing the wrapper raises; the
        wrapper's exception still propagates to the caller.
        """
        try:
            self.wrapper.close()
        finally:
            self.queue.close()
255

256
257
    @property
    def using_external_quotaholder(self):
        """Return the ``quotaholder_enabled`` flag given at construction."""
        enabled = self.quotaholder_enabled
        return enabled
259

260
261
262
    @backend_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return one page of the accounts that ``user`` can access.

        ``marker`` and ``limit`` are resolved by ``_list_limits`` into the
        start offset and page length of the slice that is returned.
        """
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
        accounts = self._allowed_accounts(user)
        first, count = self._list_limits(accounts, marker, limit)
        last = first + count
        return accounts[first:last]
268

269
    @backend_method
270
271
272
    def get_account_meta(
            self, user, account, domain, until=None, include_user_defined=True,
            external_quota=None):
        """Return a dictionary with the account metadata for the domain.

        A user other than the owner only receives ``name`` and ``modified``
        and may not ask for a historical view (``until``).  The owner also
        gets ``count``, ``bytes``, optionally the user-defined attributes of
        ``domain`` and, for historical views, ``until_timestamp``.  When an
        external quotaholder is in use, ``bytes`` is taken from
        ``external_quota['currValue']`` (0 when missing).

        Raises NotAllowedError for a non-owner when ``until`` is set, the
        account node does not exist, or the account is not among the user's
        allowed accounts.
        """
        logger.debug(
            "get_account_meta: %s %s %s %s", user, account, domain, until)
        is_owner = user == account
        path, node = self._lookup_account(account, is_owner)
        if not is_owner:
            if until or node is None \
                    or account not in self._allowed_accounts(user):
                raise NotAllowedError

        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            # No properties available: fall back to the requested timestamp.
            props, mtime = None, until

        count, size, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            # Overall last modification, ignoring the ``until`` bound.
            modified = max(self._get_statistics(node)[2], mtime)

        if is_owner:
            meta = {}
            if props is not None and include_user_defined:
                attributes = self.node.attribute_get(
                    props[self.SERIAL], domain)
                meta.update(dict(attributes))
            if until is not None:
                meta['until_timestamp'] = tstamp
            meta.update({'name': account, 'count': count, 'bytes': size})
            if self.using_external_quotaholder:
                external_quota = external_quota or {}
                meta['bytes'] = external_quota.get('currValue', 0)
        else:
            meta = {'name': account}

        meta['modified'] = modified
        return meta
311

312
    @backend_method
313
314
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain.

        Only the account owner may do this; anyone else gets
        NotAllowedError.  ``meta`` and ``replace`` are passed through to
        ``_put_metadata``.
        """
        logger.debug("update_account_meta: %s %s %s %s %s",
                     user, account, domain, meta, replace)
        if user != account:
            raise NotAllowedError
        _path, account_node = self._lookup_account(account, True)
        self._put_metadata(user, account_node, domain, meta, replace)
322

323
324
325
    @backend_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for this account.

        The owner gets the full group dictionary.  Another user who is
        allowed to access the account gets an empty dictionary; one who is
        not gets NotAllowedError.
        """
        logger.debug("get_account_groups: %s %s", user, account)
        if user == account:
            self._lookup_account(account, True)
            return self.permissions.group_dict(account)
        if account not in self._allowed_accounts(user):
            raise NotAllowedError
        return {}
334

335
336
337
    @backend_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account.

        ``groups`` maps a group name to its members.  With ``replace`` all
        existing groups are destroyed first; otherwise only the groups
        named in ``groups`` are deleted before being re-created.  A group
        with a falsy member list is deleted and not re-created.

        Raises NotAllowedError unless ``user`` is the account owner.
        """
        logger.debug("update_account_groups: %s %s %s %s",
                     user, account, groups, replace)
        if user != account:
            raise NotAllowedError
        self._lookup_account(account, True)
        self._check_groups(groups)

        permissions = self.permissions
        if replace:
            permissions.group_destroy(account)
        for name, members in groups.iteritems():
            if not replace:
                # Not covered by a bulk destroy, so drop it individually.
                permissions.group_delete(account, name)
            if members:
                permissions.group_addmany(account, name, members)
352

353
    @backend_method
354
    def get_account_policy(self, user, account, external_quota=None):
        """Return a dictionary with the account policy.

        A non-owner who may access the account gets an empty dictionary;
        one who may not gets NotAllowedError.  When an external quotaholder
        is in use the ``quota`` entry is replaced by
        ``external_quota['maxValue']`` (0 when missing).
        """
        logger.debug("get_account_policy: %s %s", user, account)
        if user != account:
            if account not in self._allowed_accounts(user):
                raise NotAllowedError
            return {}

        _path, account_node = self._lookup_account(account, True)
        policy = self._get_policy(account_node, is_account_policy=True)
        if self.using_external_quotaholder:
            external_quota = external_quota or {}
            policy['quota'] = external_quota.get('maxValue', 0)
        return policy
368

369
370
371
    @backend_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account.

        The policy is validated with ``_check_policy`` before being stored.
        Raises NotAllowedError unless ``user`` is the account owner.
        """
        logger.debug("update_account_policy: %s %s %s %s",
                     user, account, policy, replace)
        if user != account:
            raise NotAllowedError
        _path, account_node = self._lookup_account(account, True)
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(account_node, policy, replace,
                         is_account_policy=True)
380

381
    @backend_method
382
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name.

        A non-empty ``policy`` is validated first; the (possibly empty)
        policy is then stored on the new node in replace mode.

        Raises NotAllowedError unless ``user`` is the account itself, and
        AccountExists when a node for the account is already present.
        """
        logger.debug("put_account: %s %s %s", user, account, policy)
        policy = policy or {}
        if user != account:
            raise NotAllowedError
        if self.node.node_lookup(account) is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        account_node = self._put_path(user, self.ROOTNODE, account)
        self._put_policy(account_node, policy, True, is_account_policy=True)
396

397
398
399
    @backend_method
    def delete_account(self, user, account):
        """Delete the account with the given name.

        Deleting an account that does not exist is a no-op.  The account's
        groups are destroyed once its node has been removed.

        Raises NotAllowedError unless ``user`` is the account owner, and
        AccountNotEmpty when the node cannot be removed.
        """
        logger.debug("delete_account: %s %s", user, account)
        if user != account:
            raise NotAllowedError
        account_node = self.node.node_lookup(account)
        if account_node is None:
            return
        removed = self.node.node_remove(account_node)
        if not removed:
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)
410

Antony Chazapis's avatar
Antony Chazapis committed
411
    @backend_method
412
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
        """Return a list of containers existing under an account.

        * A non-owner gets the containers they are allowed to see and may
          not request a historical listing (``until``).
        * With ``shared`` and/or ``public`` the owner gets the sorted names
          of containers holding shared and/or public paths.
        * Otherwise the owner gets the containers found under the account
          node.

        ``marker`` and ``limit`` page the result through ``_list_limits``.

        Raises NotAllowedError for a non-owner when ``until`` is set or the
        account is not among the user's allowed accounts.
        """
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
                     account, marker, limit, shared, until, public)
        if user != account:
            if until or account not in self._allowed_accounts(user):
                raise NotAllowedError
            allowed = self._allowed_containers(user, account)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]

        if shared or public:
            allowed = set()
            if shared:
                # Paths look like account/container/object: keep the
                # container component.
                allowed.update(
                    x.split('/', 2)[1]
                    for x in self.permissions.access_list_shared(account))
            if public:
                allowed.update(
                    x[0].split('/', 2)[1]
                    for x in self.permissions.public_list(account))
            allowed = sorted(allowed)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]

        node = self.node.node_lookup(account)
        containers = [x[0] for x in self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)]
        # ``containers`` already holds the names; indexing each entry again
        # would hand ``_list_limits`` only the first character of each name.
        start, limit = self._list_limits(containers, marker, limit)
        return containers[start:start + limit]
438

439
440
441
    @backend_method
    def list_container_meta(self, user, account, container, domain, until=None):
        """Return a list with all the container's object meta keys for the domain.

        A non-owner may not pass ``until`` and only sees keys of the paths
        they have access to; with no accessible path NotAllowedError is
        raised.  Without ``until`` the lookup is unbounded in time.
        """
        logger.debug("list_container_meta: %s %s %s %s %s",
                     user, account, container, domain, until)
        accessible = []
        if user != account:
            if until:
                raise NotAllowedError
            container_path = '/'.join((account, container))
            accessible = self.permissions.access_list_paths(
                user, container_path)
            if not accessible:
                raise NotAllowedError

        _path, container_node = self._lookup_container(account, container)
        before = inf if until is None else until
        accessible = self._get_formatted_paths(accessible)
        return self.node.latest_attribute_keys(
            container_node, domain, before, CLUSTER_DELETED, accessible)
457

458
    @backend_method
459
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
        """Return a dictionary with the container metadata for the domain.

        A non-owner only receives ``name`` and ``modified`` and may not ask
        for a historical view (``until``).  The owner also gets ``count``,
        ``bytes``, optionally the user-defined attributes of ``domain``
        and, for historical views, ``until_timestamp``.

        Raises NotAllowedError for a non-owner when ``until`` is set or the
        container is not among those the user is allowed to see.
        """
        logger.debug("get_container_meta: %s %s %s %s %s",
                     user, account, container, domain, until)
        is_owner = user == account
        if not is_owner:
            if until or \
                    container not in self._allowed_containers(user, account):
                raise NotAllowedError

        _path, container_node = self._lookup_container(account, container)
        props = self._get_properties(container_node, until)
        mtime = props[self.MTIME]
        count, size, tstamp = self._get_statistics(container_node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            # Overall last modification, ignoring the ``until`` bound.
            modified = max(self._get_statistics(container_node)[2], mtime)

        if is_owner:
            meta = {}
            if include_user_defined:
                attributes = self.node.attribute_get(
                    props[self.SERIAL], domain)
                meta.update(dict(attributes))
            if until is not None:
                meta['until_timestamp'] = tstamp
            meta.update({'name': container, 'count': count, 'bytes': size})
        else:
            meta = {'name': container}

        meta['modified'] = modified
        return meta
491

492
    @backend_method
493
494
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
        """Update the metadata associated with the container for the domain.

        When storing the metadata produced a new version and the
        container's versioning policy is not ``'auto'``, the source version
        is removed.

        Raises NotAllowedError unless ``user`` is the account owner.
        """
        logger.debug("update_container_meta: %s %s %s %s %s %s",
                     user, account, container, domain, meta, replace)
        if user != account:
            raise NotAllowedError
        _path, container_node = self._lookup_container(account, container)
        previous_id, _new_id = self._put_metadata(
            user, container_node, domain, meta, replace)
        if previous_id is None:
            return
        policy = self._get_policy(container_node, is_account_policy=False)
        if policy['versioning'] != 'auto':
            self.node.version_remove(previous_id)
508

509
510
511
    @backend_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy.

        A non-owner who may see the container gets an empty dictionary;
        one who may not gets NotAllowedError.
        """
        logger.debug(
            "get_container_policy: %s %s %s", user, account, container)
        if user == account:
            _path, container_node = self._lookup_container(
                account, container)
            return self._get_policy(container_node, is_account_policy=False)
        if container not in self._allowed_containers(user, account):
            raise NotAllowedError
        return {}
521

522
523
    @backend_method
    def update_container_policy(self, user, account, container, policy, replace=False):
        """Update the policy associated with the container.

        The policy is validated with ``_check_policy`` before being stored.
        Raises NotAllowedError unless ``user`` is the account owner.
        """
        logger.debug("update_container_policy: %s %s %s %s %s",
                     user, account, container, policy, replace)
        if user != account:
            raise NotAllowedError
        _path, container_node = self._lookup_container(account, container)
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(container_node, policy, replace,
                         is_account_policy=False)
533

534
    @backend_method
535
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name.

        A non-empty ``policy`` is validated first; the (possibly empty)
        policy is then stored on the new node in replace mode.

        Raises NotAllowedError unless ``user`` is the account owner, and
        ContainerExists when the container can already be looked up.
        """
        logger.debug(
            "put_container: %s %s %s %s", user, account, container, policy)
        policy = policy or {}
        if user != account:
            raise NotAllowedError

        # A failed lookup (signalled with NameError) means the name is free.
        try:
            _path, _node = self._lookup_container(account, container)
            already_there = True
        except NameError:
            already_there = False
        if already_there:
            raise ContainerExists('Container already exists')

        if policy:
            self._check_policy(policy, is_account_policy=False)
        container_path = '/'.join((account, container))
        account_node = self._lookup_account(account, True)[1]
        container_node = self._put_path(user, account_node, container_path)
        self._put_policy(container_node, policy, True,
                         is_account_policy=False)
555

556
    @backend_method
Sofia Papagiannaki's avatar
Sofia Papagiannaki committed
557
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
        """Delete/purge the container with the given name.

        * ``until`` given: purge the container's history and deleted
          versions up to ``until`` and return; the container stays.
        * no ``delimiter``: the container must hold no objects; all its
          history is purged and the container node is removed.
        * ``delimiter`` given: only the contents are removed, each object
          being marked deleted; the container stays.

        Raises NotAllowedError unless ``user`` is the account owner, and
        ContainerNotEmpty when removing a container that still has objects.
        """
        logger.debug("delete_container: %s %s %s %s %s %s", user,
                     account, container, until, prefix, delimiter)
        if user != account:
            raise NotAllowedError
        container_path, container_node = self._lookup_container(
            account, container)

        def purge_versions(before):
            # Drop history (and the stored maps it referenced) plus the
            # deleted cluster, up to ``before``.
            hashes, size, serials = self.node.node_purge_children(
                container_node, before, CLUSTER_HISTORY)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(
                container_node, before, CLUSTER_DELETED)
            return size, serials

        def report_purge(size, serials):
            # Purged versions are only accounted when versioning is not free.
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action':'container purge',
                        'path': container_path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )

        if until is not None:
            size, serials = purge_versions(until)
            report_purge(size, serials)
            return

        if not delimiter:
            if self._get_statistics(container_node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            size, serials = purge_versions(inf)
            self.node.node_remove(container_node)
            report_purge(size, serials)
            return

        # remove only contents
        # NOTE(review): the ``prefix`` argument is not forwarded here, so
        # every object in the container is listed - confirm this is intended.
        listing = self._list_objects_no_limit(
            user, account, container, prefix='', delimiter=None,
            virtual=False, domain=None, keys=[], shared=False, until=None,
            size_range=None, all_props=True, public=False)
        removed_paths = []
        for entry in listing:
            object_path = '/'.join((account, container, entry[0]))
            object_node = entry[2]
            src_version_id, dest_version_id = self._put_version_duplicate(
                user, object_node, size=0, type='', hash=None, checksum='',
                cluster=CLUSTER_DELETED)
            del_size = self._apply_versioning(
                account, container, src_version_id)
            self._report_size_change(
                user, account, -del_size, {
                    'action': 'object delete',
                    'path': object_path,
                    'versions': str(dest_version_id)
                }
            )
            self._report_object_change(
                user, account, object_path,
                details={'action': 'object delete'})
            removed_paths.append(object_path)
        self.permissions.access_clear_bulk(removed_paths)
620

621
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
        """Return one page of object rows for a container.

        Each row is a sequence whose first item is the object name.

        * ``shared`` and ``public``: union of the shared and the public
          objects, sorted by name.
        * ``public`` only: the public objects.
        * otherwise: the objects the user may access (an empty list when
          ``shared`` is requested but nothing is shared).

        Raises NotAllowedError when a non-owner requests a historical
        listing (``until``).
        """
        if user != account and until:
            raise NotAllowedError

        def page(rows):
            # Resolve marker/limit against the names and slice accordingly.
            first, count = self._list_limits(
                [row[0] for row in rows], marker, limit)
            return rows[first:first + count]

        if shared and public:
            # get shared first
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            merged = set()
            if shared_paths:
                path, node = self._lookup_container(account, container)
                shared_paths = self._get_formatted_paths(shared_paths)
                merged.update(self._list_object_properties(
                    node, path, prefix, delimiter, marker, limit, virtual,
                    domain, keys, until, size_range, shared_paths,
                    all_props))

            # get public
            merged.update(self._list_public_object_properties(
                user, account, container, prefix, all_props))
            return page(sorted(merged, key=lambda row: row[0]))

        if public:
            return page(self._list_public_object_properties(
                user, account, container, prefix, all_props))

        allowed = self._list_object_permissions(
            user, account, container, prefix, shared, public)
        if shared and not allowed:
            return []
        path, node = self._lookup_container(account, container)
        allowed = self._get_formatted_paths(allowed)
        return page(self._list_object_properties(
            node, path, prefix, delimiter, marker, limit, virtual, domain,
            keys, until, size_range, allowed, all_props))
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
662
663
        public = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
664
665
666
667
668
669
670
        paths, nodes = self._lookup_objects(public)
        path = '/'.join((account, container))
        cont_prefix = path + '/'
        paths = [x[len(cont_prefix):] for x in paths]
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
        objects = [(path,) + props for path, props in zip(paths, props)]
        return objects
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
        objects = []
        while True:
            marker = objects[-1] if objects else None
            limit = 10000
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
            objects.extend(l)
            if not l or len(l) < limit:
                break
        return objects
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
Antony Chazapis's avatar
Antony Chazapis committed
684
        allowed = []
Antony Chazapis's avatar
Antony Chazapis committed
685
        path = '/'.join((account, container, prefix)).rstrip('/')
Antony Chazapis's avatar
Antony Chazapis committed
686
        if user != account:
Antony Chazapis's avatar
Antony Chazapis committed
687
            allowed = self.permissions.access_list_paths(user, path)
Antony Chazapis's avatar
Antony Chazapis committed
688
689
690
            if not allowed:
                raise NotAllowedError
        else:
Sofia Papagiannaki's avatar
Sofia Papagiannaki committed
691
            allowed = set()
Antony Chazapis's avatar
Antony Chazapis committed
692
            if shared:
Sofia Papagiannaki's avatar
Sofia Papagiannaki committed
693
                allowed.update(self.permissions.access_list_shared(path))
Sofia Papagiannaki's avatar
Sofia Papagiannaki committed
694
            if public:
695
696
                allowed.update(
                    [x[0] for x in self.permissions.public_list(path)])
Sofia Papagiannaki's avatar
Sofia Papagiannaki committed
697
            allowed = sorted(allowed)
Sofia Papagiannaki's avatar
Sofia Papagiannaki committed
698
699
            if not allowed:
                return []
700
        return allowed
    @backend_method
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
        """Return a list of object (name, version_id) tuples existing under a container.

        Logs the call and delegates to ``_list_objects`` without requesting
        the full property set; a missing ``keys`` is treated as an empty
        list.
        """
        logger.debug(
            "list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s",
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, public)
        if not keys:
            keys = []
        all_props = False
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, all_props,
            public)
    @backend_method
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
        """Return a list of object metadata dicts existing under a container.

        The listing is obtained from ``_list_objects`` with the full
        property set requested. A two-element entry becomes
        ``{'subdir': <first element>}``; any other entry becomes a dict
        with the keys 'name', 'bytes', 'type', 'hash', 'version',
        'version_timestamp', 'modified', 'modified_by', 'uuid' and
        'checksum'. 'modified' is None whenever ``until`` is given.
        """
        logger.debug(
            "list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s",
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, public)
        if not keys:
            keys = []
        entries = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, True, public)

        def describe(entry):
            if len(entry) == 2:
                return {'subdir': entry[0]}
            # Property indices are shifted by one because element 0 of the
            # entry holds the name.
            mtime = entry[self.MTIME + 1]
            return {'name': entry[0],
                    'bytes': entry[self.SIZE + 1],
                    'type': entry[self.TYPE + 1],
                    'hash': entry[self.HASH + 1],
                    'version': entry[self.SERIAL + 1],
                    'version_timestamp': mtime,
                    'modified': mtime if until is None else None,
                    'modified_by': entry[self.MUSER + 1],
                    'uuid': entry[self.UUID + 1],
                    'checksum': entry[self.CHECKSUM + 1]}

        return [describe(entry) for entry in entries]
    @backend_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths that enforce permissions under a container.

        Logs the call and delegates to ``_list_object_permissions`` asking
        for shared paths only (public paths are not included).
        """
        logger.debug("list_object_permissions: %s %s %s %s",
                     user, account, container, prefix)
        return self._list_object_permissions(
            user, account, container, prefix, shared=True, public=False)
    @backend_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a dict mapping paths to public ids for objects that are public under a container.

        The pairs come from ``permissions.public_list`` queried with
        ``account/container/prefix``; when a path occurs more than once the
        last pair wins.
        """
        logger.debug("list_object_public: %s %s %s %s",
                     user, account, container, prefix)
        # NOTE(review): `user` is only logged here; no access check is made
        # in this method -- confirm that callers enforce it.
        lookup_path = '/'.join((account, container, prefix))
        return dict(
            (path, public_id)
            for path, public_id in self.permissions.public_list(lookup_path))
    @backend_method
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain.

        The caller must pass the ``_can_read`` check. The result always
        holds 'name', 'bytes', 'type', 'hash', 'version',
        'version_timestamp', 'modified', 'modified_by', 'uuid' and
        'checksum'; with ``include_user_defined`` the attributes stored for
        the version in ``domain`` are included as well, and the built-in
        keys win on a name clash.

        'modified' is the timestamp of the requested properties when no
        ``version`` is given. Otherwise it is the timestamp of the object's
        latest version, falling back to the deleted-cluster lookup.

        Raises ItemNotExists when a specific version is requested and
        neither a current nor a deleted version can be found.
        """
        logger.debug("get_object_meta: %s %s %s %s %s %s",
                     user, account, container, name, domain, version)
        self._can_read(user, account, container, name)
        _path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)

        if version is not None:
            try:
                # Overall last modification.
                latest = self._get_version(node)
                modified = latest[self.MTIME]
            except NameError:
                # Object may be deleted.
                # NOTE(review): presumably _get_version reports a missing
                # current version with a NameError subclass -- confirm.
                deleted = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if deleted is None:
                    raise ItemNotExists('Object does not exist')
                modified = deleted[self.MTIME]
        else:
            modified = props[self.MTIME]

        if include_user_defined:
            meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
        else:
            meta = {}

        builtin = {'name': name,
                   'bytes': props[self.SIZE],
                   'type': props[self.TYPE],
                   'hash': props[self.HASH],
                   'version': props[self.SERIAL],
                   'version_timestamp': props[self.MTIME],
                   'modified': modified,
                   'modified_by': props[self.MUSER],
                   'uuid': props[self.UUID],
                   'checksum': props[self.CHECKSUM]}
        meta.update(builtin)
        return meta
    @backend_method