__init__.py 23.5 KB
Newer Older
1
# Copyright 2011-2014 GRNET S.A. All rights reserved. #
Giorgos Verigakis's avatar
Giorgos Verigakis committed
2
3
4
5
6
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
Stavros Sachtouris's avatar
Stavros Sachtouris committed
7
#      copyright notice, this list of conditions and the following
Giorgos Verigakis's avatar
Giorgos Verigakis committed
8
9
10
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
Stavros Sachtouris's avatar
Stavros Sachtouris committed
11
#      copyright notice, this list of conditions and the following
Giorgos Verigakis's avatar
Giorgos Verigakis committed
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

33
from urllib2 import quote, unquote
34
from urlparse import urlparse
35
from threading import Thread
36
from json import dumps, loads
37
from time import time
38
from httplib import ResponseNotReady, HTTPException
39
40
from time import sleep
from random import random
41
from logging import getLogger
42
43

from objpool.http import PooledHTTPConnection
44

Giorgos Verigakis's avatar
Giorgos Verigakis committed
45

46
47
# Budget (in seconds) that RequestManager.perform may spend sleeping between
# attempts to fetch a response, before it gives up with a ClientError
TIMEOUT = 60.0   # seconds
# Methods accepted by RequestManager; the requested method is upper-cased
# before it is checked against this list
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
48

49
50
51
# Module loggers: 'log' for general messages, 'sendlog' (<module>.send) for
# outgoing requests and 'recvlog' (<module>.recv) for incoming responses
log = getLogger(__name__)
sendlog = getLogger('%s.send' % __name__)
recvlog = getLogger('%s.recv' % __name__)
52

53
54
55
56
57
58

def _encode(v):
    if v and isinstance(v, unicode):
        return quote(v.encode('utf-8'))
    return v

59

Giorgos Verigakis's avatar
Giorgos Verigakis committed
60
class ClientError(Exception):
    """Error raised when a request fails or a response cannot be handled

    If the message contains a JSON object (everything from the first '{'
    on), the error is rebuilt from it: the first key of the object becomes
    the error name and the value under that key may provide 'message',
    'code' and 'details'. Messages without a parsable JSON part are kept
    as they are.

    :param message: (str) error text, optionally followed by a JSON body

    :param status: (int) status code, stored as 0 if it is not an int

    :param details: extra information, stored as given if truthy, else as
        an empty list; if the JSON body carries 'details', this is turned
        into a list and the JSON details are appended to it
    """

    def __init__(self, message, status=0, details=None):
        log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
            message, status, details))
        try:
            # Make sure the text ends with a newline
            message += '' if message and message[-1] == '\n' else '\n'
            serv_stat, sep, new_msg = message.partition('{')
            # Drop the trailing newline of the JSON part (new_msg is empty
            # if there was no '{', in which case loads fails on purpose)
            new_msg = sep + new_msg[:-1]
            json_msg = loads(new_msg)
            # First key of the object, portable across Python versions
            key = next(iter(json_msg))
            serv_stat = serv_stat.strip()

            json_msg = json_msg[key]
            if 'message' in json_msg:
                message = '%s %s (%s)\n' % (
                    serv_stat, key, json_msg['message'])
            else:
                message = '%s %s' % (serv_stat, key)
            status = json_msg.get('code', status)
            if 'details' in json_msg:
                if not details:
                    details = []
                if not isinstance(details, list):
                    details = [details]
                if json_msg['details']:
                    details.append(json_msg['details'])
        except Exception:
            # Best effort: if the message is not in the expected form, keep
            # whatever has been worked out so far
            pass

        # Only text can be trimmed; a non-string message (e.g. None) used to
        # raise AttributeError here instead of producing a ClientError
        if isinstance(message, (str, type(u''))):
            while message.endswith('\n\n'):
                message = message[:-1]
        super(ClientError, self).__init__(message)
        self.status = status if isinstance(status, int) else 0
        self.details = details if details else []
Giorgos Verigakis's avatar
Giorgos Verigakis committed
94

95

96
97
98
99
class Logged(object):
    """Logging switches shared by the request, response and client classes"""

    # If False, the value of the X-Auth-Token header is logged as '...'
    LOG_TOKEN = False
    # If True, request and response data are logged as well
    LOG_DATA = False
    # If True, log lines are tagged with the string form of the logging object
    LOG_PID = False
    # The token to be masked when it appears inside logged data
    _token = None
102
103
104


