__init__.py 36.2 KB
Newer Older
Stavros Sachtouris's avatar
Stavros Sachtouris committed
1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
Giorgos Verigakis's avatar
Giorgos Verigakis committed
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

34
from threading import enumerate as activethreads
35

36
from os import fstat
37
from hashlib import new as newhashlib
38
from time import time
Giorgos Verigakis's avatar
Giorgos Verigakis committed
39

40
from binascii import hexlify
Giorgos Verigakis's avatar
Giorgos Verigakis committed
41

42
from kamaki.clients import SilentEvent, sendlog
43
from kamaki.clients.pithos.rest_api import PithosRestClient
44
45
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in
46
from StringIO import StringIO
Giorgos Verigakis's avatar
Giorgos Verigakis committed
47

48

49
def _pithos_hash(block, blockhash):
50
    h = newhashlib(blockhash)
Giorgos Verigakis's avatar
Giorgos Verigakis committed
51
52
53
    h.update(block.rstrip('\x00'))
    return h.hexdigest()

54

55
56
57
58
59
def _range_up(start, end, a_range):
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
60
            return (0, 0)
61
62
63
64
65
66
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)

67

68
class PithosClient(PithosRestClient):
Giorgos Verigakis's avatar
Giorgos Verigakis committed
69
    """GRNet Pithos API client"""
Giorgos Verigakis's avatar
Giorgos Verigakis committed
70

71
72
    def __init__(self, base_url, token, account=None, container=None):
        """Set up the client; every argument is handed over unchanged to the
        PithosRestClient constructor

        :param base_url: passed to PithosRestClient

        :param token: passed to PithosRestClient

        :param account: passed to PithosRestClient

        :param container: passed to PithosRestClient
        """
        super(PithosClient, self).__init__(base_url, token, account, container)
73

74
    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks

        The delete request carries ``until`` set to the current time. The
        client's own container is put back afterwards, even on failure.

        :param container: (str) container to purge; a false value means the
            client's current container
        """
        original_container = self.container
        try:
            self.container = container or original_container
            right_now = unicode(time())
            self.container_delete(until=right_now)
        finally:
            self.container = original_container
83

84
85
86
87
88
89
90
91
92
93
    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Upload the contents of a file with a single PUT request

        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool) if set, f is read in full, must contain
            JSON and is re-serialized before it is sent

        :param size: (int) size of data to upload; a false value means read
            up to the end of f. Not used when withHashFile is set

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :raises ClientError: if withHashFile is set and f does not contain
            valid JSON
        """
        self._assert_container()

        if withHashFile:
            import json
            data = f.read()
            try:
                # json.loads reports malformed input with ValueError only
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formated' % f.name, 1)
        else:
            data = f.read(size) if size else f.read()
        self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)

142
143
144
145
146
147
148
149
    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
166
        self._assert_container()
167
        self.object_put(
168
            obj,
169
170
171
172
173
174
175
176
177
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))

178
    # upload_* auxiliary methods
179
    def _put_block_async(self, data, hash, upload_gen=None):
        """Start a SilentEvent that runs _put_block for one block

        :param data: block contents, handed to _put_block

        :param hash: block hash, handed to _put_block

        :param upload_gen: accepted but not used by this method

        :returns: the SilentEvent, already started
        """
        uploader = SilentEvent(method=self._put_block, data=data, hash=hash)
        uploader.start()
        return uploader

    def _put_block(self, data, hash):
185
186
        r = self.container_post(
            update=True,
187
188
189
190
191
192
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

193
    def _get_file_block_info(self, fileobj, size=None):
194
        meta = self.get_container_info()
195
196
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
197
        size = size if size is not None else fstat(fileobj.fileno()).st_size
198
        nblocks = 1 + (size - 1) // blocksize
199
200
        return (blocksize, blockhash, size, nblocks)

201
202
203
204
205
206
207
208
209
210
211
212
213
    def _get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
214
215
216
217
218
219
220
221
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
222
            success=success)
223
        return None if r.status_code == 201 else r.json
224

