__init__.py 45.8 KB
Newer Older
Stavros Sachtouris's avatar
Stavros Sachtouris committed
1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
Giorgos Verigakis's avatar
Giorgos Verigakis committed
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

34
from threading import enumerate as activethreads
35

36
from os import fstat
37
from hashlib import new as newhashlib
38
from time import time
39
from StringIO import StringIO
Giorgos Verigakis's avatar
Giorgos Verigakis committed
40

41
from binascii import hexlify
Giorgos Verigakis's avatar
Giorgos Verigakis committed
42

43
from kamaki.clients import SilentEvent, sendlog
44
from kamaki.clients.pithos.rest_api import PithosRestClient
45
46
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in
Giorgos Verigakis's avatar
Giorgos Verigakis committed
47

48

49
def _pithos_hash(block, blockhash):
    """Compute the Pithos+ hash of a single data block.

    Trailing NUL padding is stripped before hashing, matching how the
    Pithos+ server computes block hashes.

    :param block: (str) block data
    :param blockhash: (str) hash algorithm name (e.g. 'sha256')
    :returns: (str) hex digest of the right-stripped block
    """
    digester = newhashlib(blockhash)
    digester.update(block.rstrip('\x00'))
    return digester.hexdigest()

54

55
56
57
58
59
def _range_up(start, end, a_range):
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
60
            return (0, 0)
61
62
63
64
65
66
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)

67

68
class PithosClient(PithosRestClient):
69
    """Synnefo Pithos+ API client"""
Giorgos Verigakis's avatar
Giorgos Verigakis committed
70

71
72
    def __init__(self, base_url, token, account=None, container=None):
        """Initialize the client; all arguments are forwarded verbatim
        to PithosRestClient.

        :param base_url: (str) service endpoint URL
        :param token: (str) user authentication token
        :param account: (str) account identifier, if any
        :param container: (str) default container for subsequent calls
        """
        super(PithosClient, self).__init__(base_url, token, account, container)
73

74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
    def create_container(
            self,
            container=None, sizelimit=None, versioning=None, metadata=None):
        """Create a container, temporarily switching self.container.

        :param container: (str) if not given, self.container is used instead

        :param sizelimit: (int) container total size limit in bytes

        :param versioning: (str) can be auto or whatever supported by server

        :param metadata: (dict) Custom user-defined metadata of the form
            { 'name1': 'value1', 'name2': 'value2', ... }

        :returns: (dict) response headers
        """
        previous = self.container
        try:
            # Operate on the requested container, then restore the old one
            # no matter what happens.
            self.container = container or previous
            response = self.container_put(
                quota=sizelimit, versioning=versioning, metadata=metadata)
            return response.headers
        finally:
            self.container = previous