class RequestManager(Logged):
    """Handle http request information"""

    def _connection_info(self, url, path, params=None):
        """Build the request url and split it for the connection

        Sets self.url to the full url (service url, path and query string)
        and self.path to the path-and-query part of it.

        :param url: (str or unicode) The service url

        :param path: (str or unicode) The service path (url/path)

        :param params: (dict) Parameters to add to final url. A value of
            None or False adds the key alone, without a value

        :returns: (scheme, netloc)
        """
        url = url or 'http://127.0.0.1/'
        if not url.endswith('/'):
            url += '/'
        if path:
            url += _encode(path[1:] if path.startswith('/') else path)
        delim = '?'
        for key, val in (params or {}).items():
            # Identity checks: "val in (None, False)" also matched a numeric
            # 0, which was then sent without its value
            if val is None or val is False:
                val = ''
            else:
                # NOTE(review): unicode values are quoted by _encode and then
                # quoted again here - confirm the double quoting is intended
                val = quote('%s' % _encode(val))
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
            delim = '&'
        parsed = urlparse(url)
        self.url = '%s' % url
        self.path = (parsed.path or '/') + (
            '?%s' % parsed.query if parsed.query else '')
        return (parsed.scheme, parsed.netloc)

    def __init__(
            self, method, url, path,
            data=None, headers=None, params=None):
        """
        :param method: (str) one of HTTP_METHODS, case insensitive

        :param url: (str or unicode) The service url

        :param path: (str or unicode) The service path (url/path)

        :param data: the request body, if any

        :param headers: (dict) request headers, copied by this object

        :param params: (dict) Parameters to add to final url
        """
        method = method.upper()
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
        if headers:
            assert isinstance(headers, dict)
        self.headers = dict(headers or {})
        self.method, self.data = method, data
        self.scheme, self.netloc = self._connection_info(url, path, params)
        self._headers_to_quote, self._header_prefices = [], []

    def dump_log(self):
        """Log the request line, the headers and the data size to sendlog

        Unless LOG_TOKEN is set, the X-Auth-Token value is logged as '...'
        and kept in self._token, so that it is masked in the logged data,
        too. The data are logged only if LOG_DATA is set.
        """
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
        sendlog.info('- -  -   -     -        -             -')
        sendlog.info('%s %s://%s%s%s' % (
            self.method, self.scheme, self.netloc, self.path, plog))
        for key, val in self.headers.items():
            if key.lower() in ('x-auth-token', ) and not self.LOG_TOKEN:
                self._token, val = val, '...'
            sendlog.info('  %s: %s%s' % (key, val, plog))
        if self.data:
            sendlog.info('data size: %s%s' % (len(self.data), plog))
            if self.LOG_DATA:
                sendlog.info(self.data.replace(self._token, '...') if (
                    self._token) else self.data)
        else:
            sendlog.info('data size: 0%s' % plog)

    def _encode_headers(self):
        """Replace self.headers with string-valued, properly quoted headers

        None values become empty strings and unicode values are encoded in
        utf-8. Values are url-quoted if their lower-cased key is listed in
        headers_to_quote or starts with one of the header_prefices.
        """
        headers = dict()
        for k, v in self.headers.items():
            key = k.lower()
            val = '' if v is None else '%s' % (
                v.encode('utf-8') if isinstance(v, unicode) else v)
            quotable = key in self._headers_to_quote or any(
                key.startswith(p) for p in self._header_prefices)
            headers[k] = quote(val) if quotable else val
        self.headers = headers

    def perform(self, conn):
        """Send the request and wait for the response

        :param conn: (httplib connection object)

        :returns: (HTTPResponse)

        :raises ClientError: if the response is still not ready after
            about TIMEOUT seconds of waiting
        """
        self._encode_headers()
        self.dump_log()
        conn.request(
            method=self.method.upper(),
            url=self.path.encode('utf-8'),
            headers=self.headers,
            body=self.data)
        sendlog.info('')
        keep_trying = TIMEOUT
        while keep_trying > 0:
            try:
                return conn.getresponse()
            except ResponseNotReady:
                # Back off for a short random interval and charge it to the
                # timeout budget
                wait = 0.03 * random()
                sleep(wait)
                keep_trying -= wait
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
        logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
        recvlog.debug(logmsg)
        raise ClientError('HTTPResponse takes too long - kamaki timeout')

    @property
    def headers_to_quote(self):
        """(list) lower-cased keys of the headers whose values are quoted"""
        return self._headers_to_quote

    @headers_to_quote.setter
    def headers_to_quote(self, header_keys):
        # Assignment extends the existing keys (duplicates are dropped)
        self._headers_to_quote += [k.lower() for k in header_keys]
        self._headers_to_quote = list(set(self._headers_to_quote))

    @property
    def header_prefices(self):
        """(list) lower-cased key prefixes of the headers to be quoted"""
        return self._header_prefices

    @header_prefices.setter
    def header_prefices(self, header_key_prefices):
        # Assignment extends the existing prefixes (duplicates are dropped)
        self._header_prefices += [p.lower() for p in header_key_prefices]
        self._header_prefices = list(set(self._header_prefices))

