__init__.py 23.7 KB
Newer Older
1
# Copyright 2011-2014 GRNET S.A. All rights reserved. #
Giorgos Verigakis's avatar
Giorgos Verigakis committed
2
3
4
5
6
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
Stavros Sachtouris's avatar
Stavros Sachtouris committed
7
#      copyright notice, self.list of conditions and the following
Giorgos Verigakis's avatar
Giorgos Verigakis committed
8
9
10
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
Stavros Sachtouris's avatar
Stavros Sachtouris committed
11
#      copyright notice, self.list of conditions and the following
Giorgos Verigakis's avatar
Giorgos Verigakis committed
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

33
from urllib2 import quote, unquote
34
from urlparse import urlparse
35
from threading import Thread
36
from json import dumps, loads
37
from time import time
38
from httplib import ResponseNotReady, HTTPException
39
40
from time import sleep
from random import random
41
from logging import getLogger
42
import ssl
43

44
from kamaki.clients.utils import https
45

Giorgos Verigakis's avatar
Giorgos Verigakis committed
46

47
48
TIMEOUT = 60.0   # seconds
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49

50
51
52
log = getLogger(__name__)
sendlog = getLogger('%s.send' % __name__)
recvlog = getLogger('%s.recv' % __name__)
53

54
55
56
57
58
59

def _encode(v):
    if v and isinstance(v, unicode):
        return quote(v.encode('utf-8'))
    return v

60

Giorgos Verigakis's avatar
Giorgos Verigakis committed
61
class ClientError(Exception):
62
    def __init__(self, message, status=0, details=None):
63
        log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
64
            message, status, details))
65
        try:
66
            message += '' if message and message[-1] == '\n' else '\n'
67
            serv_stat, sep, new_msg = message.partition('{')
68
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
69
            json_msg = loads(new_msg)
70
            key = json_msg.keys()[0]
71
            serv_stat = serv_stat.strip()
72

73
            json_msg = json_msg[key]
