modular.py 69.4 KB
Newer Older
Antony Chazapis's avatar
Antony Chazapis committed
1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
4
5
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
6
#
7
8
9
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
10
#
11
12
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
13
14
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
15
#
16
17
18
19
20
21
22
23
24
25
26
27
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
30
31
32
33
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

34
import sys
35
import uuid as uuidlib
36
import logging
Antony Chazapis's avatar
Antony Chazapis committed
37
import hashlib
38
39
import binascii

40
from functools import wraps, partial
41
42
from traceback import format_exc

43
44
45
46
try:
    from astakosclient import AstakosClient
except ImportError:
    AstakosClient = None
47

48
49
50
51
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
                  ContainerNotEmpty, ItemNotExists, VersionNotExists)
52

53
54
55
56
57
58
59
60
61
62
63
64

class DisabledAstakosClient(object):
    """Stand-in used in place of a real AstakosClient.

    It remembers the constructor arguments and raises on access to any
    attribute other than the ones it stores itself.
    """

    def __init__(self, *args, **kwargs):
        # Both values land in the instance dict, so reading them back never
        # reaches __getattr__.
        self.args, self.kwargs = args, kwargs

    def __getattr__(self, name):
        # Only called when regular attribute lookup has already failed.
        raise AssertionError("AstakosClient has been disabled, "
                             "yet an attempt to access it was made")


Antony Chazapis's avatar
Antony Chazapis committed
65
# Stripped-down version of the HashMap class found in tools.
66

Antony Chazapis's avatar
Antony Chazapis committed
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class HashMap(list):
    """A list of block digests that can be reduced to a single top hash.

    The elements are expected to be raw digests (byte strings) of equal
    length; ``hash()`` combines them pairwise into one digest.
    """

    def __init__(self, blocksize, blockhash):
        """Create an empty map.

        blocksize -- stored on the instance; not used by this class itself.
        blockhash -- algorithm name handed to ``hashlib.new``.
        """
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        """Return the raw digest of the byte string ``v``."""
        h = hashlib.new(self.blockhash)
        h.update(v)
        return h.digest()

    def hash(self):
        """Return the top hash of the stored digests.

        An empty map hashes the empty byte string, a single entry is
        returned unchanged, and longer maps are padded with zero-filled
        entries up to the next power of two and then hashed pairwise until
        one digest remains.
        """
        if len(self) == 0:
            # Bytes literal: hashlib rejects text strings on Python 3 and
            # b'' is the same object type as '' on Python 2.
            return self._hash_raw(b'')
        if len(self) == 1:
            return self[0]

        h = list(self)
        s = 2
        while s < len(h):
            s *= 2
        # Pad with all-zero entries of the same width as a real digest.
        h += [b'\x00' * len(h[0])] * (s - len(h))
        while len(h) > 1:
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
        return h[0]
93

Antony Chazapis's avatar
Antony Chazapis committed
94
95
96
97
98
# Default modules and settings.
# ModularBackend.__init__ falls back to these when the matching constructor
# argument is not supplied.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'  # imported by dotted name
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'  # handed to the DBWrapper
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'  # imported by dotted name
DEFAULT_BLOCK_PATH = 'data/'  # passed to the block Store as 'path'
Antony Chazapis's avatar
Antony Chazapis committed
99
DEFAULT_BLOCK_UMASK = 0o022
100
101
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
Antony Chazapis's avatar
Antony Chazapis committed
102
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
Sofia Papagiannaki's avatar
Sofia Papagiannaki committed
103
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
104
105
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
106
107
108
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
109
DEFAULT_PUBLIC_URL_SECURITY = 16
Antony Chazapis's avatar
Antony Chazapis committed
110

111
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
112
QUEUE_CLIENT_ID = 'pithos'
113
QUEUE_INSTANCE_ID = '1'
Antony Chazapis's avatar
Antony Chazapis committed
114

115
# Cluster identifiers, numbered 0, 1 and 2 respectively.
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
Antony Chazapis's avatar
Antony Chazapis committed
116
117
118

inf = float('inf')

Antony Chazapis's avatar
Antony Chazapis committed
119
120
ULTIMATE_ANSWER = 42

