# Copyright 2011-2015 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time
from StringIO import StringIO
from logging import getLogger

from binascii import hexlify

from kamaki.clients import SilentEvent
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in, readall

LOG = getLogger(__name__)


def _dump_buffer(buffer_, destination):
    """Append buffer to destination (file descriptor)"""
    destination.write(buffer_)
    destination.flush()


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()
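
# A minimal sketch of the convention above (values are illustrative):
# trailing NUL padding is stripped before hashing, so a short final block
# and its zero-padded form produce the same digest:
#   _pithos_hash('abc' + '\x00' * 5, 'sha256') == _pithos_hash('abc', 'sha256')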


def _range_up(start, end, max_value, a_range):
    """
    :param start: (int) the window bottom

    :param end: (int) the window top

    :param max_value: (int) maximum accepted value

    :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
        where X: x|x-y|-x where x < y and x, y natural numbers

    :returns: (str) a range string cut down to fit the start-end window;
        an empty string means this window is out of range
    """
    assert start >= 0, '_range_up called w. start(%s) < 0' % start
    assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
        end, start)
    assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
        max_value, end)
    if not a_range:
        return '%s-%s' % (start, end)
    selected = []
    for some_range in a_range.split(','):
        v0, sep, v1 = some_range.partition('-')
        if v0:
            v0 = int(v0)
            if sep:
                v1 = int(v1)
                if v1 < start or v0 > end or v1 < v0:
                    continue
                v0 = v0 if v0 > start else start
                v1 = v1 if v1 < end else end
                selected.append('%s-%s' % (v0, v1))
            elif v0 < start:
                continue
            else:
                v1 = v0 if v0 <= end else end
                selected.append('%s-%s' % (start, v1))
        else:
            v1 = int(v1)
            if max_value - v1 > end:
                continue
            v0 = (max_value - v1) if max_value - v1 > start else start
            selected.append('%s-%s' % (v0, end))
    return ','.join(selected)
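
# A minimal usage sketch (illustrative values):
#   _range_up(0, 127, 1024, '10-20,500-600')  ->  '10-20'
#     ('500-600' lies entirely above this window, so it is dropped)
#   _range_up(0, 127, 1024, '')  ->  '0-127' (no range means the full window)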


class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, endpoint_url, token, account=None, container=None):
        super(PithosClient, self).__init__(
            endpoint_url, token, account, container)

    def use_alternative_account(self, func, *args, **kwargs):
        """Run method with an alternative account UUID, as long as kwargs
           contain a non-None "alternative_account" argument
        """
        alternative_account = kwargs.pop('alternative_account', None)
        bu_account = self.account
        try:
            if alternative_account and alternative_account != self.account:
                self.account = alternative_account
            return func(*args, **kwargs)
        finally:
            self.account = bu_account
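
    # A minimal usage sketch (other_uuid is hypothetical):
    #   client.use_alternative_account(
    #       client.get_container_info, alternative_account=other_uuid)
    # runs the call as other_uuid and restores the original account
    # afterwards, even if the call raises.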

    def create_container(
            self,
            container=None, sizelimit=None, versioning=None, metadata=None,
            project_id=None, **kwargs):
        """
        :param container: (str) if not given, self.container is used instead

        :param sizelimit: (int) container total size limit in bytes

        :param versioning: (str) can be auto or whatever supported by server

        :param metadata: (dict) Custom user-defined metadata of the form
            { 'name1': 'value1', 'name2': 'value2', ... }

        :returns: (dict) response headers
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_put(
                quota=sizelimit, versioning=versioning, metadata=metadata,
                project_id=project_id, **kwargs)
            return r.headers
        finally:
            self.container = cnt_back_up

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks"""
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up
        return r.headers

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool)

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = readall(f, size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
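
    # A minimal usage sketch (paths are hypothetical):
    #   with open('backup.json', 'rb') as f:
    #       headers = client.upload_object_unchunked(
    #           'remote/backup.json', f, content_type='application/json')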

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return r.headers
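
    # A minimal usage sketch (names are hypothetical): after the parts of a
    # large object have been uploaded as 'video.avi/1', 'video.avi/2', ...,
    # one manifest object exposes them as a single downloadable file:
    #   client.create_object_by_manifestation(
    #       'video.avi', content_type='video/x-msvideo')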

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """
        :param fileobj: (file descriptor) source

        :param size: (int) size of data to upload from source

        :param cache: (dict) if provided, cache container info response to
        avoid redundant calls
        """
        if isinstance(cache, dict):
            try:
                meta = cache[self.container]
            except KeyError:
                meta = self.get_container_info()
                cache[self.container] = meta
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
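
    # Example of the block arithmetic above (illustrative numbers): with a
    # 4 MiB container block size, a 10 MiB file needs
    # nblocks = 1 + (10485760 - 1) // 4194304 = 3 blocks.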

    def _create_object_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return (None if r.status_code == 201 else r.json), r.headers
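
    # Note on the call above: success=(201, 409) makes both outcomes valid.
    # A 201 means the server already had every block, so the object was
    # created with no data transfer; a 409 returns the hashes of the blocks
    # that must still be uploaded.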

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in xrange(nblocks):
            block = readall(fileobj, min(blocksize, size - offset))
            bytes = len(block)
            if bytes <= 0:
                break
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate blocks for upload: '
               'bytes read (%s) != requested size (%s)' % (offset, size))
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = readall(fileobj, bytes)
            r = self._put_block_async(data, hash)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except Exception:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except Exception:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None,
            target_account=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)

        :param target_account: (str) the UUID of the account the object will be
            allocated to, if different from the client account (e.g., when
            user A uploads something to a location owned by user B)
        """
        self._assert_container()

        block_info = (
            blocksize, blockhash, size, nblocks
            ) = self.use_alternative_account(
                self._get_file_block_info, f, size, container_info_cache,
                alternative_account=target_account)
        hashes, hmap = [], {}
        content_type = content_type or 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self.use_alternative_account(
            self._create_object_or_get_missing_hashes, obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public,
            alternative_account=target_account)

        if missing is None:
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(hashmap['hashes']))
            for i in range(len(hashmap['hashes']) + 1 - len(missing)):
                try:
                    upload_gen.next()
                except Exception:
                    LOG.debug('Progress bar failure')
                    break
        else:
            upload_gen = None

        retries = 7
        while retries:
            LOG.debug('%s blocks missing' % len(missing))
            num_of_blocks = len(missing)
            missing = self._upload_missing_blocks(
                missing, hmap, f, upload_gen)
            if missing:
                if num_of_blocks == len(missing):
                    retries -= 1
                else:
                    num_of_blocks = len(missing)
            else:
                break
        if missing:
            try:
                details = ['%s' % thread.exception for thread in missing]
            except Exception:
                details = ['Also, failed to read thread exceptions']
            raise ClientError(
                '%s blocks failed to upload' % len(missing),
                details=details)

        r = self.use_alternative_account(
            self.object_put,
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201,
            alternative_account=target_account)
        return r.headers
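
    # A minimal usage sketch (paths are hypothetical); reusing the cache dict
    # across calls avoids one container HEAD request per upload:
    #   cache = dict()
    #   with open('data.bin', 'rb') as f:
    #       client.upload_object('a.bin', f, container_info_cache=cache)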

    def upload_from_string(
            self, obj, input_str,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param input_str: (str) upload content

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        blocksize, blockhash, size, nblocks = self._get_file_block_info(
            fileobj=None, size=len(input_str), cache=container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        for blockid in range(nblocks):
592 593
            start = blockid * blocksize
            block = input_str[start: (start + blocksize)]
            hashes.append(_pithos_hash(block, blockhash))
            hmap[hashes[blockid]] = (start, block)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers
        num_of_missing = len(missing)

        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            for i in range(nblocks + 1 - num_of_missing):
                self._cb_next()

        tries = 7
        old_failures = 0
        try:
            while tries and missing:
                flying = []
                failures = []
                for hash in missing:
                    offset, block = hmap[hash]
                    bird = self._put_block_async(block, hash)
                    flying.append(bird)
                    unfinished = self._watch_thread_limit(flying)
                    for thread in set(flying).difference(unfinished):
                        if thread.exception:
                            failures.append(thread.kwargs['hash'])
                        if thread.isAlive():
                            flying.append(thread)
                        else:
                            self._cb_next()
                    flying = unfinished
                for thread in flying:
                    thread.join()
                    if thread.exception:
                        failures.append(thread.kwargs['hash'])
                    self._cb_next()
                missing = failures
                if missing and len(missing) == old_failures:
                    tries -= 1
                old_failures = len(missing)
            if missing:
                raise ClientError('%s blocks failed to upload' % len(missing))
        except KeyboardInterrupt:
            LOG.debug('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise
        self._cb_next()

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = restargs.pop('hashmap', None) or (
            self.get_object_hashmap(obj, **restargs))
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        # assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            # the same hash may appear in multiple blocks, so keep a list of
            # block indices per hash
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, crange,
            **args):
        if not total_size:
            return
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                data_range = _range_up(start, end, total_size, crange)
                if not data_range:
                    self._cb_next()
                    continue
                args['data_range'] = 'bytes=%s' % data_range
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = readall(fp, size)
        h = newhashlib(blockhash)
        h.update(block.rstrip('\x00'))  # match _pithos_hash: trailing NULs only
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a greenleted rest call to a file

        :param offset: the offset of the file up to blocksize
        - e.g. if the range is 10-100, all blocks will be written to
        normal_position - 10
        """
        for key, g in flying.items():
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                # write the block at its object position, shifted by the
                # range offset (if any)
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                    local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if (
                    key + blocksize > total_size) else key + blocksize - 1
                if end < key:
                    self._cb_next()
                    continue
                data_range = _range_up(key, end, total_size, filerange)
                if not data_range:
                    self._cb_next()
                    continue
                restargs[
                    'async_headers'] = {'Range': 'bytes=%s' % data_range}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            headers=dict()):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) a 'from-to' byte range, where from and to
            are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param headers: (dict) placeholder to gather object headers
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since,
            headers=dict())

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        headers.update(restargs.pop('headers'))
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                # truncate only for full downloads; ranged output is shorter
                dst.truncate(total_size)

        self._complete_cb()
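
    # A minimal usage sketch (paths are hypothetical):
    #   headers = dict()
    #   with open('local.bin', 'wb+') as dst:
    #       client.download_object('remote/data.bin', dst, headers=headers)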

    def download_to_string(
            self, obj,
            download_cb=None,
            version=None,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            remote_block_info=None,
            hashmap=None,
            headers=dict()):
        """Download an object to a string (multiple connections). This method
        uses threads for http requests, but stores all content in memory.

        :param obj: (str) remote object path

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param range_str: (str) a 'from-to' byte range, where from and to
            are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param remote_block_info: (tuple) blocksize, blockhash, total_size
            and hash_list

        :param hashmap: (dict) the remote object hashmap, if it is available
            e.g., from another call. Used for minimizing HEAD container
            requests

        :param headers: (dict) a placeholder dict to gather object headers

        :returns: (str) the whole object contents
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since,
            headers=dict())

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(
                obj, hashmap=hashmap, **restargs)
        headers.update(restargs.pop('headers'))
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        num_of_blocks = len(remote_hashes)
        ret = [''] * num_of_blocks
        self._init_thread_limit()
        flying = dict()
        try:
            for blockid, blockhash in enumerate(remote_hashes):
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                data_range_str = _range_up(start, end, total_size, range_str)
                if data_range_str:
                    self._watch_thread_limit(flying.values())
                    restargs['data_range'] = 'bytes=%s' % data_range_str
                    flying[blockid] = self._get_block_async(obj, **restargs)
                for runid, thread in flying.items():
                    if (blockid + 1) == num_of_blocks:
                        thread.join()
                    elif thread.isAlive():
                        continue
                    if thread.exception:
                        raise thread.exception
                    ret[runid] = thread.value.content
                    self._cb_next()
                    flying.pop(runid)
            return ''.join(ret)
        except KeyboardInterrupt:
            LOG.debug('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise
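
    # A minimal usage sketch (path is hypothetical); the whole object is kept
    # in memory, so prefer stream_down for very large files:
    #   text = client.download_to_string('remote/notes.txt')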

    def stream_down(self, obj, dst, buffer_blocks=4, **kwargs):
        """
        Download obj to dst as a stream. Buffer-sized chunks are downloaded
            sequentially, but the blocks of each chunk are downloaded
            asynchronously, using the download_to_string method
        :param obj: (str) the remote object
        :param dst: a file descriptor allowing sequential writing
        :param buffer_blocks: (int) the size of the buffer in blocks. If it is
            1, all blocks will be downloaded sequentially
        :param kwargs: (dict) keyword arguments for download_to_string method
        """
        buffer_blocks = 1 if buffer_blocks < 2 else buffer_blocks
        hashmap = kwargs.get('hashmap', None)
        range_str = kwargs.pop('range_str', None)
        if hashmap is None:
            # Clean kwargs if it contains hashmap=None
            kwargs.pop('hashmap', None)
            # Get new hashmap
            hashmap = kwargs['hashmap'] = self.get_object_hashmap(
                obj,
                kwargs.get('version', None),
                kwargs.get('if_match', None),
                kwargs.get('if_none_match', None),
                kwargs.get('if_modified_since', None),
                kwargs.get('if_unmodified_since', None))
        block_size, obj_size = int(hashmap['block_size']), hashmap['bytes']
        buffer_size = buffer_blocks * block_size
        event = None

        def finish_event(e):
            """Blocking: stop until e is finished or raise error"""
            if e is not None:
                if e.isAlive():
                    e.join()
                if e.exception:
                    raise e.exception

        for chunk_number in range(1 + obj_size // buffer_size):
            start = chunk_number * buffer_size
            end = start + buffer_size
            end = (obj_size if (end > obj_size) else end) - 1
            kwargs['range_str'] = _range_up(start, end, obj_size, range_str)
            buffer_ = self.download_to_string(obj, **kwargs)
            finish_event(event)
            event = SilentEvent(_dump_buffer, buffer_, dst)
            event.start()
        finish_event(event)
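
    # A minimal usage sketch (names are hypothetical): stream a big object to
    # any sequential writer while holding at most buffer_blocks blocks of it
    # in memory:
    #   with open('local.iso', 'wb') as dst:
    #       client.stream_down('remote/big.iso', dst, buffer_blocks=4)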

    # Command Progress Bar method
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except Exception:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except Exception:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            headers=dict()):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (dict) the object hashmap
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        headers.update(r.headers)
        return r.json
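
    # The returned hashmap is a dict shaped like (illustrative values):
    #   {'block_size': 4194304, 'block_hash': 'sha256', 'bytes': 10485760,
    #    'hashes': ['7ae3...', 'd2f1...', '09bc...']}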

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        return r

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)