74
75
76
77
78
79
            message = '%s %s (%s)\n' % (
                serv_stat,
                key,
                json_msg['message']) if (
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
            status = json_msg.get('code', status)
80
81
82
            if 'details' in json_msg:
                if not details:
                    details = []
83
                if not isinstance(details, list):
84
85
86
                    details = [details]
                if json_msg['details']:
                    details.append(json_msg['details'])
87
        except Exception:
88
            pass
89
        finally:
90
91
            while message.endswith('\n\n'):
                message = message[:-1]
92
93
94
            super(ClientError, self).__init__(message)
            self.status = status if isinstance(status, int) else 0
            self.details = details if details else []
Giorgos Verigakis's avatar
Giorgos Verigakis committed
95

96

97
98
99
100
class KamakiSSLError(ClientError):
    """SSL Connection Error"""


101
102
103
104
class Logged(object):

    LOG_TOKEN = False
    LOG_DATA = False
105
    LOG_PID = False
106
    _token = None
107
108
109


class RequestManager(Logged):
110
111
112
113
114
115
116
117
118
119
120
121
    """Handle http request information"""

    def _connection_info(self, url, path, params={}):
        """ Set self.url to scheme://netloc/?params
        :param url: (str or unicode) The service url

        :param path: (str or unicode) The service path (url/path)

        :param params: (dict) Parameters to add to final url

        :returns: (scheme, netloc)
        """
122
        url = url or 'http://127.0.0.1/'
123
124
125
        url += '' if url.endswith('/') else '/'
        if path:
            url += _encode(path[1:] if path.startswith('/') else path)
126
127
        delim = '?'
        for key, val in params.items():
128
            val = quote('' if val in (None, False) else '%s' % _encode(val))
129
130
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
            delim = '&'
131
        parsed = urlparse(url)
132
133
134
135
        self.url = '%s' % url
        self.path = (('%s' % parsed.path) if parsed.path else '/') + (
            '?%s' % parsed.query if parsed.query else '')
        return (parsed.scheme, parsed.netloc)
136
137
138
139
140
141
142
143

    def __init__(
            self, method, url, path,
            data=None, headers={}, params={}):
        method = method.upper()
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
        if headers:
            assert isinstance(headers, dict)
144
        self.headers = dict(headers)
145
146
        self.method, self.data = method, data
        self.scheme, self.netloc = self._connection_info(url, path, params)
147
        self._headers_to_quote, self._header_prefices = [], []
148

149
    def dump_log(self):
150
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
151
152
153
        sendlog.info('- -  -   -     -        -             -')
        sendlog.info('%s %s://%s%s%s' % (
            self.method, self.scheme, self.netloc, self.path, plog))
154
        for key, val in self.headers.items():
155
156
157
            if key.lower() in ('x-auth-token', ) and not self.LOG_TOKEN:
                self._token, val = val, '...'
            sendlog.info('  %s: %s%s' % (key, val, plog))
158
        if self.data:
Stavros Sachtouris's avatar
Stavros Sachtouris committed
159
            sendlog.info('data size: %s%s' % (len(self.data), plog))
160
            if self.LOG_DATA:
161
162
                sendlog.info(self.data.replace(self._token, '...') if (
                    self._token) else self.data)
163
        else:
Stavros Sachtouris's avatar
Stavros Sachtouris committed
164
            sendlog.info('data size: 0%s' % plog)
165

Stavros Sachtouris's avatar
Stavros Sachtouris committed
166
    def _encode_headers(self):
167
        headers = dict()
Stavros Sachtouris's avatar
Stavros Sachtouris committed
168
        for k, v in self.headers.items():
169
170
171
            key = k.lower()
            val = '' if v is None else '%s' % (
                v.encode('utf-8') if isinstance(v, unicode) else v)
172
            quotable = any([key in self._headers_to_quote, ]) or any(
173
174
                [key.startswith(p) for p in self._header_prefices])
            headers[k] = quote(val) if quotable else val
Stavros Sachtouris's avatar
Stavros Sachtouris committed
175
176
        self.headers = headers

177
178
179
180
181
182
    def perform(self, conn):
        """
        :param conn: (httplib connection object)

        :returns: (HTTPResponse)
        """
Stavros Sachtouris's avatar
Stavros Sachtouris committed
183
        self._encode_headers()
184
        self.dump_log()
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
        try:
            conn.request(
                method=self.method.upper(),
                url=self.path.encode('utf-8'),
                headers=self.headers,
                body=self.data)
            sendlog.info('')
            keep_trying = TIMEOUT
            while keep_trying > 0:
                try:
                    return conn.getresponse()
                except ResponseNotReady:
                    wait = 0.03 * random()
                    sleep(wait)
                    keep_trying -= wait
        except ssl.SSLError as ssle:
            raise KamakiSSLError('SSL Connection error (%s)' % ssle)
202
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
203
        logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
204
        recvlog.debug(logmsg)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
205
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
206

207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
    @property
    def headers_to_quote(self):
        return self._headers_to_quote

    @headers_to_quote.setter
    def headers_to_quote(self, header_keys):
        self._headers_to_quote += [k.lower() for k in header_keys]
        self._headers_to_quote = list(set(self._headers_to_quote))

    @property
    def header_prefices(self):
        return self._header_prefices

    @header_prefices.setter
    def header_prefices(self, header_key_prefices):
        self._header_prefices += [p.lower() for p in header_key_prefices]
        self._header_prefices = list(set(self._header_prefices))

225

226
class ResponseManager(Logged):
227
228
    """Manage the http request and handle the response data, headers, etc."""

229
    def __init__(self, request, poolsize=None, connection_retry_limit=0):
230
231
        """
        :param request: (RequestManager)
232
233
234
235

        :param poolsize: (int) the size of the connection pool

        :param connection_retry_limit: (int)
236
        """
237
        self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
238
239
240
        self.request = request
        self._request_performed = False
        self.poolsize = poolsize
241
242
243
244
245
246
247
248
249
250
251
252
        self._headers_to_decode, self._header_prefices = [], []

    def _get_headers_to_decode(self, headers):
        keys = set([k.lower() for k, v in headers])
        encodable = list(keys.intersection(self.headers_to_decode))

        def has_prefix(s):
            for k in self.header_prefices:
                if s.startswith(k):
                    return True
            return False
        return encodable + filter(has_prefix, keys.difference(encodable))
253
254
255
256
257
258

    def _get_response(self):
        if self._request_performed:
            return

        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
259
260
        for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
            try:
261
                with https.PooledHTTPConnection(
262
263
264
265
                        self.request.netloc, self.request.scheme,
                        **pool_kw) as connection:
                    self.request.LOG_TOKEN = self.LOG_TOKEN
                    self.request.LOG_DATA = self.LOG_DATA
266
                    self.request.LOG_PID = self.LOG_PID
267
                    r = self.request.perform(connection)
268
269
270
271
272
                    plog = ''
                    if self.LOG_PID:
                        recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
                            self, r, self.request))
                        plog = '\t[%s]' % self