121
DEFAULT_SOURCE = 'system'
122
123
124

logger = logging.getLogger(__name__)

125

126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def debug_method(func):
    """Decorate a method so that every call is reported at DEBUG level.

    The log line has the form ``>>> name(arg, ..., key=value) <<< outcome``
    where ``outcome`` is the return value or, when the call raised, the
    formatted traceback. The exception itself is always re-raised.
    ``self`` is not included in the logged arguments.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        try:
            result = func(self, *args, **kw)
            return result
        except BaseException:
            # Same reach as a bare ``except``: remember the traceback for
            # the log line below, then let the exception propagate.
            result = format_exc()
            raise
        finally:
            # Skip the string building entirely when nobody is listening.
            if logger.isEnabledFor(logging.DEBUG):
                all_args = [str(i) for i in args]
                # extend() instead of map(): map is lazy on Python 3 and
                # would never append; items() exists on both 2 and 3.
                all_args.extend('%s=%s' % (k, v) for k, v in kw.items())
                # join() leaves no trailing separator, so nothing needs
                # stripping (stripping could eat real argument characters).
                logger.debug(">>> %s(%s) <<< %s",
                             func.__name__, ', '.join(all_args), result)
    return wrapper


143
144
class ModularBackend(BaseBackend):
    """A modular backend.
145

146
    Uses modules for SQL functions and storage.
147
    """
148

149
    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None):
        """Load the DB and block modules and wire up the backend.

        Arguments left unset fall back to the module-level DEFAULT_*
        values (or the defaults imported from ``base`` for the policies).

        db_module / block_module / queue_module -- dotted module names that
            are imported here; the DB module must provide DBWrapper,
            Permissions, Config, QuotaholderSerial and Node, the block
            module a Store, and the queue module a Queue.
        queue_module, queue_hosts -- a real queue is only created when both
            are given; otherwise a do-nothing queue is used.
        astakos_url -- when empty, or when the astakosclient package could
            not be imported, a DisabledAstakosClient is installed instead.
        block_params -- extra keyword arguments merged into the Store
            parameters.
        """
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        # Only substitute the default when no umask was given at all:
        # 0 is a valid umask and must not be replaced by 0o022.
        if block_umask is None:
            block_umask = DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING

        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        self.free_versioning = free_versioning

        def load_module(m):
            # __import__ returns the top-level package for dotted names,
            # so fetch the leaf module from sys.modules instead.
            __import__(m)
            return sys.modules[m]

        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        # Re-export the DB module's constants as attributes of the backend.
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                  'MATCH_PREFIX', 'MATCH_EXACT']:
            setattr(self, x, getattr(self.db_module, x))

        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        # A fresh dict is updated here, so the shared default
        # DEFAULT_BLOCK_PARAMS is never mutated.
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            class NoQueue(object):
                """Queue replacement whose operations do nothing."""

                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        self.astakos_url = astakos_url
        self.service_token = service_token

        if not astakos_url or not AstakosClient:
            self.astakosclient = DisabledAstakosClient(
                astakos_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                astakos_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        self.serials = []
        self.messages = []

        self._move_object = partial(self._copy_object, is_move=True)

255
256
    def pre_exec(self, lock_container_path=False):
        """Store the locking preference and call ``execute`` on the wrapper.

        ``lock_container_path`` is only recorded on the instance here.
        """
        self.lock_container_path = lock_container_path
        db = self.wrapper
        db.execute()

    def post_exec(self, success_status=True):
        """Finish the current unit of work.

        On success the queued messages are sent, pending commission serials
        are stored and accepted through the astakos client, and the wrapper
        is committed. On failure the pending serials are rejected and the
        wrapper is rolled back; the rollback happens even when rejecting
        the serials raises.
        """
        if success_status:
            # send messages produced
            for m in self.messages:
                self.queue.send(*m)

            # register serials
            if self.serials:
                self.commission_serials.insert_many(
                    self.serials)

                # commit to ensure that the serials are registered
                # even if resolve commission fails
                self.wrapper.commit()

                # start new transaction
                self.wrapper.execute()

                r = self.astakosclient.resolve_commissions(
                    token=self.service_token,
                    accept_serials=self.serials,
                    reject_serials=[])
                # Only the serials reported as accepted are forgotten.
                self.commission_serials.delete_many(
                    r['accepted'])

            self.wrapper.commit()
        else:
            try:
                if self.serials:
                    self.astakosclient.resolve_commissions(
                        token=self.service_token,
                        accept_serials=[],
                        reject_serials=self.serials)
            finally:
                # Never leave the failed work un-rolled-back because the
                # remote rejection call raised.
                self.wrapper.rollback()

Antony Chazapis's avatar
Antony Chazapis committed
293
294
    def close(self):
        """Close the DB wrapper and the message queue.

        The queue is closed even if closing the wrapper raises; the
        wrapper's exception still propagates to the caller.
        """
        try:
            self.wrapper.close()
        finally:
            self.queue.close()
296

297
298
    @property
    def using_external_quotaholder(self):
        """True unless the astakos client is the disabled placeholder."""
        disabled = isinstance(self.astakosclient, DisabledAstakosClient)
        return not disabled
300

301
    @debug_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access."""

        accessible = self._allowed_accounts(user)
        first, count = self._list_limits(accessible, marker, limit)
        return accessible[first:first + count]