98
    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks.

        :param container: (str) if not given, self.container is used instead
        """
        previous = self.container
        try:
            self.container = container or previous
            # until=now destroys all historic versions as well
            self.container_delete(until=unicode(time()))
        finally:
            # Always restore the original working container.
            self.container = previous
107

108
109
110
111
112
113
114
115
116
117
    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Upload an object in a single request (no block splitting).

        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool) if set, f holds a json hashmap instead
            of raw data

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()

        if withHashFile:
            raw = f.read()
            try:
                import json
                # Round-trip through json to validate the hashmap syntax
                data = json.dumps(json.loads(raw))
            except ValueError:
                raise ClientError('"%s" is not json-formated' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()

        response = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return response.headers
168

169
170
171
172
173
174
175
176
    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Create a zero-length object whose content manifests the objects
        prefixed by '<container>/<obj>'.

        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()
        manifest_path = '%s/%s' % (self.container, obj)
        response = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest=manifest_path)
        return response.headers
207

208
    # upload_* auxiliary methods
209
    def _put_block_async(self, data, hash):
        """Upload one block in a background thread.

        :param data: (str) block contents
        :param hash: (str) expected Pithos+ hash of the block
        :returns: (SilentEvent) the started uploader thread
        """
        uploader = SilentEvent(method=self._put_block, data=data, hash=hash)
        uploader.start()
        return uploader

    def _put_block(self, data, hash):
        """Upload one block via a container POST (update mode) and verify
        the server-computed hash against the local one.

        :param data: (str) block contents
        :param hash: (str) locally computed Pithos+ hash of data
        """
        response = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        # The server answers with the list of hashes it stored
        assert response.json[0] == hash, 'Local hash does not match server'

223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """Look up the container block parameters and size the upload.

        :param fileobj: (file descriptor) source

        :param size: (int) size of data to upload from source; when None,
            it is taken from fileobj via fstat

        :param cache: (dict) if provided, cache container info response to
        avoid redundant calls

        :returns: (blocksize, blockhash, size, nblocks)
        """
        if isinstance(cache, dict):
            if self.container not in cache:
                cache[self.container] = self.get_container_info()
            meta = cache[self.container]
        else:
            meta = self.get_container_info()

        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        if size is None:
            size = fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)

246
    def _create_object_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        """PUT the object hashmap; a 201 means the object was assembled
        entirely from blocks the server already had, a 409 returns the
        hashes of the blocks that must still be uploaded.

        :returns: (missing, headers) where missing is None on 201
            (object created) or the server's json body (the list of
            absent block hashes) otherwise

        NOTE(review): the size/format/hashmap parameters are accepted but
        not forwarded - object_put is always called with format='json' and
        hashmap=True; confirm before relying on them.
        """
        resp = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        missing = None if resp.status_code == 201 else resp.json
        return missing, resp.headers
273

274
    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        """Read fileobj block by block, hashing each block.

        Fills the caller-supplied containers: hashes receives the block
        hashes in file order; hmap maps each hash to (offset, length).

        :param blocksize: (int) container block size in bytes
        :param blockhash: (str) hash algorithm name
        :param size: (int) total number of bytes to read
        :param nblocks: (int) number of blocks to read
        :param hashes: (list) output - block hashes, in order
        :param hmap: (dict) output - hash -> (offset, length)
        :param fileobj: open file descriptor to read from
        :param hash_cb: optional progress.bar object for calculating hashes
        """
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            nbytes = len(block)
            block_hash = _pithos_hash(block, blockhash)
            hashes.append(block_hash)
            hmap[block_hash] = (offset, nbytes)
            offset += nbytes
            if hash_cb:
                hash_gen.next()
        # Bug fix: the message was previously split into an assignment plus
        # a stand-alone (no-op) string expression, so the assertion only
        # carried the first half of the message.
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        assert offset == size, msg
294

295
296
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously

        :param missing: (list) hashes of blocks the server reported absent
        :param hmap: (dict) hash -> (offset, length) into fileobj
        :param fileobj: open file descriptor to read block data from
        :param upload_gen: optional progress generator, advanced once per
            finished block (best effort)
        :returns: (list) hashes of the blocks whose upload failed
        """

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash)
            flying.append(r)
            # Enforce the pool limit; returns the threads still running.
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        # Bad gateway: server saturated - shrink the pool
                        # to the current thread limit.
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        # Progress reporting is best effort only.
                        pass
            flying = unfinished

        # Wait for the remaining uploads and account for their outcomes.
        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        # Failed threads carry their block hash in kwargs (see
        # _put_block_async), so callers can retry exactly those blocks.
        return [failure.kwargs['hash'] for failure in failures]
336