225
    def _culculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        """Read a file block by block and record the hash of every block

        :param blocksize: (int) maximum number of bytes per block

        :param blockhash: (str) hash algorithm name, handed to _pithos_hash

        :param size: (int) total number of bytes expected to be read

        :param nblocks: (int) number of blocks to read

        :param hashes: (list) filled with the block hashes, in file order

        :param hmap: (dict) filled with hash --> (offset, length); identical
            blocks share one entry, the last occurrence wins

        :param fileobj: open file, read sequentially from its current position

        :param hash_cb: optional progress.bar object for calculating hashes;
            called with nblocks and advanced once per block

        :raises AssertionError: if the bytes read do not add up to size
        """
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for _ in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            block_len = len(block)
            block_hash = _pithos_hash(block, blockhash)
            hashes.append(block_hash)
            hmap[block_hash] = (offset, block_len)
            offset += block_len
            if hash_cb:
                hash_gen.next()
        # Parenthesized so both literals form one message; as two separate
        # statements the second half was silently dropped
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        assert offset == size, msg
245

246
247
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously

        :param missing: iterable of hashes of the blocks to upload

        :param hmap: (dict) hash --> (offset, length) inside fileobj

        :param fileobj: open file the block data is read from

        :param upload_gen: optional progress generator, advanced once for
            every block that finished without an exception

        :returns: (list) the ``hash`` keyword of every upload thread that
            ended with an exception
        """

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash, upload_gen)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        # NOTE(review): presumably caps the pool at the
                        # current limit after a 502 - confirm
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    # NOTE(review): this append is discarded when flying is
                    # rebound to unfinished right after the loop
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except Exception:
                        # progress reporting is best effort, but a
                        # KeyboardInterrupt must reach the caller
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except Exception:
                    pass

        return [failure.kwargs['hash'] for failure in failures]
287

288
289
290
291
292
293
    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Upload an object using multiple connections (threads)

        The file is hashed block by block and the hashmap is PUT first. If
        that request already creates the object (201) nothing else is done.
        Otherwise the reported blocks are uploaded and the hashmap is PUT
        again.

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param size: (int) bytes to upload; if None, the file size is used

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            Involves the case of an object with the same path is created while
            the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str) defaults to application/octet-stream

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :raises ClientError: (status 800) if blocks still fail to upload
            after the retries are used up
        """
        self._assert_container()

        #init
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        (hashes, hmap) = ([], {})
        if not content_type:
            content_type = 'application/octet-stream'

        self._culculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            # advance the bar once for each step in this range
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except Exception:
                    # a broken progress bar must not abort the upload
                    upload_gen = None
                    break
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if not missing:
                    break
                # a retry is consumed only when a round made no progress
                if num_of_blocks == len(missing):
                    retries -= 1
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    status=800)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            from threading import current_thread
            this_thread = current_thread()
            for thread in activethreads():
                # joining the running thread raises RuntimeError and would
                # replace the KeyboardInterrupt being handled
                if thread is not this_thread:
                    thread.join()
            raise

        self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)

411
    # download_* auxiliary methods
Stavros Sachtouris's avatar
Stavros Sachtouris committed
412
    def _get_remote_blocks_info(self, obj, **restargs):
413
        #retrieve object hashmap
414
        myrange = restargs.pop('data_range', None)
415
        hashmap = self.get_object_hashmap(obj, **restargs)
416
        restargs['data_range'] = myrange
417
418
419
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
Stavros Sachtouris's avatar
Stavros Sachtouris committed
420
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
421
        map_dict = {}
Stavros Sachtouris's avatar
Stavros Sachtouris committed
422
        for i, h in enumerate(hashmap['hashes']):
423
424
425
426
427
            #  map_dict[h] = i   CHAGE
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
Stavros Sachtouris's avatar
Stavros Sachtouris committed
428
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
429

430
431
432
    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        """Download the blocks one after the other and write them to dst

        :param obj: (str) remote object path

        :param remote_hashes: (list) block hashes in block order; entries
            with a false hash are skipped

        :param blocksize: (int) block size in bytes

        :param total_size: (int) object size in bytes

        :param dst: destination; each block is written and flushed

        :param range: (str) 'from-to' byte range each block is clipped to, or
            None

        :param args: keyword arguments for object_get; ``data_range`` is
            overwritten for every block
        """
        for blockid, blockhash in enumerate(remote_hashes):
            if not blockhash:
                continue
            first = blocksize * blockid
            if first + blocksize > total_size:
                # last block: it stops at the end of the object
                last = total_size - 1
            else:
                last = first + blocksize - 1
            (first, last) = _range_up(first, last, range)
            args['data_range'] = 'bytes=%s-%s' % (first, last)
            reply = self.object_get(obj, success=(200, 206), **args)
            self._cb_next()
            dst.write(reply.content)
            dst.flush()