273
274
275
276
                    self._request_performed = True
                    self._status_code, self._status = r.status, unquote(
                        r.reason)
                    recvlog.info(
277
278
                        '%d %s%s' % (
                            self.status_code, self.status, plog))
279
                    self._headers = dict()
280
281
282
283

                    r_headers = r.getheaders()
                    enc_headers = self._get_headers_to_decode(r_headers)
                    for k, v in r_headers:
284
285
                        self._headers[k] = unquote(v).decode('utf-8') if (
                            k.lower()) in enc_headers else v
286
                        recvlog.info('  %s: %s%s' % (k, v, plog))
287
                    self._content = r.read()
288
289
                    recvlog.info('data size: %s%s' % (
                        len(self._content) if self._content else 0, plog))
290
                    if self.LOG_DATA and self._content:
291
292
293
                        data = '%s%s' % (self._content, plog)
                        if self._token:
                            data = data.replace(self._token, '...')
294
295
                        recvlog.info(data)
                    recvlog.info('-             -        -     -   -  - -')
296
297
298
299
300
301
302
303
304
305
306
                break
            except Exception as err:
                if isinstance(err, HTTPException):
                    if retries >= self.CONNECTION_TRY_LIMIT:
                        raise ClientError(
                            'Connection to %s failed %s times (%s: %s )' % (
                                self.request.url, retries, type(err), err))
                else:
                    from traceback import format_stack
                    recvlog.debug(
                        '\n'.join(['%s' % type(err)] + format_stack()))
307
                    raise
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336

    @property
    def status_code(self):
        self._get_response()
        return self._status_code

    @property
    def status(self):
        self._get_response()
        return self._status

    @property
    def headers(self):
        self._get_response()
        return self._headers

    @property
    def content(self):
        self._get_response()
        return self._content

    @property
    def text(self):
        """
        :returns: (str) content
        """
        self._get_response()
        return '%s' % self._content

337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
    @property
    def headers_to_decode(self):
        return self._headers_to_decode

    @headers_to_decode.setter
    def headers_to_decode(self, header_keys):
        self._headers_to_decode += [k.lower() for k in header_keys]
        self._headers_to_decode = list(set(self._headers_to_decode))

    @property
    def header_prefices(self):
        return self._header_prefices

    @header_prefices.setter
    def header_prefices(self, header_key_prefices):
        self._header_prefices += [p.lower() for p in header_key_prefices]
        self._header_prefices = list(set(self._header_prefices))

355
356
357
358
359
360
361
    @property
    def json(self):
        """
        :returns: (dict) squeezed from json-formated content
        """
        self._get_response()
        try:
362
            return loads(self._content)
363
        except ValueError as err:
Stavros Sachtouris's avatar
Stavros Sachtouris committed
364
            raise ClientError('Response not formated in JSON - %s' % err)
365
366


367
class SilentEvent(Thread):
368
    """Thread-run method(*args, **kwargs)"""
369
370
    def __init__(self, method, *args, **kwargs):
        super(self.__class__, self).__init__()
371
        self.method, self.args, self.kwargs = method, args, kwargs
372
373
374
375
376
377
378
379
380
381
382
383
384

    @property
    def exception(self):
        return getattr(self, '_exception', False)

    @property
    def value(self):
        return getattr(self, '_value', None)

    def run(self):
        try:
            self._value = self.method(*(self.args), **(self.kwargs))
        except Exception as e:
385
386
387
388
389
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
                self,
                type(e),
                e.status if isinstance(e, ClientError) else '',
                e))
390
391
            self._exception = e

392

393
class Client(Logged):
394
    service_type = ''
395
    MAX_THREADS = 1
396
    DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
397
    CONNECTION_RETRY_LIMIT = 0
398

399
400
401
402
403
    def __init__(self, endpoint_url, token, base_url=None):
        #  BW compatibility - keep base_url for some time
        endpoint_url = endpoint_url or base_url
        assert endpoint_url, 'No endpoint_url for client %s' % self
        self.endpoint_url, self.base_url = endpoint_url, endpoint_url
404
        self.token = token
405
        self.headers, self.params = dict(), dict()
406
        self.poolsize = None
407
408
        self.request_headers_to_quote = []
        self.request_header_prefices_to_quote = []