308

309
    @debug_method
    def get_account_meta(
            self, user, account, domain, until=None, include_user_defined=True,
            external_quota=None):
        """Return a dictionary with the account metadata for the domain.

        A user other than the account owner only gets ``name`` and
        ``modified``, and is refused (NotAllowedError) when asking for a
        past state, when the account node is missing, or when the account
        is not among the user's allowed accounts.
        """

        is_owner = (user == account)
        _, node = self._lookup_account(account, is_owner)
        if not is_owner:
            refused = (until or node is None or
                       account not in self._allowed_accounts(user))
            if refused:
                raise NotAllowedError

        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            # No properties available: fall back to the requested timestamp.
            props, mtime = None, until

        object_count, used_bytes, tstamp = self._get_statistics(
            node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            # Overall last modification.
            overall = self._get_statistics(node, compute=True)[2]
            modified = max(overall, mtime)

        if not is_owner:
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                user_defined = self.node.attribute_get(
                    props[self.SERIAL], domain)
                meta.update(dict(user_defined))
            if until is not None:
                meta['until_timestamp'] = tstamp
            # Applied after the user-defined attributes so these keys win.
            meta.update({'name': account,
                         'count': object_count,
                         'bytes': used_bytes})
            if self.using_external_quotaholder:
                external_quota = external_quota or {}
                meta['bytes'] = external_quota.get('usage', 0)
        meta['modified'] = modified
        return meta
350

351
    @debug_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain.

        Only the account owner may do this; anyone else gets
        NotAllowedError.
        """

        if user != account:
            raise NotAllowedError
        _, account_node = self._lookup_account(account, True)
        self._put_metadata(
            user, account_node, domain, meta, replace,
            update_statistics_ancestors_depth=-1)
360

361
    @debug_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for the account.

        Users other than the owner receive an empty dictionary, provided
        the account is among their allowed accounts; otherwise
        NotAllowedError is raised.
        """

        if user == account:
            self._lookup_account(account, True)
            return self.permissions.group_dict(account)
        if account not in self._allowed_accounts(user):
            raise NotAllowedError
        return {}
371

372
    @debug_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account.

        With ``replace`` all existing groups are destroyed first; without
        it only the groups named in ``groups`` are reset. Groups mapped to
        an empty member collection end up removed.
        """

        if user != account:
            raise NotAllowedError
        self._lookup_account(account, True)
        self._check_groups(groups)

        acl = self.permissions
        if replace:
            acl.group_destroy(account)
        for group_name, members in groups.iteritems():
            if not replace:  # Not already wiped by group_destroy above.
                acl.group_delete(account, group_name)
            if members:
                acl.group_addmany(account, group_name, members)
387

388
    @debug_method
    def get_account_policy(self, user, account, external_quota=None):
        """Return a dictionary with the account policy.

        Users other than the owner receive an empty dictionary when the
        account is among their allowed accounts, NotAllowedError otherwise.
        When an external quotaholder is in use, ``quota`` is taken from
        ``external_quota['limit']`` (0 when missing).
        """

        if user != account:
            if account in self._allowed_accounts(user):
                return {}
            raise NotAllowedError

        _, account_node = self._lookup_account(account, True)
        policy = self._get_policy(account_node, is_account_policy=True)
        if self.using_external_quotaholder:
            external_quota = external_quota or {}
            policy['quota'] = external_quota.get('limit', 0)
        return policy
402

403
    @debug_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account.

        Restricted to the account owner; the policy is checked before it
        is stored.
        """

        if user != account:
            raise NotAllowedError
        _, account_node = self._lookup_account(account, True)
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(account_node, policy, replace,
                         is_account_policy=True)
412

413
    @debug_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name.

        Raises NotAllowedError unless ``user`` is the account itself and
        AccountExists when a node for the account is already present.
        """

        policy = policy or {}
        if user != account:
            raise NotAllowedError
        if self.node.node_lookup(account) is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        created = self._put_path(user, self.ROOTNODE, account,
                                 update_statistics_ancestors_depth=-1)
        self._put_policy(created, policy, True, is_account_policy=True)
428

429
    @debug_method
    def delete_account(self, user, account):
        """Delete the account with the given name.

        Deleting an account that has no node is a no-op. Raises
        AccountNotEmpty when the node cannot be removed.
        """

        if user != account:
            raise NotAllowedError
        account_node = self.node.node_lookup(account)
        if account_node is None:
            return
        removed = self.node.node_remove(
            account_node, update_statistics_ancestors_depth=-1)
        if not removed:
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)
442

443
    @debug_method
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of containers existing under an account.

        A user other than the owner gets only the containers they are
        allowed to see, and may not ask for a past state (``until``).
        For the owner, ``shared`` and/or ``public`` restrict the listing
        to containers holding shared / public paths; otherwise the
        containers are read from the account node.
        """

        if user != account:
            if until or account not in self._allowed_accounts(user):
                raise NotAllowedError
            allowed = self._allowed_containers(user, account)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        if shared or public:
            allowed = set()
            if shared:
                # Paths look like account/container/...; keep the
                # container component.
                allowed.update([x.split('/', 2)[1] for x in
                               self.permissions.access_list_shared(account)])
            if public:
                allowed.update([x[0].split('/', 2)[1] for x in
                               self.permissions.public_list(account)])
            allowed = sorted(allowed)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        node = self.node.node_lookup(account)
        containers = [x[0] for x in self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)]
        # ``containers`` already holds the names, so pass it as is, like
        # the two branches above; indexing each name again would hand
        # _list_limits only the first character of every name.
        start, limit = self._list_limits(containers, marker, limit)
        return containers[start:start + limit]