217

218
class ResponseManager(Logged):
    """Manage the http request and handle the response data, headers, etc."""

    def __init__(self, request, poolsize=None, connection_retry_limit=0):
        """
        :param request: (RequestManager)

        :param poolsize: (int) the size of the connection pool

        :param connection_retry_limit: (int) how many times to retry after
            a failed connection attempt
        """
        self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
        self.request = request
        self._request_performed = False
        self.poolsize = poolsize
        self._headers_to_decode, self._header_prefices = [], []

    def _get_headers_to_decode(self, headers):
        """
        :param headers: iterable of (key, value) header pairs

        :returns: (list) the lower-cased keys which are either listed in
            headers_to_decode or start with one of the header_prefices
        """
        keys = set([k.lower() for k, v in headers])
        encodable = list(keys.intersection(self.headers_to_decode))
        prefices = tuple(self.header_prefices)
        return encodable + [
            k for k in keys.difference(encodable) if k.startswith(prefices)]

    def _get_response(self):
        """Perform the request, unless it has already been performed

        The status, headers and content of the response are stored on this
        object and logged to recvlog.

        :raises ClientError: if the last allowed attempt fails with an
            HTTPException. Any other exception is logged and re-raised
        """
        if self._request_performed:
            return

        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
        for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
            try:
                with PooledHTTPConnection(
                        self.request.netloc, self.request.scheme,
                        **pool_kw) as connection:
                    self.request.LOG_TOKEN = self.LOG_TOKEN
                    self.request.LOG_DATA = self.LOG_DATA
                    self.request.LOG_PID = self.LOG_PID
                    r = self.request.perform(connection)
                    plog = ''
                    if self.LOG_PID:
                        recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
                            self, r, self.request))
                        plog = '\t[%s]' % self
                    self._status_code, self._status = r.status, unquote(
                        r.reason)
                    recvlog.info('%d %s%s' % (
                        self._status_code, self._status, plog))
                    self._headers = dict()

                    r_headers = r.getheaders()
                    enc_headers = self._get_headers_to_decode(r_headers)
                    for k, v in r_headers:
                        self._headers[k] = unquote(v).decode('utf-8') if (
                            k.lower()) in enc_headers else v
                        recvlog.info('  %s: %s%s' % (k, v, plog))
                    self._content = r.read()
                    # Mark as performed only when status, headers and
                    # content are all in place, so that a failed read does
                    # not leave a half-filled response behind
                    self._request_performed = True
                    recvlog.info('data size: %s%s' % (
                        len(self._content) if self._content else 0, plog))
                    if self.LOG_DATA and self._content:
                        data = '%s%s' % (self._content, plog)
                        if self._token:
                            data = data.replace(self._token, '...')
                        recvlog.info(data)
                    recvlog.info('-             -        -     -   -  - -')
                break
            except HTTPException as err:
                # Connection-level failure: retry silently until the limit
                if retries >= self.CONNECTION_TRY_LIMIT:
                    raise ClientError(
                        'Connection to %s failed %s times (%s: %s )' % (
                            self.request.url, retries, type(err), err))
            except Exception as err:
                from traceback import format_stack
                recvlog.debug(
                    '\n'.join(['%s' % type(err)] + format_stack()))
                raise

    @property
    def status_code(self):
        """The status code of the response (performs the request if needed)"""
        self._get_response()
        return self._status_code

    @property
    def status(self):
        """The unquoted reason phrase of the response"""
        self._get_response()
        return self._status

    @property
    def headers(self):
        """(dict) the response headers, decoded where requested"""
        self._get_response()
        return self._headers

    @property
    def content(self):
        """The response body, as read from the connection"""
        self._get_response()
        return self._content

    @property
    def text(self):
        """
        :returns: (str) content
        """
        self._get_response()
        return '%s' % self._content

    @property
    def headers_to_decode(self):
        """(list) lower-cased keys of the headers whose values are decoded"""
        return self._headers_to_decode

    @headers_to_decode.setter
    def headers_to_decode(self, header_keys):
        # Assignment extends the existing keys (duplicates are dropped)
        self._headers_to_decode += [k.lower() for k in header_keys]
        self._headers_to_decode = list(set(self._headers_to_decode))

    @property
    def header_prefices(self):
        """(list) lower-cased key prefixes of the headers to be decoded"""
        return self._header_prefices

    @header_prefices.setter
    def header_prefices(self, header_key_prefices):
        # Assignment extends the existing prefixes (duplicates are dropped)
        self._header_prefices += [p.lower() for p in header_key_prefices]
        self._header_prefices = list(set(self._header_prefices))

    @property
    def json(self):
        """
        :returns: (dict) squeezed from json-formatted content

        :raises ClientError: if the content cannot be parsed as JSON
        """
        self._get_response()
        try:
            return loads(self._content)
        except ValueError as err:
            raise ClientError('Response not formated in JSON - %s' % err)