409
410
        self.response_headers = []
        self.response_header_prefices = []
411

412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
    @staticmethod
    def _unquote_header_keys(headers, prefices):
        new_keys = dict()
        for k in headers:
            if k.lower().startswith(prefices):
                new_keys[k] = unquote(k).decode('utf-8')
        for old, new in new_keys.items():
            headers[new] = headers.pop(old)

    @staticmethod
    def _quote_header_keys(headers, prefices):
        new_keys = dict()
        for k in headers:
            if k.lower().startswith(prefices):
                new_keys[k] = quote(k.encode('utf-8'))
        for old, new in new_keys.items():
            headers[new] = headers.pop(old)

430
    def _init_thread_limit(self, limit=1):
431
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
432
433
434
435
436
        self._thread_limit = limit
        self._elapsed_old = 0.0
        self._elapsed_new = 0.0

    def _watch_thread_limit(self, threadlist):
437
438
439
        self._thread_limit = getattr(self, '_thread_limit', 1)
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
440
        recvlog.debug('# running threads: %s' % len(threadlist))
441
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
442
                self._thread_limit < self.MAX_THREADS):
443
            self._thread_limit += 1
444
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
445
446
447
448
449
450
451
452
453
454
455
456
457
            self._thread_limit -= 1

        self._elapsed_old = self._elapsed_new
        if len(threadlist) >= self._thread_limit:
            self._elapsed_new = 0.0
            for thread in threadlist:
                begin_time = time()
                thread.join()
                self._elapsed_new += time() - begin_time
            self._elapsed_new = self._elapsed_new / len(threadlist)
            return []
        return threadlist

458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
    def async_run(self, method, kwarg_list):
        """Fire threads of operations

        :param method: the method to run in each thread

        :param kwarg_list: (list of dicts) the arguments to pass in each method
            call

        :returns: (list) the results of each method call w.r. to the order of
            kwarg_list
        """
        flying, results = {}, {}
        self._init_thread_limit()
        for index, kwargs in enumerate(kwarg_list):
            self._watch_thread_limit(flying.values())
            flying[index] = SilentEvent(method=method, **kwargs)
            flying[index].start()
            unfinished = {}
            for key, thread in flying.items():
                if thread.isAlive():
                    unfinished[key] = thread
                elif thread.exception:
                    raise thread.exception
                else:
                    results[key] = thread.value
            flying = unfinished
        sendlog.info('- - - wait for threads to finish')
        for key, thread in flying.items():
            if thread.isAlive():
                thread.join()
488
            if thread.exception:
489
                raise thread.exception
490
            results[key] = thread.value
491
492
        return results.values()

493
    def set_header(self, name, value, iff=True):
494
        """Set a header 'name':'value'"""
495
        if value is not None and iff:
496
            self.headers['%s' % name] = '%s' % value
497
498
499

    def set_param(self, name, value=None, iff=True):
        if iff:
500
            self.params[name] = '%s' % value
501

502
    def request(
503
504
            self, method, path,
            async_headers=dict(), async_params=dict(),
505
            **kwargs):
506
        """Commit an HTTP request to endpoint_url/path
507
508
509
510
511
        Requests are commited to and performed by Request/ResponseManager
        These classes perform a lazy http request. Present method, by default,
        enforces them to perform the http call. Hint: call present method with
        success=None to get a non-performed ResponseManager object.
        """
512
513
514
        assert isinstance(method, str) or isinstance(method, unicode)
        assert method
        assert isinstance(path, str) or isinstance(path, unicode)
515
        try:
516
517
518
519
            headers = dict(self.headers)
            headers.update(async_headers)
            params = dict(self.params)
            params.update(async_params)
520
521
            success = kwargs.pop('success', 200)
            data = kwargs.pop('data', None)
522
            headers.setdefault('X-Auth-Token', self.token)
523
            if 'json' in kwargs:
524
                data = dumps(kwargs.pop('json'))
525
                headers.setdefault('Content-Type', 'application/json')
526
            if data:
527
                headers.setdefault('Content-Length', '%s' % len(data))
528
            plog = ('\t[%s]' % self) if self.LOG_PID else ''
529
            sendlog.debug('\n\nCMT %s@%s%s', method, self.endpoint_url, plog)
530
            req = RequestManager(
531
                method, self.endpoint_url, path,
532
                data=data, headers=headers, params=params)
