__init__.py 35.9 KB
Newer Older
Stavros Sachtouris's avatar
Stavros Sachtouris committed
1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
Giorgos Verigakis's avatar
Giorgos Verigakis committed
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

34
from threading import enumerate as activethreads
35

36
from os import fstat
37
from hashlib import new as newhashlib
38
from time import time
Giorgos Verigakis's avatar
Giorgos Verigakis committed
39

40
from binascii import hexlify
Giorgos Verigakis's avatar
Giorgos Verigakis committed
41

42
from kamaki.clients import SilentEvent, sendlog
43
from kamaki.clients.pithos.rest_api import PithosRestClient
44
45
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in
46
from StringIO import StringIO
Giorgos Verigakis's avatar
Giorgos Verigakis committed
47

48

49
def _pithos_hash(block, blockhash):
50
    h = newhashlib(blockhash)
Giorgos Verigakis's avatar
Giorgos Verigakis committed
51
52
53
    h.update(block.rstrip('\x00'))
    return h.hexdigest()

54

55
56
57
58
59
def _range_up(start, end, a_range):
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
60
            return (0, 0)
61
62
63
64
65
66
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)

67

68
class PithosClient(PithosRestClient):
Giorgos Verigakis's avatar
Giorgos Verigakis committed
69
    """GRNet Pithos API client"""
Giorgos Verigakis's avatar
Giorgos Verigakis committed
70

71
72
    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)
73

74
    def purge_container(self, container=None):
75
76
        """Delete an empty container and destroy associated blocks
        """
77
78
79
80
81
82
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up
83
        r.release()
84

85
86
87
88
89
90
91
92
93
94
    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool)

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
117
        self._assert_container()
118
119
120
121
122
123
124

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
125
                raise ClientError('"%s" is not json-formated' % f.name, 1)
126
            except SyntaxError:
127
128
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
129
            f = StringIO(data)
130
        else:
131
            data = f.read(size) if size else f.read()