Stavros Sachtouris's avatar
Stavros Sachtouris committed
444

445
446
    def _get_block_async(self, obj, **args):
        """Start a SilentEvent that runs object_get for obj

        :param obj: (str) remote object path

        :param args: keyword arguments for object_get

        :returns: the SilentEvent, already started
        """
        downloader = SilentEvent(
            self.object_get, obj, success=(200, 206), **args)
        downloader.start()
        return downloader
449

450
451
452
453
454
455
456
    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        h.update(block.strip('\x00'))
        return hexlify(h.digest())

457
    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """Write the content fetched by finished threads into a file

        Threads that are still alive are left in place. Finished ones are
        written out and removed from both flying and blockids.

        :param flying: (dict) key --> download thread

        :param blockids: (dict) key --> list of file positions that receive
            the content fetched by the thread with the same key

        :param local_file: open file to write to; flushed at the end

        :param offset: (int) added to every position before seeking

        :raises: the exception of the first finished thread that failed
        """
        for key, worker in flying.items():
            if worker.isAlive():
                continue
            if worker.exception:
                raise worker.exception
            content = worker.value.content
            for position in blockids[key]:
                local_file.seek(position + offset)
                local_file.write(content)
                self._cb_next()
            del flying[key]
            del blockids[key]
        local_file.flush()

478
479
480
    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        """Download blocks with concurrent requests and write them to a file

        :param obj: (str) remote object path

        :param remote_hashes: (dict) block hash --> list of block indices

        :param blocksize: (int) block size in bytes

        :param total_size: (int) object size in bytes

        :param local_file: open file that receives the blocks

        :param blockhash: (str) hash algorithm name, used to check the blocks
            already present in local_file

        :param resume: (bool) if set, blocks already stored in local_file
            with a matching hash are not downloaded again

        :param filerange: (str) 'from-to' byte range, or None

        :param restargs: keyword arguments for the block requests;
            ``async_headers`` is overwritten for every request
        """
        # without resume nothing in the local file is considered saved
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            # NOTE(review): offset is later ADDED to the write positions in
            # _thread2file - confirm this is the intended direction
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            # block indices --> byte positions
            blockids = [blk * blocksize for blk in blockids]
            # positions not yet stored locally with the expected hash
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            # advance the progress bar for the positions already saved
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                # the block is fetched once, using its first unsaved position
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                # write out whatever has finished in the meantime
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if key + blocksize > total_size\
                    else key + blocksize - 1
                start, end = _range_up(key, end, filerange)
                # NOTE(review): _range_up gives (0, 0) for a block outside
                # filerange, but a one-byte overlap is skipped too - confirm
                if start == end:
                    self._cb_next()
                    continue
                restargs['async_headers'] = {
                    'Range': 'bytes=%s-%s' % (start, end)}
                flying[key] = self._get_block_async(obj, **restargs)
                # every unsaved position receives the fetched content
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
516