533
534
            req.headers_to_quote = self.request_headers_to_quote
            req.header_prefices = self.request_header_prefices_to_quote
535
            #  req.log()
536
            r = ResponseManager(
537
538
539
                req,
                poolsize=self.poolsize,
                connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
540
541
            r.headers_to_decode = self.response_headers
            r.header_prefices = self.response_header_prefices
542
543
            r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
                self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
544
            r._token = headers['X-Auth-Token']
545
        finally:
546
547
            self.headers = dict()
            self.params = dict()
548
549

        if success is not None:
550
            # Success can either be an int or a collection
551
552
            success = (success,) if isinstance(success, int) else success
            if r.status_code not in success:
553
554
555
556
557
558
559
560
                log.debug(u'Client caught error %s (%s)' % (r, type(r)))
                status_msg = getattr(r, 'status', '')
                try:
                    message = u'%s %s\n' % (status_msg, r.text)
                except:
                    message = u'%s %s\n' % (status_msg, r)
                status = getattr(r, 'status_code', getattr(r, 'status', 0))
                raise ClientError(message, status=status)
561
        return r
Giorgos Verigakis's avatar
Giorgos Verigakis committed
562

Giorgos Verigakis's avatar
Giorgos Verigakis committed
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
    def delete(self, path, **kwargs):
        return self.request('delete', path, **kwargs)

    def get(self, path, **kwargs):
        return self.request('get', path, **kwargs)

    def head(self, path, **kwargs):
        return self.request('head', path, **kwargs)

    def post(self, path, **kwargs):
        return self.request('post', path, **kwargs)

    def put(self, path, **kwargs):
        return self.request('put', path, **kwargs)

578
579
580
581
582
    def copy(self, path, **kwargs):
        return self.request('copy', path, **kwargs)

    def move(self, path, **kwargs):
        return self.request('move', path, **kwargs)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
583
584
585
586
587


class Waiter(object):

    def _wait(
588
589
590
            self, item_id, wait_status, get_status,
            delay=1, max_wait=100, wait_cb=None, wait_for_status=False):
        """Wait while the item is still in wait_status or to reach it
Stavros Sachtouris's avatar
Stavros Sachtouris committed
591
592
593

        :param server_id: integer (str or int)

594
        :param wait_status: (str)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
595
596
597
598
599
600
601
602
603

        :param get_status: (method(self, item_id)) if called, returns
            (status, progress %) If no way to tell progress, return None

        :param delay: time interval between retries

        :param wait_cb: (method(total steps)) returns a generator for
            reporting progress or timeouts i.e., for a progress bar

604
605
        :param wait_for_status: (bool) wait FOR (True) or wait WHILE (False)

Stavros Sachtouris's avatar
Stavros Sachtouris committed
606
607
608
609
610
611
612
613
        :returns: (str) the new mode if successful, (bool) False if timed out
        """
        status, progress = get_status(self, item_id)

        if wait_cb:
            wait_gen = wait_cb(max_wait // delay)
            wait_gen.next()

614
615
616
617
618
619
        if wait_for_status ^ (status != wait_status):
            # if wait_cb:
            #     try:
            #         wait_gen.next()
            #     except Exception:
            #         pass
Stavros Sachtouris's avatar
Stavros Sachtouris committed
620
621
622
            return status
        old_wait = total_wait = 0

623
624
        while (wait_for_status ^ (status == wait_status)) and (
                total_wait <= max_wait):
Stavros Sachtouris's avatar
Stavros Sachtouris committed
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
            if wait_cb:
                try:
                    for i in range(total_wait - old_wait):
                        wait_gen.next()
                except Exception:
                    break
            old_wait = total_wait
            total_wait = progress or total_wait + 1
            sleep(delay)
            status, progress = get_status(self, item_id)

        if total_wait < max_wait:
            if wait_cb:
                try:
                    for i in range(max_wait):
                        wait_gen.next()
                except:
                    pass
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
        return status if (wait_for_status ^ (status != wait_status)) else False

    def wait_for(
            self, item_id, target_status, get_status,
            delay=1, max_wait=100, wait_cb=None):
        self._wait(
            item_id, target_status, get_status, delay, max_wait, wait_cb,
            wait_for_status=True)

    def wait_while(
            self, item_id, target_status, get_status,
            delay=1, max_wait=100, wait_cb=None):
        self._wait(
            item_id, target_status, get_status, delay, max_wait, wait_cb,
            wait_for_status=False)