337
338
339
340
341
342
    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            Involves the case of an object with the same path is created while
            the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
        server for container info (block size and hash information)

        :returns: (dict) response headers of the object creation
        """
        self._assert_container()

        # Hash the local file block by block.
        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        # Send the hashmap; the server answers with the hashes of the
        # blocks it does not already have (or creates the object outright).
        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        # missing is None when all blocks were already on the server.
        if missing is None:
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            # Pre-advance the bar for the blocks that are already uploaded.
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    # A broken progress generator disables progress display.
                    upload_gen = None
        else:
            upload_gen = None

        # Retry budget only shrinks when a whole pass makes no progress.
        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                # NOTE(review): 'missing' holds hash strings here (see
                # _upload_missing_blocks return value), so thread.exception
                # looks wrong - verify what 'details' should contain.
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % thread.exception for thread in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        # All blocks are on the server: finalize the object with a hashmap
        # PUT that must now succeed (201).
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

    def upload_from_string(
            self, obj, input_str,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param input_str: (str) upload content

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            Involves the case of an object with the same path is created while
            the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
        server for container info (block size and hash information)

        :returns: (dict) response headers of the object creation
        """
        self._assert_container()

        blocksize, blockhash, size, nblocks = self._get_file_block_info(
                fileobj=None, size=len(input_str), cache=container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        # NOTE(review): when 0 < size < blocksize, num_of_blocks is 0 and
        # this adds blockmod (a byte count) instead of 1, so the loop below
        # would iterate once per byte - verify small-input uploads.
        num_of_blocks, blockmod = size / blocksize, size % blocksize
        num_of_blocks += (1 if blockmod else 0) if num_of_blocks else blockmod

        # Hash each slice of the input string; hmap keeps the block data
        # itself (not a length), since there is no file to re-read from.
        hashes = []
        hmap = {}
        for blockid in range(num_of_blocks):
            start = blockid * blocksize
            block = input_str[start: (start + blocksize)]
            hashes.append(_pithos_hash(block, blockhash))
            hmap[hashes[blockid]] = (start, block)

        # Ask the server which blocks it still needs (409) or whether the
        # object could be created from known blocks alone (201 -> None).
        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers
        num_of_missing = len(missing)

        if upload_cb:
            self.progress_bar_gen = upload_cb(num_of_blocks)
            # Pre-advance the bar for blocks already on the server.
            for i in range(num_of_blocks + 1 - num_of_missing):
                self._cb_next()

        # Retry budget shrinks only when a pass makes no progress.
        tries = 7
        old_failures = 0
        try:
            while tries and missing:
                flying = []
                failures = []
                for hash in missing:
                    offset, block = hmap[hash]
                    bird = self._put_block_async(block, hash)
                    flying.append(bird)
                    unfinished = self._watch_thread_limit(flying)
                    for thread in set(flying).difference(unfinished):
                        if thread.exception:
                            failures.append(thread.kwargs['hash'])
                        if thread.isAlive():
                            flying.append(thread)
                        else:
                            self._cb_next()
                    flying = unfinished
                for thread in flying:
                    thread.join()
                    if thread.exception:
                        failures.append(thread.kwargs['hash'])
                    self._cb_next()
                missing = failures
                if missing and len(missing) == old_failures:
                    tries -= 1
                old_failures = len(missing)
            if missing:
                # NOTE(review): 'missing' holds hash strings, so
                # thread.exception looks wrong here - verify 'details'.
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % thread.exception for thread in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        # All blocks uploaded: finalize with a hashmap PUT (must be 201).
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
604

605
    # download_* auxiliary methods
Stavros Sachtouris's avatar
Stavros Sachtouris committed
606
    def _get_remote_blocks_info(self, obj, **restargs):
        """Fetch the object hashmap and index its blocks.

        data_range is withheld while requesting the hashmap (the map always
        describes the whole object) and restored afterwards.

        :param obj: (str) remote object path
        :returns: (blocksize, blockhash, total_size, hash_list, map_dict)
            where map_dict maps each block hash to the list of block
            indices where it occurs (identical blocks share one hash)
        """
        saved_range = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = saved_range

        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']

        map_dict = {}
        for index, block_hash in enumerate(hashmap['hashes']):
            map_dict.setdefault(block_hash, []).append(index)
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
623

624
625
626
    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        """Download blocks one by one, in order, writing each to dst.

        :param obj: (str) remote object path
        :param remote_hashes: (list) block hashes in block order
        :param blocksize: (int) block size in bytes
        :param total_size: (int) object size in bytes
        :param dst: open file descriptor to write to
        :param range: (str) 'start-end' byte positions, or None for all
        """
        for blockid, blockhash in enumerate(remote_hashes):
            if not blockhash:
                continue
            first = blocksize * blockid
            # Last byte of this block, capped at the end of the object.
            last = min(first + blocksize, total_size) - 1
            (first, last) = _range_up(first, last, range)
            args['data_range'] = 'bytes=%s-%s' % (first, last)
            resp = self.object_get(obj, success=(200, 206), **args)
            self._cb_next()
            dst.write(resp.content)
            dst.flush()
Stavros Sachtouris's avatar
Stavros Sachtouris committed
638

639
640
    def _get_block_async(self, obj, **args):
        """Fire an asynchronous GET for one block.

        :param obj: (str) remote object path
        :returns: (SilentEvent) the started downloader thread
        """
        fetcher = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        fetcher.start()
        return fetcher
643

644
645
646
647
648
649
650
    def _hash_from_file(self, fp, start, size, blockhash):
        """Hash one block of a local file, for comparison against the
        remote hashmap when resuming a download.

        :param fp: open file descriptor
        :param start: (int) offset of the block in the file
        :param size: (int) block size in bytes
        :param blockhash: (str) hash algorithm name
        :returns: (str) hex digest of the block
        """
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        # Bug fix: use rstrip, not strip, to match _pithos_hash - Pithos+
        # block hashes are computed with only trailing NUL padding removed.
        # Stripping leading NULs too made blocks that start with '\x00'
        # hash differently from the server, breaking the resume comparison.
        h.update(block.rstrip('\x00'))
        return hexlify(h.digest())

651
    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a greenleted rest call to a file

        :param flying: (dict) key -> SilentEvent of an in-flight block GET;
            finished entries are written out and removed
        :param blockids: (dict) key -> list of file positions where that
            block's data must be written (duplicate blocks share one GET)
        :param local_file: open file descriptor to write to
        :param offset: the offset of the file up to blocksize
        - e.g. if the range is 10-100, all blocks will be written to
        normal_position - 10
        """
        # NOTE(review): the docstring above says blocks land at
        # normal_position - offset, but the code seeks to
        # block_start + offset - confirm the intended sign against
        # _dump_blocks_async, which derives offset from the range start.
        for key, g in flying.items():
            # Python 2: dict.items() returns a list snapshot, so popping
            # entries below while iterating is safe here.
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            # Write the block to every position it occupies in the file.
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

672
673
674
    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        """Download blocks concurrently (in any order) into local_file.

        :param obj: (str) remote object path
        :param remote_hashes: (dict) hash -> list of block indices, as
            produced by _get_remote_blocks_info
        :param blocksize: (int) block size in bytes
        :param total_size: (int) object size in bytes
        :param local_file: open, seekable file descriptor to write to
        :param blockhash: (str) hash algorithm name, used when resuming
        :param resume: (bool) if set, keep local blocks whose hash already
            matches the remote one
        :param filerange: (str) 'start-end' byte positions, or None
        """
        # When resuming, only data already on disk can be reused.
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            # Shift written positions so the ranged download starts at 0.
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            # Convert block indices to byte offsets in the object.
            blockids = [blk * blocksize for blk in blockids]
            # Skip offsets whose on-disk data already hashes correctly.
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                # Drain any finished downloads before starting a new one.
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if (
                    key + blocksize > total_size) else key + blocksize - 1
                start, end = _range_up(key, end, filerange)
                # An empty intersection with the requested range: skip.
                if start == end:
                    self._cb_next()
                    continue
                restargs['async_headers'] = {
                    'Range': 'bytes=%s-%s' % (start, end)}
                # One GET per distinct hash; all positions are recorded so
                # _thread2file can write the data everywhere it belongs.
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        # Wait for the stragglers and flush them to the file.
        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
710

711
712
713
714
715
716
717
718
719
720
    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date"""
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        blocksize, blockhash, total_size, hash_list, remote_hashes = (
            self._get_remote_blocks_info(obj, **restargs))
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            # Non-seekable, interactive target: fetch blocks in order.
            self._dump_blocks_sync(
                obj, hash_list, blocksize, total_size, dst, range_str,
                **restargs)
        else:
            # Seekable target: fetch blocks concurrently, in any order.
            self._dump_blocks_async(
                obj, remote_hashes, blocksize, total_size, dst, blockhash,
                resume, range_str, **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()

787
788
789
790
791
792
793
794
795
    def download_to_string(
            self, obj,
            download_cb=None,
            version=None,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object to a string (multiple connections). This method
        uses threads for http requests, but stores all content in memory.

        :param obj: (str) remote object path

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date

        :returns: (str) the whole object contents
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        num_of_blocks = len(remote_hashes)
        ret = [''] * num_of_blocks
        self._init_thread_limit()
        flying = dict()
        #  NOTE: loop variable renamed so it does not shadow the blockhash
        #  unpacked from _get_remote_blocks_info above
        for blockid, unused_hash in enumerate(remote_hashes):
            start = blocksize * blockid
            is_last = start + blocksize > total_size
            end = (total_size - 1) if is_last else (start + blocksize - 1)
            (start, end) = _range_up(start, end, range_str)
            if start < end:
                self._watch_thread_limit(flying.values())
                #  BUG FIX: request only this block's byte range; without
                #  this, every thread re-used the whole-object data_range
                #  and fetched the full content once per block
                restargs['data_range'] = 'bytes=%s-%s' % (start, end)
                flying[blockid] = self._get_block_async(obj, **restargs)
            for runid, thread in flying.items():
                if (blockid + 1) == num_of_blocks:
                    #  last iteration: wait for all still-flying requests
                    thread.join()
                elif thread.isAlive():
                    continue
                if thread.exception:
                    raise thread.exception
                ret[runid] = thread.value.content
                self._cb_next()
                flying.pop(runid)
        return ''.join(ret)
860

Stavros Sachtouris's avatar
Stavros Sachtouris committed
861
    #Command Progress Bar method
862
    def _cb_next(self, step=1):
        """Advance the progress bar generator by *step* ticks.

        Progress reporting is best-effort: if no bar is installed or the
        generator is exhausted, do nothing.

        :param step: (int) number of ticks to advance
        """
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except Exception:
                #  best-effort: a progress-bar glitch must never abort a
                #  transfer; catch Exception (not bare except) so that
                #  SystemExit/KeyboardInterrupt still propagate
                pass
869

Stavros Sachtouris's avatar
Stavros Sachtouris committed
870
871
872
    def _complete_cb(self):
        """Exhaust the progress bar generator so the bar shows complete."""
        while True:
            try:
                self.progress_bar_gen.next()
            except Exception:
                #  StopIteration (or a missing/broken bar) ends the loop;
                #  catch Exception instead of a bare except so that
                #  SystemExit/KeyboardInterrupt still propagate
                break
Stavros Sachtouris's avatar
Stavros Sachtouris committed
876

877
878
879
880
881
882
883
884
    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """Get the hashmap of a remote object.

        :param obj: (str) remote object path

        :param version: (str) object version

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (list) the json-decoded hashmap, or an empty dict when
            the server replies 304 (not modified) or 412 (precondition
            failed)
        """
        get_args = dict(
            hashmap=True,
            version=version,
            if_etag_match=if_match,
            if_etag_not_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since,
            data_range=data_range)
        try:
            r = self.object_get(obj, **get_args)
        except ClientError as err:
            #  304/412 mean "nothing new to fetch", not a failure
            if err.status in (304, 412):
                return {}
            raise
        return r.json
916

917
    def set_account_group(self, group, usernames):
        """Assign a list of users to an account group.

        :param group: (str) group name

        :param usernames: (list) members of the group
        """
        group_pairs = {group: usernames}
        self.account_post(update=True, groups=group_pairs)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
924

925
    def del_account_group(self, group):
        """Remove a group from the account (by emptying its member list).

        :param group: (str) group name
        """
        emptied = {group: []}
        self.account_post(update=True, groups=emptied)
930

Stavros Sachtouris's avatar
Stavros Sachtouris committed
931
    def get_account_info(self, until=None):
        """Return the account headers, optionally at a past point in time.

        :param until: (str) formated date

        :returns: (dict) the response headers

        :raises ClientError: 401 No authorization
        """
        resp = self.account_head(until=until)
        if resp.status_code != 401:
            return resp.headers
        raise ClientError("No authorization", status=401)
941

Stavros Sachtouris's avatar
Stavros Sachtouris committed
942
    def get_account_quota(self):
        """Return the quota policy header of the account.

        :returns: (dict)
        """
        account_info = self.get_account_info()
        return filter_in(account_info, 'X-Account-Policy-Quota', exactMatch=True)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
950
951

    def get_account_versioning(self):
        """Return the versioning policy header of the account.

        :returns: (dict)
        """
        account_info = self.get_account_info()
        return filter_in(
            account_info, 'X-Account-Policy-Versioning', exactMatch=True)
959

Stavros Sachtouris's avatar
Stavros Sachtouris committed
960
    def get_account_meta(self, until=None):
        """Return the user-defined metadata of the account.

        :param until: (str) formated date

        :returns: (dict)
        """
        account_info = self.get_account_info(until=until)
        return filter_in(account_info, 'X-Account-Meta-')
967

968
    def get_account_group(self):
        """Return the group headers defined on the account.

        :returns: (dict)
        """
        account_info = self.get_account_info()
        return filter_in(account_info, 'X-Account-Group-')

974
    def set_account_meta(self, metapairs):
        """Update the account metadata with the given key-value pairs.

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        #  isinstance instead of an exact type() comparison, so dict
        #  subclasses (e.g. OrderedDict) are accepted as well
        assert isinstance(metapairs, dict)
        self.account_post(update=True, metadata=metapairs)
980

981
    def del_account_meta(self, metakey):
        """Delete an account metadatum (set it to an empty value).

        :param metakey: (str) metadatum key
        """
        cleared = {metakey: ''}
        self.account_post(update=True, metadata=cleared)
986

987
    """
Stavros Sachtouris's avatar
Stavros Sachtouris committed
988
    def set_account_quota(self, quota):
989
        ""
990
        :param quota: (int)
991
        ""
992
        self.account_post(update=True, quota=quota)
993
    """
Stavros Sachtouris's avatar
Stavros Sachtouris committed
994
995

    def set_account_versioning(self, versioning):
        """Set the versioning policy of the account.

        :param versioning: (str)
        """
        self.account_post(update=True, versioning=versioning)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
1000

1001
    def list_containers(self):
        """List the containers of the account.

        :returns: (dict) json-decoded response body
        """
        return self.account_get().json
1007

1008
    def del_container(self, until=None, delimiter=None):
        """Delete the current container.

        :param until: (str) formated date

        :param delimiter: (str) with / empty container

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        resp = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        #  map the tolerated error statuses to their messages
        failures = {
            404: 'Container "%s" does not exist' % self.container,
            409: 'Container "%s" is not empty' % self.container}
        if resp.status_code in failures:
            raise ClientError(failures[resp.status_code], resp.status_code)
1031

1032
    def get_container_versioning(self, container=None):
        """Return the versioning policy of a container.

        :param container: (str) defaults to the current container

        :returns: (dict)
        """
        saved_container = self.container
        try:
            #  temporarily switch to the requested container, if any
            if container:
                self.container = container
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = saved_container
Stavros Sachtouris's avatar
Stavros Sachtouris committed
1046

1047
    def get_container_limit(self, container=None):
        """Return the quota policy (size limit) of a container.

        :param container: (str) defaults to the current container

        :returns: (dict)
        """
        saved_container = self.container
        try:
            #  temporarily switch to the requested container, if any
            if container:
                self.container = container
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = saved_container
1061

1062
    def get_container_info(self, until=None):
        """Return the headers of the current container.

        :param until: (str) formated date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            resp = self.container_head(until=until)
        except ClientError as err:
            #  enrich the error with the container name for easier debugging
            err.details.append('for container %s' % self.container)
            raise err
        return resp.headers

1077
    def get_container_meta(self, until=None):
        """Return the user-defined metadata of the current container.

        :param until: (str) formated date

        :returns: (dict)
        """
        container_info = self.get_container_info(until=until)
        return filter_in(container_info, 'X-Container-Meta')
1086

1087
    def get_container_object_meta(self, until=None):
        """Return the object metadata headers of the current container.

        :param until: (str) formated date

        :returns: (dict)
        """
        container_info = self.get_container_info(until=until)
        return filter_in(container_info, 'X-Container-Object-Meta')
1096

1097
    def set_container_meta(self, metapairs):
        """Update the metadata of the current container.

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        #  isinstance instead of an exact type() comparison, so dict
        #  subclasses (e.g. OrderedDict) are accepted as well
        assert isinstance(metapairs, dict)
        self.container_post(update=True, metadata=metapairs)
1103

Stavros Sachtouris's avatar
Stavros Sachtouris committed
1104
    def del_container_meta(self, metakey):
        """Delete a container metadatum (set it to an empty value).

        :param metakey: (str) metadatum key
        """
        cleared = {metakey: ''}
        self.container_post(update=True, metadata=cleared)
1109

1110
    def set_container_limit(self, limit):
        """Set the size limit (quota policy) of the current container.

        :param limit: (int)
        """
        self.container_post(update=True, quota=limit)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
1115
1116

    def set_container_versioning(self, versioning):
        """Set the versioning policy of the current container.

        :param versioning: (str)
        """
        self.container_post(update=True, versioning=versioning)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
1121

1122
    def del_object(self, obj, until=None, delimiter=None):
1123
1124
1125
1126
1127
1128
1129
        """
        :param obj: (str) remote object path

        :param until: (str) formated date