471

472
    @debug_method
    def list_container_meta(self, user, account, container, domain,
                            until=None):
        """Return a list of the container's object meta keys for a domain.

        A user other than the owner may not ask for a past state and must
        have access to at least one path under the container.
        """

        accessible = []
        if user != account:
            if until:
                raise NotAllowedError
            container_path = '/'.join((account, container))
            accessible = self.permissions.access_list_paths(
                user, container_path)
            if not accessible:
                raise NotAllowedError

        _, container_node = self._lookup_container(account, container)
        # No timestamp given means "no upper bound".
        if until is not None:
            before = until
        else:
            before = inf
        accessible = self._get_formatted_paths(accessible)
        return self.node.latest_attribute_keys(
            container_node, domain, before, CLUSTER_DELETED, accessible)
490

491
    @debug_method
    def get_container_meta(self, user, account, container, domain, until=None,
                           include_user_defined=True):
        """Return a dictionary with the container metadata for the domain.

        A user other than the owner only gets ``name`` and ``modified``,
        and is refused when asking for a past state or when the container
        is not among their allowed containers.
        """

        is_owner = (user == account)
        if not is_owner:
            if until or container not in self._allowed_containers(user,
                                                                  account):
                raise NotAllowedError

        _, container_node = self._lookup_container(account, container)
        props = self._get_properties(container_node, until)
        mtime = props[self.MTIME]
        object_count, used_bytes, tstamp = self._get_statistics(
            container_node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            # Overall last modification.
            overall = self._get_statistics(container_node)[2]
            modified = max(overall, mtime)

        if not is_owner:
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                user_defined = self.node.attribute_get(
                    props[self.SERIAL], domain)
                meta.update(dict(user_defined))
            if until is not None:
                meta['until_timestamp'] = tstamp
            # Applied after the user-defined attributes so these keys win.
            meta.update({'name': container,
                         'count': object_count,
                         'bytes': used_bytes})
        meta['modified'] = modified
        return meta
524

525
    @debug_method
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Update the metadata associated with the container for the domain.

        When the update produced a source version and the container's
        versioning policy is not 'auto', that source version is removed.
        """

        if user != account:
            raise NotAllowedError
        _, container_node = self._lookup_container(account, container)
        src_version_id, _ = self._put_metadata(
            user, container_node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is None:
            return
        policy = self._get_policy(container_node, is_account_policy=False)
        if policy['versioning'] != 'auto':
            self.node.version_remove(src_version_id,
                                     update_statistics_ancestors_depth=0)
542

543
    @debug_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy.

        Users other than the owner receive an empty dictionary when the
        container is among their allowed containers, NotAllowedError
        otherwise.
        """

        if user != account:
            if container in self._allowed_containers(user, account):
                return {}
            raise NotAllowedError
        _, container_node = self._lookup_container(account, container)
        return self._get_policy(container_node, is_account_policy=False)
553

554
    @debug_method
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Update the policy associated with the container.

        Restricted to the account owner; the policy is checked before it
        is stored.
        """

        if user != account:
            raise NotAllowedError
        _, container_node = self._lookup_container(account, container)
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(container_node, policy, replace,
                         is_account_policy=False)
564

565
    @debug_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name.

        Raises NotAllowedError unless ``user`` owns the account and
        ContainerExists when the container lookup succeeds.
        """

        policy = policy or {}
        if user != account:
            raise NotAllowedError

        # A failed lookup (NameError) is the expected case for a new
        # container.
        try:
            _existing_path, _existing_node = self._lookup_container(
                account, container)
        except NameError:
            already_present = False
        else:
            already_present = True
        if already_present:
            raise ContainerExists('Container already exists')

        if policy:
            self._check_policy(policy, is_account_policy=False)
        container_path = '/'.join((account, container))
        account_node = self._lookup_account(account, True)[1]
        created = self._put_path(
            user, account_node, container_path,
            update_statistics_ancestors_depth=-1)
        self._put_policy(created, policy, True, is_account_policy=False)
585

586
    @debug_method
    def delete_container(self, user, account, container, until=None, prefix='',
                         delimiter=None):
        """Delete/purge the container with the given name.

        Three modes, chosen in this order:
        - ``until`` given: purge history and deleted versions up to that
          point and keep the container.
        - ``delimiter`` given: mark every object in the container as
          deleted and clear the permissions on those paths, keeping the
          container.
        - otherwise: refuse with ContainerNotEmpty if the container still
          counts objects, else purge all versions and remove the container.
        """

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)

        def purge_versions(before):
            # Purge history versions, drop their maps from the store, then
            # purge the deleted cluster; report what the history purge freed.
            purged_hashes, purged_size, purged_serials = \
                self.node.node_purge_children(
                    node, before, CLUSTER_HISTORY,
                    update_statistics_ancestors_depth=0)
            for map_hash in purged_hashes:
                self.store.map_delete(map_hash)
            self.node.node_purge_children(
                node, before, CLUSTER_DELETED,
                update_statistics_ancestors_depth=0)
            return purged_size, purged_serials

        def report_purge(purged_size, purged_serials):
            # Size changes are reported only when versioning is not free.
            if self.free_versioning:
                return
            self._report_size_change(
                user, account, -purged_size, {
                    'action': 'container purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in purged_serials)
                }
            )

        if until is not None:
            freed_size, freed_serials = purge_versions(until)
            report_purge(freed_size, freed_serials)
            return

        if delimiter:
            # remove only contents
            listing = self._list_objects_no_limit(
                user, account, container, prefix='', delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            removed_paths = []
            for entry in listing:
                object_path = '/'.join((account, container, entry[0]))
                object_node = entry[2]
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, object_node, size=0, type='', hash=None,
                    checksum='', cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size, {
                        'action': 'object delete',
                        'path': object_path,
                        'versions': str(dest_version_id)})
                self._report_object_change(
                    user, account, object_path,
                    details={'action': 'object delete'})
                removed_paths.append(object_path)
            self.permissions.access_clear_bulk(removed_paths)
            return

        if self._get_statistics(node)[0] > 0:
            raise ContainerNotEmpty('Container is not empty')
        freed_size, freed_serials = purge_versions(inf)
        self.node.node_remove(node, update_statistics_ancestors_depth=0)
        report_purge(freed_size, freed_serials)