357
358


359
class SilentEvent(Thread):
    """Thread-run method(*args, **kwargs)

    The thread never lets an exception of the method escape: the exception
    is logged and kept in the 'exception' property, while the returned
    value of a successful call is kept in the 'value' property.
    """

    def __init__(self, method, *args, **kwargs):
        # Name the class explicitly: super(self.__class__, self) recurses
        # for ever as soon as this class is subclassed
        super(SilentEvent, self).__init__()
        self.method, self.args, self.kwargs = method, args, kwargs

    @property
    def exception(self):
        """The exception raised by the method, False if there is none"""
        return getattr(self, '_exception', False)

    @property
    def value(self):
        """The value returned by the method, None if there is none (yet)"""
        return getattr(self, '_value', None)

    def run(self):
        try:
            self._value = self.method(*(self.args), **(self.kwargs))
        except Exception as e:
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
                self,
                type(e),
                e.status if isinstance(e, ClientError) else '',
                e))
            self._exception = e

384

385
class Client(Logged):
    """Base of the service clients: keeps the endpoint url and the token,
    collects headers and parameters for the next request and performs
    requests through RequestManager and ResponseManager
    """

    service_type = ''
    MAX_THREADS = 1
    DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
    CONNECTION_RETRY_LIMIT = 0

    def __init__(self, endpoint_url, token, base_url=None):
        """
        :param endpoint_url: (str) the url of the service

        :param token: (str) the value of the X-Auth-Token header

        :param base_url: (str) used only if endpoint_url is not given
        """
        #  BW compatibility - keep base_url for some time
        endpoint_url = endpoint_url or base_url
        assert endpoint_url, 'No endpoint_url for client %s' % self
        self.endpoint_url, self.base_url = endpoint_url, endpoint_url
        self.token = token
        self.headers, self.params = dict(), dict()
        self.poolsize = None
        self.request_headers_to_quote = []
        self.request_header_prefices_to_quote = []
        self.response_headers = []
        self.response_header_prefices = []

    @staticmethod
    def _prefix_tuple(prefices):
        """Return prefices in a form str.startswith accepts (str or tuple)"""
        if isinstance(prefices, (str, type(u''), tuple)):
            return prefices
        return tuple(prefices)

    @staticmethod
    def _unquote_header_keys(headers, prefices):
        """Unquote and decode (utf-8), in place, the keys of headers which
        start (case insensitively) with any of the prefices

        :param headers: (dict) modified in place

        :param prefices: (str or collection of str) lower-cased prefixes
        """
        prefices = Client._prefix_tuple(prefices)
        new_keys = dict()
        for k in headers:
            if k.lower().startswith(prefices):
                new_keys[k] = unquote(k).decode('utf-8')
        # Rename after the scan, so that headers is not modified while it is
        # being iterated
        for old, new in new_keys.items():
            headers[new] = headers.pop(old)

    @staticmethod
    def _quote_header_keys(headers, prefices):
        """Encode (utf-8) and quote, in place, the keys of headers which
        start (case insensitively) with any of the prefices

        :param headers: (dict) modified in place

        :param prefices: (str or collection of str) lower-cased prefixes
        """
        prefices = Client._prefix_tuple(prefices)
        new_keys = dict()
        for k in headers:
            if k.lower().startswith(prefices):
                new_keys[k] = quote(k.encode('utf-8'))
        for old, new in new_keys.items():
            headers[new] = headers.pop(old)

    def _init_thread_limit(self, limit=1):
        """Reset the thread limit and the elapsed time measurements"""
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
        self._thread_limit = limit
        self._elapsed_old = 0.0
        self._elapsed_new = 0.0

    def _watch_thread_limit(self, threadlist):
        """Adjust the thread limit and join the threads if it is reached

        The limit grows by one (up to MAX_THREADS) if the last measured
        average join time was not worse than the previous one, otherwise it
        shrinks by one (down to 1).

        :param threadlist: collection of running threads

        :returns: an empty list if the threads were joined, else threadlist
        """
        self._thread_limit = getattr(self, '_thread_limit', 1)
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
        recvlog.debug('# running threads: %s' % len(threadlist))
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
                self._thread_limit < self.MAX_THREADS):
            self._thread_limit += 1
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
            self._thread_limit -= 1

        self._elapsed_old = self._elapsed_new
        if len(threadlist) >= self._thread_limit:
            self._elapsed_new = 0.0
            for thread in threadlist:
                begin_time = time()
                thread.join()
                self._elapsed_new += time() - begin_time
            self._elapsed_new = self._elapsed_new / len(threadlist)
            return []
        return threadlist

    def async_run(self, method, kwarg_list):
        """Fire threads of operations

        :param method: the method to run in each thread

        :param kwarg_list: (list of dicts) the arguments to pass in each method
            call

        :returns: (list) the results of each method call w.r. to the order of
            kwarg_list

        :raises: the exception of a method call, as soon as it is noticed
        """
        flying, results = {}, {}
        self._init_thread_limit()
        for index, kwargs in enumerate(kwarg_list):
            self._watch_thread_limit(flying.values())
            flying[index] = SilentEvent(method=method, **kwargs)
            flying[index].start()
            unfinished = {}
            for key, thread in flying.items():
                if thread.is_alive():
                    unfinished[key] = thread
                elif thread.exception:
                    raise thread.exception
                else:
                    results[key] = thread.value
            flying = unfinished
        sendlog.info('- - - wait for threads to finish')
        for key, thread in flying.items():
            if thread.is_alive():
                thread.join()
            if thread.exception:
                raise thread.exception
            results[key] = thread.value
        # Threads finish in any order: sort by index to honour the promised
        # order, which plain dict values do not guarantee
        return [results[key] for key in sorted(results)]

    def set_header(self, name, value, iff=True):
        """Set a header 'name':'value'"""
        if value is not None and iff:
            self.headers['%s' % name] = '%s' % value

    def set_param(self, name, value=None, iff=True):
        """Set a url parameter 'name' to the string form of value, if iff"""
        # NOTE(review): a value of None is stored as the string 'None' -
        # confirm this is intended
        if iff:
            self.params[name] = '%s' % value

    def request(
            self, method, path,
            async_headers=None, async_params=None,
            **kwargs):
        """Commit an HTTP request to endpoint_url/path

        Requests are committed to and performed by Request/ResponseManager
        These classes perform a lazy http request. Present method, by default,
        enforces them to perform the http call. Hint: call present method with
        success=None to get a non-performed ResponseManager object.

        The headers and params collected on the client are used for this
        request and then cleared, even if the request could not be built.

        :param method: (str) the http method

        :param path: (str) the path to add to endpoint_url

        :param async_headers: (dict) extra headers for this request

        :param async_params: (dict) extra url parameters for this request

        :param kwargs: 'success' (int or collection of acceptable status
            codes, default 200), 'data' (request body) and 'json' (object
            to send as a json body)

        :returns: (ResponseManager)

        :raises ClientError: if the status code is not in 'success'
        """
        assert isinstance(method, str) or isinstance(method, unicode)
        assert method
        assert isinstance(path, str) or isinstance(path, unicode)
        try:
            headers = dict(self.headers)
            headers.update(async_headers or {})
            params = dict(self.params)
            params.update(async_params or {})
            success = kwargs.pop('success', 200)
            data = kwargs.pop('data', None)
            headers.setdefault('X-Auth-Token', self.token)
            if 'json' in kwargs:
                data = dumps(kwargs.pop('json'))
                headers.setdefault('Content-Type', 'application/json')
            if data:
                headers.setdefault('Content-Length', '%s' % len(data))
            plog = ('\t[%s]' % self) if self.LOG_PID else ''
            sendlog.debug('\n\nCMT %s@%s%s', method, self.endpoint_url, plog)
            req = RequestManager(
                method, self.endpoint_url, path,
                data=data, headers=headers, params=params)
            req.headers_to_quote = self.request_headers_to_quote
            req.header_prefices = self.request_header_prefices_to_quote
            r = ResponseManager(
                req,
                poolsize=self.poolsize,
                connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
            r.headers_to_decode = self.response_headers
            r.header_prefices = self.response_header_prefices
            r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
                self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
            r._token = headers['X-Auth-Token']
        finally:
            self.headers = dict()
            self.params = dict()

        if success is not None:
            # Success can either be an int or a collection
            success = (success,) if isinstance(success, int) else success
            # Reading the status code is what triggers the actual http call
            if r.status_code not in success:
                log.debug(u'Client caught error %s (%s)' % (r, type(r)))
                status_msg = getattr(r, 'status', '')
                try:
                    message = u'%s %s\n' % (status_msg, r.text)
                except Exception:
                    message = u'%s %s\n' % (status_msg, r)
                status = getattr(r, 'status_code', getattr(r, 'status', 0))
                raise ClientError(message, status=status)
        return r

    def delete(self, path, **kwargs):
        """Shortcut for request('delete', path, **kwargs)"""
        return self.request('delete', path, **kwargs)

    def get(self, path, **kwargs):
        """Shortcut for request('get', path, **kwargs)"""
        return self.request('get', path, **kwargs)

    def head(self, path, **kwargs):
        """Shortcut for request('head', path, **kwargs)"""
        return self.request('head', path, **kwargs)

    def post(self, path, **kwargs):
        """Shortcut for request('post', path, **kwargs)"""
        return self.request('post', path, **kwargs)

    def put(self, path, **kwargs):
        """Shortcut for request('put', path, **kwargs)"""
        return self.request('put', path, **kwargs)

    def copy(self, path, **kwargs):
        """Shortcut for request('copy', path, **kwargs)"""
        return self.request('copy', path, **kwargs)

    def move(self, path, **kwargs):
        """Shortcut for request('move', path, **kwargs)"""
        return self.request('move', path, **kwargs)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
575
576
577
578
579


class Waiter(object):
    """Poll the status of an item until it reaches or leaves a status"""

    def _wait(
            self, item_id, wait_status, get_status,
            delay=1, max_wait=100, wait_cb=None, wait_for_status=False):
        """Wait while the item is still in wait_status or to reach it

        :param item_id: the id of the item, passed on to get_status

        :param wait_status: (str)

        :param get_status: (method(self, item_id)) if called, returns
            (status, progress %) If no way to tell progress, return None

        :param delay: time interval between retries

        :param max_wait: upper limit for the wait counter, which grows by
            one on each retry or is set to the reported progress, if any

        :param wait_cb: (method(total steps)) returns a generator for
            reporting progress or timeouts i.e., for a progress bar

        :param wait_for_status: (bool) wait FOR (True) or wait WHILE (False)

        :returns: (str) the new mode if successful, (bool) False if timed out
        """
        status, progress = get_status(self, item_id)

        if wait_cb:
            wait_gen = wait_cb(max_wait // delay)
            next(wait_gen)

        # Nothing to wait for: the status is already the desired one
        if wait_for_status ^ (status != wait_status):
            return status
        old_wait = total_wait = 0

        while (wait_for_status ^ (status == wait_status)) and (
                total_wait <= max_wait):
            if wait_cb:
                # Advance the reporter by the steps made since the last
                # round; give up waiting if the reporter fails or ends
                try:
                    for i in range(total_wait - old_wait):
                        next(wait_gen)
                except Exception:
                    break
            old_wait = total_wait
            total_wait = progress or total_wait + 1
            sleep(delay)
            status, progress = get_status(self, item_id)

        if total_wait < max_wait:
            if wait_cb:
                # Finished early: push the reporter towards its end
                try:
                    for i in range(max_wait):
                        next(wait_gen)
                except Exception:
                    pass
        return status if (wait_for_status ^ (status != wait_status)) else False

    def wait_for(
            self, item_id, target_status, get_status,
            delay=1, max_wait=100, wait_cb=None):
        """Wait until the item reaches target_status

        :returns: (str) the status reached, (bool) False if timed out
        """
        return self._wait(
            item_id, target_status, get_status, delay, max_wait, wait_cb,
            wait_for_status=True)

    def wait_while(
            self, item_id, target_status, get_status,
            delay=1, max_wait=100, wait_cb=None):
        """Wait for as long as the item stays in target_status

        :returns: (str) the new status, (bool) False if timed out
        """
        return self._wait(
            item_id, target_status, get_status, delay, max_wait, wait_cb,
            wait_for_status=False)