517
518
519
520
521
522
523
524
525
526
    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        If dst is a terminal the blocks are fetched one by one, in order.
        Otherwise they are fetched concurrently and, when no range is
        requested, dst is truncated to the object size.

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date"""
        if range_str is None:
            data_range = None
        else:
            data_range = 'bytes=%s' % range_str
        restargs = {
            'version': version,
            'data_range': data_range,
            'if_match': if_match,
            'if_none_match': if_none_match,
            'if_modified_since': if_modified_since,
            'if_unmodified_since': if_unmodified_since}

        blocks_info = self._get_remote_blocks_info(obj, **restargs)
        (blocksize, blockhash, total_size, hash_list, remote_hashes) =\
            blocks_info
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj, hash_list, blocksize, total_size, dst, range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj, remote_hashes, blocksize, total_size, dst, blockhash,
                resume, range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()

    #Command Progress Bar method
594
    def _cb_next(self, step=1):
Stavros Sachtouris's avatar
Stavros Sachtouris committed
595
596
        if hasattr(self, 'progress_bar_gen'):
            try:
597
598
                for i in xrange(step):
                    self.progress_bar_gen.next()
Stavros Sachtouris's avatar
Stavros Sachtouris committed
599
600
            except:
                pass
601

Stavros Sachtouris's avatar
Stavros Sachtouris committed
602
603
604
    def _complete_cb(self):
        while True:
            try:
605
                self.progress_bar_gen.next()
Stavros Sachtouris's avatar
Stavros Sachtouris committed
606
607
            except:
                break
Stavros Sachtouris's avatar
Stavros Sachtouris committed
608

609
610
611
612
613
614
615
616
    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """Request the hashmap of a remote object

        :param obj: (str) remote object path

        :param version: (str) file version

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: the JSON body of the reply, or an empty dict if the request
            fails with status 304 or 412
        """
        request_args = dict(
            hashmap=True,
            version=version,
            if_etag_match=if_match,
            if_etag_not_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since,
            data_range=data_range)
        try:
            reply = self.object_get(obj, **request_args)
        except ClientError as err:
            if err.status in (304, 412):
                return {}
            raise
        return reply.json
648

649
    def set_account_group(self, group, usernames):
650
651
652
653
654
        """
        :param group: (str)

        :param usernames: (list)
        """
655
        self.account_post(update=True, groups={group: usernames})
Stavros Sachtouris's avatar
Stavros Sachtouris committed
656

657
    def del_account_group(self, group):
658
659
660
        """
        :param group: (str)
        """
661
        self.account_post(update=True, groups={group: []})
662

Stavros Sachtouris's avatar
Stavros Sachtouris committed
663
    def get_account_info(self, until=None):
664
665
666
667
668
        """
        :param until: (str) formated date

        :returns: (dict)
        """
Stavros Sachtouris's avatar
Stavros Sachtouris committed
669
        r = self.account_head(until=until)
670
        if r.status_code == 401:
671
            raise ClientError("No authorization", status=401)
672
        return r.headers
673

Stavros Sachtouris's avatar
Stavros Sachtouris committed
674
    def get_account_quota(self):
        """Filter the account info for the X-Account-Policy-Quota header

        :returns: (dict)
        """
        account_info = self.get_account_info()
        return filter_in(
            account_info, 'X-Account-Policy-Quota', exactMatch=True)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
682
683

    def get_account_versioning(self):
        """Filter the account info for the X-Account-Policy-Versioning header

        :returns: (dict)
        """
        account_info = self.get_account_info()
        return filter_in(
            account_info, 'X-Account-Policy-Versioning', exactMatch=True)
691

Stavros Sachtouris's avatar
Stavros Sachtouris committed
692
    def get_account_meta(self, until=None):
        """Filter the account info with the key 'X-Account-Meta-'

        :param until: (str) formated date

        :returns: (dict)
        """
        account_info = self.get_account_info(until=until)
        return filter_in(account_info, 'X-Account-Meta-')
699

700
    def get_account_group(self):
        """Filter the account info with the key 'X-Account-Group-'

        :returns: (dict)
        """
        account_info = self.get_account_info()
        return filter_in(account_info, 'X-Account-Group-')

706
    def set_account_meta(self, metapairs):
707
708
709
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
710
        assert(type(metapairs) is dict)
711
        self.account_post(update=True, metadata=metapairs)
712

713
    def del_account_meta(self, metakey):
714
715
716
        """
        :param metakey: (str) metadatum key
        """
717
        self.account_post(update=True, metadata={metakey: ''})
718

Stavros Sachtouris's avatar
Stavros Sachtouris committed
719
    def set_account_quota(self, quota):
720
721
722
        """
        :param quota: (int)
        """
723
        self.account_post(update=True, quota=quota)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
724
725

    def set_account_versioning(self, versioning):
726
727
728
        """
        "param versioning: (str)
        """
729
        self.account_post(update=True, versioning=versioning)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
730

731
    def list_containers(self):
732
733
734
        """
        :returns: (dict)
        """
735
        r = self.account_get()
736
        return r.json
737

738
    def del_container(self, until=None, delimiter=None):
739
740
741
        """
        :param until: (str) formated date

742
        :param delimiter: (str) with / empty container
743
744
745
746
747

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
748
        self._assert_container()