658

Sofia Papagiannaki's avatar
Sofia Papagiannaki committed
659
660
661
    def _list_objects(self, user, account, container, prefix, delimiter,
                      marker, limit, virtual, domain, keys, shared, until,
                      size_range, all_props, public):
        """Return one page of object entries found under a container.

        Entries are sequences whose first item is the object name; that
        name is what the page window and the ordering are based on.

        Three listings are possible:
          * public only (public set, shared unset),
          * shared merged with public (both set), ordered by name,
          * the permission-filtered listing (public unset).

        Raises NotAllowedError when a user other than the account owner
        asks for a listing with `until` set.
        """

        if user != account:
            if until:
                raise NotAllowedError

        def page_of(entries):
            # Window the entries through the backend listing limits
            names = [entry[0] for entry in entries]
            first, count = self._list_limits(names, marker, limit)
            return entries[first:first + count]

        def public_entries():
            return self._list_public_object_properties(
                user, account, container, prefix, all_props)

        if public:
            if not shared:
                return page_of(public_entries())

            # Both requested: collect the shared entries first...
            merged = set()
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            if shared_paths:
                path, node = self._lookup_container(account, container)
                formatted = self._get_formatted_paths(shared_paths)
                merged |= set(self._list_object_properties(
                    node, path, prefix, delimiter, marker, limit, virtual,
                    domain, keys, until, size_range, formatted, all_props))

            # ...then add the public ones and order everything by name
            merged |= set(public_entries())
            return page_of(sorted(merged, key=lambda entry: entry[0]))

        allowed = self._list_object_permissions(
            user, account, container, prefix, shared, public)
        if shared and not allowed:
            # Only shared objects were requested and there are none
            return []

        path, node = self._lookup_container(account, container)
        formatted = self._get_formatted_paths(allowed)
        entries = self._list_object_properties(
            node, path, prefix, delimiter, marker, limit, virtual, domain,
            keys, until, size_range, formatted, all_props)
        return page_of(entries)