132
133
        r = self.object_put(
            obj,
134
135
136
137
138
139
140
141
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
142
        r.release()
143

144
145
146
147
148
149
150
151
    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
168
        self._assert_container()
169
170
        r = self.object_put(
            obj,
171
172
173
174
175
176
177
178
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
179
        r.release()
180

181
    # upload_* auxiliary methods
182
    def _put_block_async(self, data, hash, upload_gen=None):
183
184
185
186
187
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
188
189
        r = self.container_post(
            update=True,
190
191
192
193
194
195
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

196
    def _get_file_block_info(self, fileobj, size=None):
197
        meta = self.get_container_info()
198
199
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
200
        size = size if size is not None else fstat(fileobj.fileno()).st_size
201
        nblocks = 1 + (size - 1) // blocksize
202
203
        return (blocksize, blockhash, size, nblocks)

204
205
206
207
208
209
210
211
212
213
214
215
216
217
    def _get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
218
219
220
221
222
223
224
225
226
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
227
228
            success=success)
        if r.status_code == 201:
229
            r.release()
230
231
            return None
        return r.json
232

233
    def _culculate_blocks_for_upload(
234
235
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
236
        offset = 0
237
238
239
240
241
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
242
            block = fileobj.read(min(blocksize, size - offset))
243
            bytes = len(block)
244
            hash = _pithos_hash(block, blockhash)
245
            hashes.append(hash)
246
            hmap[hash] = (offset, bytes)
247
248
249
            offset += bytes
            if hash_cb:
                hash_gen.next()
Stavros Sachtouris's avatar
Stavros Sachtouris committed
250
        msg = 'Failed to calculate uploaded blocks:'
251
        ' Offset and object size do not match'
Stavros Sachtouris's avatar
Stavros Sachtouris committed
252
        assert offset == size, msg
253

254
255
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""
256

257
        self._init_thread_limit()
258

259
        flying = []
260
        failures = []
261
        for hash in missing:
262
            offset, bytes = hmap[hash]
263
264
            fileobj.seek(offset)
            data = fileobj.read(bytes)
265
            r = self._put_block_async(data, hash, upload_gen)
266
            flying.append(r)
267
268
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
269
270
                if thread.exception:
                    failures.append(thread)
271
                    if isinstance(
272
273
274
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
275
                elif thread.isAlive():
276
                    flying.append(thread)
277
278
279
280
281
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
282
283
284
285
            flying = unfinished

        for thread in flying:
            thread.join()
286
287
            if thread.exception:
                failures.append(thread)
288
289
290
291
292
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass
293

294
        return [failure.kwargs['hash'] for failure in failures]
295

296
297
298
299
300
301
302
303
304
305
306
    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
330
        self._assert_container()
331

332
        #init
333
334
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
335
        (hashes, hmap, offset) = ([], {}, 0)
336
        if not content_type:
337
            content_type = 'application/octet-stream'
338

339
        self._culculate_blocks_for_upload(
340
            *block_info,
341
342
343
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
344
345
346
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
347
348
        missing = self._get_missing_hashes(
            obj, hashmap,
349
350
351
352
353
354
355
            content_type=content_type,
            size=size,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
356
357
358

        if missing is None:
            return
359

360
361
        if upload_cb:
            upload_gen = upload_cb(len(missing))
362
363
364
365
366
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
367
368
369
        else:
            upload_gen = None

370
        retries = 7
371
        try:
372
            while retries:
373
                sendlog.info('%s blocks missing' % len(missing))
374
375
376
377
378
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
379
                    upload_gen)
380
381
382
383
384
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
385
386
387
388
389
390
                else:
                    break
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    status=800)
391
        except KeyboardInterrupt:
392
            sendlog.info('- - - wait for threads to finish')
393
394
395
396
397
398
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
399
400
401
402
403
            format='json',
            hashmap=True,
            content_type=content_type,
            json=hashmap,
            success=201)
404
        r.release()
405

406
    # download_* auxiliary methods
Stavros Sachtouris's avatar
Stavros Sachtouris committed
407
    def _get_remote_blocks_info(self, obj, **restargs):
408
        #retrieve object hashmap
409
        myrange = restargs.pop('data_range', None)
410
        hashmap = self.get_object_hashmap(obj, **restargs)
411
        restargs['data_range'] = myrange
412
413
414
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
Stavros Sachtouris's avatar
Stavros Sachtouris committed
415
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
416
        map_dict = {}
Stavros Sachtouris's avatar
Stavros Sachtouris committed
417
418
419
        for i, h in enumerate(hashmap['hashes']):
            map_dict[h] = i
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
420

421
422
423
    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
Stavros Sachtouris's avatar
Stavros Sachtouris committed
424
        for blockid, blockhash in enumerate(remote_hashes):
425
426
427
428
429
430
431
432
433
434
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range)
                args['data_range'] = 'bytes=%s-%s' % (start, end)
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()
Stavros Sachtouris's avatar
Stavros Sachtouris committed
435

436
437
    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
438
439
        event.start()
        return event
440

441
442
443
444
445
446
447
    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        h.update(block.strip('\x00'))
        return hexlify(h.digest())

448
    def _thread2file(self, flying, local_file, offset=0, **restargs):
449
        """write the results of a greenleted rest call to a file
450
451
452
453
454

        :param offset: the offset of the file up to blocksize
        - e.g. if the range is 10-100, all blocks will be written to
        normal_position - 10
        """
Stavros Sachtouris's avatar
Stavros Sachtouris committed
455
        finished = []
456
        for i, (start, g) in enumerate(flying.items()):
457
            if not g.isAlive():
Stavros Sachtouris's avatar
Stavros Sachtouris committed
458
459
                if g.exception:
                    raise g.exception
460
                block = g.value.content
461
                local_file.seek(start - offset)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
462
463
                local_file.write(block)
                self._cb_next()
464
                finished.append(flying.pop(start))
Stavros Sachtouris's avatar
Stavros Sachtouris committed
465
466
467
        local_file.flush()
        return finished

468
469
470
    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
471
        file_size = fstat(local_file.fileno()).st_size if resume else 0
472
473
        flying = {}
        finished = []
474
475
476
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
477
            offset = rstart if blocksize > rstart else rstart % blocksize
478

479
        self._init_thread_limit()
Stavros Sachtouris's avatar
Stavros Sachtouris committed
480
        for block_hash, blockid in remote_hashes.items():
481
            start = blocksize * blockid
482
483
            if start < file_size and block_hash == self._hash_from_file(
                    local_file, start, blocksize, blockhash):
484
485
                self._cb_next()
                continue
486
            self._watch_thread_limit(flying.values())
487
488
489
490
491
            finished += self._thread2file(
                flying,
                local_file,
                offset,
                **restargs)
492
493
            end = total_size - 1 if start + blocksize > total_size\
                else start + blocksize - 1
494
495
496
497
            (start, end) = _range_up(start, end, filerange)
            if start == end:
                self._cb_next()
                continue
498
            restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
499
            flying[start] = self._get_block_async(obj, **restargs)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
500

501
502
503
        for thread in flying.values():
            thread.join()
        finished += self._thread2file(flying, local_file, offset, **restargs)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
504

505
506
507
508
509
510
511
512
513
514
    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
515
        """Download an object (multiple connections, random blocks)
516
517
518
519
520
521
522
523
524
525
526

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

527
        :param range_str: (str) from, to are file positions (int) in bytes
528
529
530
531
532
533
534

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

535
        :param if_unmodified_since: (str) formated date"""
536
537
538
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
Stavros Sachtouris's avatar
Stavros Sachtouris committed
539
540
541
542
543
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

544
545
        (
            blocksize,
Stavros Sachtouris's avatar
Stavros Sachtouris committed
546
547
            blockhash,
            total_size,
548
            hash_list,
Stavros Sachtouris's avatar
Stavros Sachtouris committed
549
550
551
552
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
553
            self.progress_bar_gen = download_cb(len(remote_hashes))
Stavros Sachtouris's avatar
Stavros Sachtouris committed
554
555
556
            self._cb_next()

        if dst.isatty():
557
558
            self._dump_blocks_sync(
                obj,
559
560
561
562
                hash_list,
                blocksize,
                total_size,
                dst,
563
                range_str,
564
                **restargs)
565
        else:
566
567
            self._dump_blocks_async(
                obj,
568
569
570
571
572
573
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
574
                range_str,
575
                **restargs)
576
            if not range_str:
577
                dst.truncate(total_size)
578

Stavros Sachtouris's avatar
Stavros Sachtouris committed
579
580
581
582
583
584
585
586
587
        self._complete_cb()

    #Command Progress Bar method
    def _cb_next(self):
        if hasattr(self, 'progress_bar_gen'):
            try:
                self.progress_bar_gen.next()
            except:
                pass
588

Stavros Sachtouris's avatar
Stavros Sachtouris committed
589
590
591
592
593
594
    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break
Stavros Sachtouris's avatar
Stavros Sachtouris committed
595

596
597
598
599
600
601
602
603
    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (list)
        """
620
        try:
621
622
            r = self.object_get(
                obj,
623
624
625
626
627
628
629
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
630
631
632
633
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
634
        return r.json
635

636
    def set_account_group(self, group, usernames):
637
638
639
640
641
        """
        :param group: (str)

        :param usernames: (list)
        """
642
        r = self.account_post(update=True, groups={group: usernames})
643
        r.release()
Stavros Sachtouris's avatar
Stavros Sachtouris committed
644

645
    def del_account_group(self, group):
646
647
648
        """
        :param group: (str)
        """
649
        r = self.account_post(update=True, groups={group: []})
650
        r.release()
651

Stavros Sachtouris's avatar
Stavros Sachtouris committed
652
    def get_account_info(self, until=None):
653
654
655
656
657
        """
        :param until: (str) formated date

        :returns: (dict)
        """
Stavros Sachtouris's avatar
Stavros Sachtouris committed
658
        r = self.account_head(until=until)
659
        if r.status_code == 401:
660
            raise ClientError("No authorization", status=401)
661
        return r.headers
662

Stavros Sachtouris's avatar
Stavros Sachtouris committed
663
    def get_account_quota(self):
664
665
666
        """
        :returns: (dict)
        """
667
668
        return filter_in(
            self.get_account_info(),
669
670
            'X-Account-Policy-Quota',
            exactMatch=True)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
671
672

    def get_account_versioning(self):
673
674
675
        """
        :returns: (dict)
        """
676
677
        return filter_in(
            self.get_account_info(),
678
679
            'X-Account-Policy-Versioning',
            exactMatch=True)
680

Stavros Sachtouris's avatar
Stavros Sachtouris committed
681
    def get_account_meta(self, until=None):
682
683
684
685
686
        """
        :meta until: (str) formated date

        :returns: (dict)
        """
687
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
688

689
    def get_account_group(self):
690
691
692
        """
        :returns: (dict)
        """
693
694
        return filter_in(self.get_account_info(), 'X-Account-Group-')

695
    def set_account_meta(self, metapairs):
696
697
698
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
699
        assert(type(metapairs) is dict)
700
701
        r = self.account_post(update=True, metadata=metapairs)
        r.release()
702

703
    def del_account_meta(self, metakey):
704
705
706
        """
        :param metakey: (str) metadatum key
        """
707
        r = self.account_post(update=True, metadata={metakey: ''})
708
        r.release()
709

Stavros Sachtouris's avatar
Stavros Sachtouris committed
710
    def set_account_quota(self, quota):
711
712
713
        """
        :param quota: (int)
        """
714
715
        r = self.account_post(update=True, quota=quota)
        r.release()
Stavros Sachtouris's avatar
Stavros Sachtouris committed
716
717

    def set_account_versioning(self, versioning):
718
719
720
        """
        "param versioning: (str)
        """
721
        r = self.account_post(update=True, versioning=versioning)
722
        r.release()
Stavros Sachtouris's avatar
Stavros Sachtouris committed
723

724
    def list_containers(self):
725
726
727
        """
        :returns: (dict)
        """
728
        r = self.account_get()
729
        return r.json
730

731
    def del_container(self, until=None, delimiter=None):
732
733
734
        """
        :param until: (str) formated date

735
        :param delimiter: (str) with / empty container
736
737
738
739
740

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
741
        self._assert_container()
742
743
        r = self.container_delete(
            until=until,
744
745
            delimiter=delimiter,
            success=(204, 404, 409))
746
        r.release()
747
        if r.status_code == 404:
748
749
            raise ClientError(
                'Container "%s" does not exist' % self.container,
750
                r.status_code)
751
        elif r.status_code == 409:
752
753
            raise ClientError(
                'Container "%s" is not empty' % self.container,
754
                r.status_code)
755

756
    def get_container_versioning(self, container=None):
757
758
759
760
761
        """
        :param container: (str)

        :returns: (dict)
        """
762
763
764
765
766
767
768
769
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up
Stavros Sachtouris's avatar
Stavros Sachtouris committed
770

771
    def get_container_quota(self, container=None):
772
773
774
775
776
        """
        :param container: (str)

        :returns: (dict)
        """
777
778
779
780
781
782
783
784
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up
785

786
    def get_container_info(self, until=None):
787
788
789
790
        """
        :param until: (str) formated date

        :returns: (dict)
791
792

        :raises ClientError: 404 Container not found
793
        """
794
795
796
797
798
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
Stavros Sachtouris's avatar
Stavros Sachtouris committed
799
800
        return r.headers

801
    def get_container_meta(self, until=None):
802
803
804
805
806
        """
        :param until: (str) formated date

        :returns: (dict)
        """
807
808
        return filter_in(
            self.get_container_info(until=until),
809
            'X-Container-Meta')
810

811
    def get_container_object_meta(self, until=None):
812
813
814
815
816
        """
        :param until: (str) formated date

        :returns: (dict)
        """
817
818
        return filter_in(
            self.get_container_info(until=until),
819
            'X-Container-Object-Meta')
820

821
    def set_container_meta(self, metapairs):
822
823
824
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
825
        assert(type(metapairs) is dict)
826
827
        r = self.container_post(update=True, metadata=metapairs)
        r.release()
828

Stavros Sachtouris's avatar
Stavros Sachtouris committed
829
    def del_container_meta(self, metakey):
830
831
832
        """
        :param metakey: (str) metadatum key
        """
833
        r = self.container_post(update=True, metadata={metakey: ''})
834
        r.release()
835

Stavros Sachtouris's avatar
Stavros Sachtouris committed
836
    def set_container_quota(self, quota):
837
838
839
        """
        :param quota: (int)
        """
840
841
        r = self.container_post(update=True, quota=quota)
        r.release()
Stavros Sachtouris's avatar
Stavros Sachtouris committed
842
843

    def set_container_versioning(self, versioning):
844
845
846
        """
        :param versioning: (str)
        """
847
848
        r = self.container_post(update=True, versioning=versioning)
        r.release()
Stavros Sachtouris's avatar
Stavros Sachtouris committed
849

850
    def del_object(self, obj, until=None, delimiter=None):
851
852
853
854
855
856
857
        """
        :param obj: (str) remote object path

        :param until: (str) formated date

        :param delimiter: (str)
        """
858
        self._assert_container()
859
860
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        r.release()
861

862
863
864
865
866
867
    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
868
        assert(type(metapairs) is dict)
869
        r = self.object_post(obj, update=True, metadata=metapairs)
870
        r.release()
Stavros Sachtouris's avatar
Stavros Sachtouris committed
871

872
    def del_object_meta(self, obj, metakey):
873
874
        """
        :param obj: (str) remote object path
875
876

        :param metakey: (str) metadatum key
877
        """
878
        r = self.object_post(obj, update=True, metadata={metakey: ''})
879
        r.release()
880

881
882
883
    def publish_object(self, obj):
        """
        :param obj: (str) remote object path
884
885

        :returns: (str) access url
886
887
        """
        r = self.object_post(obj, update=True, public=True)
888
        r.release()
889
        info = self.get_object_info(obj)
890
891
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
892
        return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
Stavros Sachtouris's avatar
Stavros Sachtouris committed
893

894
895
896
897
898
    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=False)
899
        r.release()
900

Stavros Sachtouris's avatar
Stavros Sachtouris committed
901
    def get_object_info(self, obj, version=None):
902
903
904
905
906
907
908
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
909
910
911
912
913
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
914
                raise ClientError('Object %s not found' % obj, status=404)
915
            raise
Stavros Sachtouris's avatar
Stavros Sachtouris committed
916
917

    def get_object_meta(self, obj, version=None):
918
919
920
921
922
923
924
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
925
926
        return filter_in(
            self.get_object_info(obj, version=version),
927
            'X-Object-Meta')
Stavros Sachtouris's avatar
Stavros Sachtouris committed
928

929
930
931
932
933
934
    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
935
936
        r = filter_in(
            self.get_object_info(obj),
937
938
            'X-Object-Sharing',
            exactMatch=True)
939
940
941
942
943
944
945
946
947
948
949
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply
950

951
    def set_object_sharing(
952
953
            self, obj,
            read_permition=False, write_permition=False):
954
        """Give read/write permisions to an object.
955
956
957
958
959
960
961
962
963

        :param obj: (str) remote object path

        :param read_permition: (list - bool) users and user groups that get
            read permition for this object - False means all previous read
            permissions will be removed

        :param write_perimition: (list - bool) of users and user groups to get
           write permition for this object - False means all previous write
964
           permissions will be removed
965
        """
966

967
        perms = dict(read=read_permition or '', write=write_permition or '')
968
        r = self.object_post(obj, update=True, permissions=perms)
969
        r.release()
970

971
972
973
974
975
976
977
978
979
980
981
    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor
982

983
        :param upload_db: progress.bar for uploading
Stavros Sachtouris's avatar
Stavros Sachtouris committed
984
        """
985

986
        self._assert_container()
987
        meta = self.get_container_info()
Stavros Sachtouris's avatar
Stavros Sachtouris committed
988
        blocksize = int(meta['x-container-block-size'])
989
        filesize = fstat(source_file.fileno()).st_size
990
        nblocks = 1 + (filesize - 1) // blocksize
Stavros Sachtouris's avatar
Stavros Sachtouris committed
991
        offset = 0
992
        if upload_cb:
Stavros Sachtouris's avatar
Stavros Sachtouris committed
993
            upload_gen = upload_cb(nblocks)
994
            upload_gen.next()
Stavros Sachtouris's avatar
Stavros Sachtouris committed
995
996
997
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
998
999
            r = self.object_post(
                obj,
1000
1001
1002
1003
1004
                update=True,
                content_range='bytes */*',
                content_type='application/octet-stream',
                content_length=len(block),
                data=block)
1005
            r.release()
1006

1007
            if upload_cb:
Stavros Sachtouris's avatar
Stavros Sachtouris committed
1008
                upload_gen.next()
Stavros Sachtouris's avatar
Stavros Sachtouris committed
1009

1010
1011
1012
1013
1014
1015
    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file
        """
1016
1017
        r = self.object_post(
            obj,
1018
1019
1020
1021
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
1022