749
750
        r = self.container_delete(
            until=until,
751
752
            delimiter=delimiter,
            success=(204, 404, 409))
753
        if r.status_code == 404:
754
755
            raise ClientError(
                'Container "%s" does not exist' % self.container,
756
                r.status_code)
757
        elif r.status_code == 409:
758
759
            raise ClientError(
                'Container "%s" is not empty' % self.container,
760
                r.status_code)
761

762
    def get_container_versioning(self, container=None):
        """Filter a container's info with 'X-Container-Policy-Versioning'

        The client's own container is put back afterwards, even on failure.

        :param container: (str) a false value means the client's current
            container

        :returns: (dict)
        """
        original_container = self.container
        try:
            self.container = container or original_container
            container_info = self.get_container_info()
            return filter_in(container_info, 'X-Container-Policy-Versioning')
        finally:
            self.container = original_container
Stavros Sachtouris's avatar
Stavros Sachtouris committed
776

777
    def get_container_quota(self, container=None):
        """Filter a container's info with 'X-Container-Policy-Quota'

        The client's own container is put back afterwards, even on failure.

        :param container: (str) a false value means the client's current
            container

        :returns: (dict)
        """
        original_container = self.container
        try:
            self.container = container or original_container
            container_info = self.get_container_info()
            return filter_in(container_info, 'X-Container-Policy-Quota')
        finally:
            self.container = original_container
791

792
    def get_container_info(self, until=None):
793
794
795
796
        """
        :param until: (str) formated date

        :returns: (dict)
797
798

        :raises ClientError: 404 Container not found
799
        """
800
801
802
803
804
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
Stavros Sachtouris's avatar
Stavros Sachtouris committed
805
806
        return r.headers

807
    def get_container_meta(self, until=None):
        """Filter the container info with the key 'X-Container-Meta'

        :param until: (str) formated date

        :returns: (dict)
        """
        container_info = self.get_container_info(until=until)
        return filter_in(container_info, 'X-Container-Meta')
816

817
    def get_container_object_meta(self, until=None):
        """Filter the container info with the key 'X-Container-Object-Meta'

        :param until: (str) formated date

        :returns: (dict)
        """
        container_info = self.get_container_info(until=until)
        return filter_in(container_info, 'X-Container-Object-Meta')
826

827
    def set_container_meta(self, metapairs):
828
829
830
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
831
        assert(type(metapairs) is dict)
832
        self.container_post(update=True, metadata=metapairs)
833

Stavros Sachtouris's avatar
Stavros Sachtouris committed
834
    def del_container_meta(self, metakey):
        """Post an empty value for one metadatum of the current container.

        :param metakey: (str) metadatum key
        """
        emptied = {metakey: ''}
        self.container_post(update=True, metadata=emptied)
839

840
    def set_container_limit(self, limit):
        """Post a new quota value to the current container as an update.

        :param limit: (int) value passed as the quota of container_post
        """
        post_args = dict(update=True, quota=limit)
        self.container_post(**post_args)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
845
846

    def set_container_versioning(self, versioning):
        """Post a versioning value to the current container as an update.

        :param versioning: (str) value passed as the versioning of
            container_post
        """
        post_args = dict(update=True, versioning=versioning)
        self.container_post(**post_args)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
851

852
    def del_object(self, obj, until=None, delimiter=None):
        """Delete a remote object, after checking that a container is set.

        :param obj: (str) remote object path

        :param until: (str) formatted date, passed on to object_delete

        :param delimiter: (str) passed on to object_delete
        """
        self._assert_container()
        delete_args = dict(until=until, delimiter=delimiter)
        self.object_delete(obj, **delete_args)
862

863
864
865
866
867
868
    def set_object_meta(self, obj, metapairs):
        """Post metadata to a remote object as an update.

        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}

        :raises AssertionError: if metapairs is not a dict
        """
        # isinstance (instead of an exact type match) also admits dict
        # subclasses, e.g., collections.OrderedDict
        assert isinstance(metapairs, dict)
        self.object_post(obj, update=True, metadata=metapairs)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
871