704

Sofia Papagiannaki's avatar
Sofia Papagiannaki committed
705
706
    def _list_public_object_properties(self, user, account, container, prefix,
                                       all_props):
        """Return (name,) + version properties for the public objects.

        The public paths are obtained through _list_object_permissions
        (public only), resolved with _lookup_objects, and each resulting
        path is reported relative to the container, i.e. with the leading
        'account/container/' part removed.
        """

        public_paths = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        full_paths, nodes = self._lookup_objects(public_paths)

        # Number of characters taken by the 'account/container/' prefix
        skip = len('/'.join((account, container, '')))
        names = [full_path[skip:] for full_path in full_paths]

        versions = self.node.version_lookup_bulk(nodes, all_props=all_props)
        listing = []
        for name, props in zip(names, versions):
            listing.append((name,) + props)
        return listing
717

Sofia Papagiannaki's avatar
Sofia Papagiannaki committed
718
719
720
    def _list_objects_no_limit(self, user, account, container, prefix,
                               delimiter, virtual, domain, keys, shared, until,
                               size_range, all_props, public):
        """Return every entry _list_objects can produce, page by page.

        Pages of up to 10000 entries are requested from _list_objects and
        concatenated until a short (or empty) page is returned. All other
        arguments are forwarded unchanged.
        """

        page_size = 10000
        objects = []
        marker = None
        while True:
            page = self._list_objects(
                user, account, container, prefix, delimiter, marker,
                page_size, virtual, domain, keys, shared, until, size_range,
                all_props, public)
            objects.extend(page)
            if len(page) < page_size:
                break
            # Resume after the last object *name*: _list_objects matches the
            # marker against the first item of each entry, so handing it the
            # whole entry would never match and the listing would not advance.
            marker = page[-1][0]
        return objects
733

Sofia Papagiannaki's avatar
Sofia Papagiannaki committed
734
735
    def _list_object_permissions(self, user, account, container, prefix,
                                 shared, public):
        """Return the permission paths relevant to a listing request.

        For a user other than the account owner this is whatever
        permissions.access_list_paths reports for the path;
        NotAllowedError is raised when that is empty.

        For the owner it is the sorted union of the shared paths (when
        `shared` is set) and the public paths (when `public` is set),
        which may be an empty list.
        """

        path = '/'.join((account, container, prefix)).rstrip('/')

        if user != account:
            granted = self.permissions.access_list_paths(user, path)
            if not granted:
                raise NotAllowedError
            return granted

        owned = set()
        if shared:
            owned.update(self.permissions.access_list_shared(path))
        if public:
            # public_list yields sequences; only their first item is a path
            owned.update(
                [entry[0] for entry in self.permissions.public_list(path)])
        return sorted(owned)
753

754
    @debug_method
Sofia Papagiannaki's avatar
Sofia Papagiannaki committed
755
756
757
758
759
    def list_objects(self, user, account, container, prefix='', delimiter=None,
                     marker=None, limit=10000, virtual=True, domain=None,
                     keys=None, shared=False, until=None, size_range=None,
                     public=False):
        """List (object name, object version_id) under a container.

        Delegates to _list_objects with all_props disabled; a missing
        `keys` argument is passed on as an empty list.
        """

        key_filter = keys or []
        listing = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, key_filter, shared, until, size_range,
            all_props=False, public=public)
        return listing
765

766
    @debug_method
Sofia Papagiannaki's avatar
Sofia Papagiannaki committed
767
768
769
770
771
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of metadata dicts of objects under a container.

        Entries are fetched through _list_objects with all_props enabled.
        A two-item entry is reported as {'subdir': name}; any other entry
        is expanded into a dict of its version properties. 'modified' is
        only filled in when `until` is None.
        """

        def as_meta(props):
            # Two-item entries carry no version properties
            if len(props) == 2:
                return {'subdir': props[0]}
            # Property columns are shifted by one: the name comes first
            mtime = props[self.MTIME + 1]
            meta = {'name': props[0]}
            meta['bytes'] = props[self.SIZE + 1]
            meta['type'] = props[self.TYPE + 1]
            meta['hash'] = props[self.HASH + 1]
            meta['version'] = props[self.SERIAL + 1]
            meta['version_timestamp'] = mtime
            if until is None:
                meta['modified'] = mtime
            else:
                meta['modified'] = None
            meta['modified_by'] = props[self.MUSER + 1]
            meta['uuid'] = props[self.UUID + 1]
            meta['checksum'] = props[self.CHECKSUM + 1]
            return meta

        listing = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys or [], shared, until, size_range, True,
            public)
        return [as_meta(props) for props in listing]
794

795
    @debug_method
796
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return the list of paths that enforce permissions under a container.

        Delegates to _list_object_permissions asking for shared paths only.
        """

        return self._list_object_permissions(
            user, account, container, prefix, shared=True, public=False)
801

802
    @debug_method
803
    def list_object_public(self, user, account, container, prefix=''):
        """Return a mapping of object paths to public ids under a container.

        The lookup path handed to permissions.public_list is
        'account/container/prefix'.
        """

        lookup_path = '/'.join((account, container, prefix))
        listing = self.permissions.public_list(lookup_path)
        return dict((path, public_id) for path, public_id in listing)
812