872
    def del_object_meta(self, obj, metakey):
        """Post an empty value for one metadatum of a remote object.

        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        emptied = {metakey: ''}
        self.object_post(obj, update=True, metadata=emptied)
879

880
881
882
    def publish_object(self, obj):
        """Mark a remote object as public and build its access url.

        The url is composed of the scheme and host part of the client's
        base_url, followed by the 'x-object-public' value found in the
        object info after the update.

        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        object_info = self.get_object_info(obj)
        # Keep only what precedes the first '/' after the '//' separator
        scheme, slashes, remainder = self.base_url.partition('//')
        host = remainder.partition('/')[0]
        public_path = object_info['x-object-public']
        return '%s%s%s/%s' % (scheme, slashes, host, public_path)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
891

892
893
894
895
    def unpublish_object(self, obj):
        """Post an update that sets the public flag of an object to False.

        :param obj: (str) remote object path
        """
        post_args = dict(update=True, public=False)
        self.object_post(obj, **post_args)
897

Stavros Sachtouris's avatar
Stavros Sachtouris committed
898
    def get_object_info(self, obj, version=None):
        """Retrieve the headers of a HEAD request on a remote object.

        :param obj: (str) remote object path

        :param version: (str) passed on to object_head

        :returns: (dict) the headers of the object_head response

        :raises ClientError: with status 404 and an 'Object ... not found'
            message if the failure status is 404; any other ClientError is
            re-raised as is
        """
        try:
            response = self.object_head(obj, version=version)
            return response.headers
        except ClientError as error:
            if error.status != 404:
                raise
            raise ClientError('Object %s not found' % obj, status=404)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
913
914

    def get_object_meta(self, obj, version=None):
        """Fetch the 'X-Object-Meta' entries of a remote object.

        :param obj: (str) remote object path

        :param version: (str) passed on to get_object_info

        :returns: (dict) the object info entries selected by filter_in for
            'X-Object-Meta'
        """
        object_info = self.get_object_info(obj, version=version)
        return filter_in(object_info, 'X-Object-Meta')
Stavros Sachtouris's avatar
Stavros Sachtouris committed
925

926
927
928
929
930
931
    def get_object_sharing(self, obj):
        """Parse the sharing permissions of a remote object.

        The 'x-object-sharing' value is read as a ';'-separated list of
        key=value items.

        :param obj: (str) remote object path

        :returns: (dict) one entry per key=value item; empty if the object
            info carries no sharing entry

        :raises ClientError: if an item does not contain '='
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            for perm in r['x-object-sharing'].split(';'):
                if '=' not in perm:
                    raise ClientError('Incorrect reply format')
                # Split on the first '=' only, so that a value which itself
                # contains '=' cannot break the two-way unpacking
                key, val = perm.strip().split('=', 1)
                reply[key] = val
        return reply
947

948
    def set_object_sharing(
            self, obj,
            read_permition=False, write_permition=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permition: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permition: (list - bool) users and user groups that get
            write permission for this object - False means all previous
            write permissions will be removed
        """
        # Any false value (the default False included) is posted as ''
        perms = {
            'read': read_permition or '',
            'write': write_permition or '',
        }
        self.object_post(obj, update=True, permissions=perms)
966

967
968
969
970
971
972
973
974
975
976
977
    def del_object_sharing(self, obj):
        """Reset the sharing of an object via set_object_sharing.

        Only the object path is passed on, so set_object_sharing runs with
        its default read and write arguments.

        :param obj: (str) remote object path
        """
        self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """Append the contents of a local file to a remote object.

        The file is read in pieces of at most the container block size
        ('x-container-block-size' of the container info) and each piece is
        posted as a separate update.

        :param obj: (str) remote object path

        :param source_file: open file descriptor (must support fileno and
            read)

        :param upload_cb: progress.bar for uploading; called with the number
            of blocks, it must return a generator, which is advanced once
            before the upload starts and once after each posted block
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        # Ceiling division; yields 0 blocks for an empty file
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            # The next() builtin works on Python 2.6+ and Python 3, whereas
            # the generator method .next() exists on Python 2 only
            next(upload_gen)
        for _ in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            self.object_post(
                obj,
                update=True,
                content_range='bytes */*',
                content_type='application/octet-stream',
                content_length=len(block),
                data=block)

            if upload_cb:
                next(upload_gen)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
1004

1005
1006
1007
1008
1009
1010
    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file
        """
1011
        self.object_post(
1012
            obj,