# Copyright 2011-2014 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

33
from urllib2 import quote, unquote
34
from urlparse import urlparse
35
from threading import Thread
36
from json import dumps, loads
37
from time import time
38
from httplib import ResponseNotReady, HTTPException
39
40
from time import sleep
from random import random
41
from logging import getLogger
42
43

from objpool.http import PooledHTTPConnection
44

Giorgos Verigakis's avatar
Giorgos Verigakis committed
45

46
47
# Budget, in seconds, that RequestManager.perform spends sleeping between
# attempts to fetch a response that is not ready yet.
TIMEOUT = 60.0   # seconds
# Upper-case HTTP verbs accepted by RequestManager.__init__.
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
48

49
50
51
# Module loggers: `log` for general messages, `sendlog` ('<module>.send')
# mainly for outgoing request traces and `recvlog` ('<module>.recv') mainly
# for incoming response traces.
log = getLogger(__name__)
sendlog = getLogger('%s.send' % __name__)
recvlog = getLogger('%s.recv' % __name__)
52

53
54
55
56
57
58

def _encode(v):
    if v and isinstance(v, unicode):
        return quote(v.encode('utf-8'))
    return v

59

Giorgos Verigakis's avatar
Giorgos Verigakis committed
60
class ClientError(Exception):
    """Error raised when a client operation or a server response fails.

    When the message looks like ``<status text> {"<key>": {...}}`` the JSON
    part is unpacked on a best-effort basis: the exception message becomes
    ``<status text> <key> (<message>)``, ``status`` is taken from the JSON
    ``code`` field and the JSON ``details`` are appended to ``details``.
    """

    def __init__(self, message, status=0, details=None):
        """
        :param message: (str) error text, optionally followed by a JSON body

        :param status: (int) status code; non-int values are stored as 0

        :param details: extra information (a list, or a single item). The
            caller's object is never modified.
        """
        log.debug(
            'ClientError: msg[%s], sts[%s], dtl[%s]',
            message, status, details)

        # The string handling below (also in the finally clause) requires a
        # string; previously a None or non-string message made the
        # constructor itself fail with AttributeError.
        if message is None:
            message = ''
        elif not isinstance(message, (str, type(u''))):
            message = '%s' % (message,)

        try:
            if not message.endswith('\n'):
                message += '\n'
            serv_stat, sep, body = message.partition('{')
            # Drop the single trailing newline guaranteed above
            if body.endswith('\n'):
                body = body[:-1]
            json_msg = loads(sep + body)
            key = next(iter(json_msg))
            serv_stat = serv_stat.strip()

            json_msg = json_msg[key]
            if 'message' in json_msg:
                message = '%s %s (%s)\n' % (
                    serv_stat, key, json_msg['message'])
            else:
                message = '%s %s' % (serv_stat, key)
            status = json_msg.get('code', status)
            if 'details' in json_msg:
                if not details:
                    details = []
                elif isinstance(details, list):
                    # Copy, so that the append below cannot leak into the
                    # list object owned by the caller
                    details = list(details)
                else:
                    details = [details]
                if json_msg['details']:
                    details.append(json_msg['details'])
        except Exception:
            # Deliberate best effort: a message without a parsable JSON
            # body is kept as it is
            pass
        finally:
            # Collapse a run of trailing newlines to a single one
            while message.endswith('\n\n'):
                message = message[:-1]
            super(ClientError, self).__init__(message)
            self.status = status if isinstance(status, int) else 0
            self.details = details if details else []
Giorgos Verigakis's avatar
Giorgos Verigakis committed
94

95

96
97
98
99
class Logged(object):
    """Base class carrying the logging switches used by its subclasses."""

    # When False, RequestManager.dump_log prints '...' instead of the
    # X-Auth-Token header value
    LOG_TOKEN = False
    # When True, request and response bodies are logged too
    LOG_DATA = False
    # When True, log lines get a '\t[<object>]' suffix
    LOG_PID = False
    # Token value to be replaced by '...' in logged data, if set
    _token = None
102
103
104


class RequestManager(Logged):
    """Handle http request information"""

    def _connection_info(self, url, path, params=None):
        """Assemble the request URL and split out the connection details.

        Sets self.url to the full URL (including the query string) and
        self.path to the path plus query string.

        :param url: (str or unicode) The service url

        :param path: (str or unicode) The service path (url/path)

        :param params: (dict) Parameters to add to final url. A value of
            None, False or '' produces a bare ``key`` without ``=value``

        :returns: (scheme, netloc)
        """
        url = url or 'http://127.0.0.1/'
        if not url.endswith('/'):
            url += '/'
        if path:
            url += _encode(path[1:] if path.startswith('/') else path)
        delim = '?'
        for key, val in (params or {}).items():
            # Identity checks on purpose: `val in (None, False)` also matched
            # 0 (because 0 == False), silently dropping numeric zero values
            val = quote(
                '' if val is None or val is False else '%s' % _encode(val))
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
            delim = '&'
        parsed = urlparse(url)
        self.url = '%s' % url
        self.path = (('%s' % parsed.path) if parsed.path else '/') + (
            '?%s' % parsed.query if parsed.query else '')
        return (parsed.scheme, parsed.netloc)

    def __init__(
            self, method, url, path,
            data=None, headers=None, params=None):
        """
        :param method: (str) one of HTTP_METHODS, case insensitive

        :param url: (str or unicode) The service url

        :param path: (str or unicode) The service path (url/path)

        :param data: request body, if any

        :param headers: (dict) request headers; copied, not referenced

        :param params: (dict) url parameters
        """
        method = method.upper()
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
        if headers:
            assert isinstance(headers, dict)
        self.headers = dict(headers or {})
        self.method, self.data = method, data
        self.scheme, self.netloc = self._connection_info(url, path, params)
        self._headers_to_quote, self._header_prefices = [], []

    def dump_log(self):
        """Log the request line, the headers and the data size (and the data
        itself if LOG_DATA is set). Unless LOG_TOKEN is set, the value of
        X-Auth-Token is remembered in self._token and logged as '...'
        """
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
        sendlog.info('- -  -   -     -        -             -')
        sendlog.info('%s %s://%s%s%s' % (
            self.method, self.scheme, self.netloc, self.path, plog))
        for key, val in self.headers.items():
            if key.lower() == 'x-auth-token' and not self.LOG_TOKEN:
                self._token, val = val, '...'
            sendlog.info('  %s: %s%s' % (key, val, plog))
        if self.data:
            sendlog.info('data size: %s%s' % (len(self.data), plog))
            if self.LOG_DATA:
                sendlog.info(self.data.replace(self._token, '...') if (
                    self._token) else self.data)
        else:
            sendlog.info('data size: 0%s' % plog)

    def _encode_headers(self):
        """Replace self.headers with a dict of string values: None becomes
        '', unicode is encoded as UTF-8 and the values of headers selected
        through headers_to_quote / header_prefices are percent-quoted
        """
        encoded = dict()
        for k, v in self.headers.items():
            key = k.lower()
            if v is None:
                val = ''
            else:
                val = '%s' % (
                    v.encode('utf-8') if isinstance(v, unicode) else v)
            quotable = key in self._headers_to_quote or any(
                key.startswith(p) for p in self._header_prefices)
            encoded[k] = quote(val) if quotable else val
        self.headers = encoded

    def perform(self, conn):
        """
        :param conn: (httplib connection object)

        :returns: (HTTPResponse)

        :raises ClientError: if the response is still not ready after
            sleeping for a total of TIMEOUT seconds
        """
        self._encode_headers()
        self.dump_log()
        conn.request(
            method=self.method.upper(),
            url=self.path.encode('utf-8'),
            headers=self.headers,
            body=self.data)
        sendlog.info('')
        keep_trying = TIMEOUT
        while keep_trying > 0:
            try:
                return conn.getresponse()
            except ResponseNotReady:
                # Back off for a short random interval and charge only the
                # slept time against the timeout budget
                wait = 0.03 * random()
                sleep(wait)
                keep_trying -= wait
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
        logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
        recvlog.debug(logmsg)
        raise ClientError('HTTPResponse takes too long - kamaki timeout')

    @property
    def headers_to_quote(self):
        """Lower-cased names of the headers whose values get quoted"""
        return self._headers_to_quote

    @headers_to_quote.setter
    def headers_to_quote(self, header_keys):
        # Assignment accumulates: new keys are merged with the existing ones
        self._headers_to_quote += [k.lower() for k in header_keys]
        self._headers_to_quote = list(set(self._headers_to_quote))

    @property
    def header_prefices(self):
        """Lower-cased name prefixes of the headers whose values get quoted
        """
        return self._header_prefices

    @header_prefices.setter
    def header_prefices(self, header_key_prefices):
        # Assignment accumulates: new prefixes are merged with existing ones
        self._header_prefices += [p.lower() for p in header_key_prefices]
        self._header_prefices = list(set(self._header_prefices))

217

218
class ResponseManager(Logged):
    """Manage the http request and handle the response data, headers, etc."""

    def __init__(self, request, poolsize=None, connection_retry_limit=0):
        """
        :param request: (RequestManager)

        :param poolsize: (int) the size of the connection pool

        :param connection_retry_limit: (int) extra attempts allowed after
            the first one fails with an HTTPException
        """
        self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
        self.request = request
        self._request_performed = False
        self.poolsize = poolsize
        self._headers_to_decode = []
        self._header_prefices = []

    def _get_headers_to_decode(self, headers):
        """Pick the (lower-cased) header names that must be decoded.

        :param headers: iterable of (name, value) pairs

        :returns: (list) names listed in headers_to_decode, followed by the
            remaining names that start with one of header_prefices
        """
        lowered = set(name.lower() for name, _ in headers)
        exact = list(lowered.intersection(self.headers_to_decode))
        by_prefix = [
            name for name in lowered.difference(exact)
            if any(name.startswith(p) for p in self.header_prefices)]
        return exact + by_prefix

    def _get_response(self):
        """Perform the request, unless already performed, and cache status,
        headers and content on this object.

        :raises ClientError: if the last allowed attempt fails with an
            HTTPException. Any other exception is logged and re-raised
        """
        if self._request_performed:
            return

        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
        for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
            try:
                with PooledHTTPConnection(
                        self.request.netloc, self.request.scheme,
                        **pool_kw) as connection:
                    req = self.request
                    req.LOG_TOKEN = self.LOG_TOKEN
                    req.LOG_DATA = self.LOG_DATA
                    req.LOG_PID = self.LOG_PID
                    resp = req.perform(connection)

                    if self.LOG_PID:
                        recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
                            self, resp, req))
                        plog = '\t[%s]' % self
                    else:
                        plog = ''

                    # Raise the flag before the status properties are read
                    # below: they call back into this method
                    self._request_performed = True
                    self._status_code, self._status = resp.status, unquote(
                        resp.reason)
                    recvlog.info('%d %s%s' % (
                        self.status_code, self.status, plog))

                    self._headers = dict()
                    r_headers = resp.getheaders()
                    enc_headers = self._get_headers_to_decode(r_headers)
                    for k, v in r_headers:
                        if k.lower() in enc_headers:
                            self._headers[k] = unquote(v).decode('utf-8')
                        else:
                            self._headers[k] = v
                        recvlog.info('  %s: %s%s' % (k, v, plog))

                    self._content = resp.read()
                    size = len(self._content) if self._content else 0
                    recvlog.info('data size: %s%s' % (size, plog))
                    if self.LOG_DATA and self._content:
                        data = '%s%s' % (self._content, plog)
                        if self._token:
                            data = data.replace(self._token, '...')
                        recvlog.info(data)
                    recvlog.info('-             -        -     -   -  - -')
                break
            except HTTPException as err:
                # Connection level failure: retry until attempts run out
                if retries >= self.CONNECTION_TRY_LIMIT:
                    raise ClientError(
                        'Connection to %s failed %s times (%s: %s )' % (
                            self.request.url, retries, type(err), err))
            except Exception as err:
                from traceback import format_stack
                recvlog.debug(
                    '\n'.join(['%s' % type(err)] + format_stack()))
                raise

    @property
    def status_code(self):
        """Numeric response status; triggers the request if needed"""
        self._get_response()
        return self._status_code

    @property
    def status(self):
        """Unquoted response reason; triggers the request if needed"""
        self._get_response()
        return self._status

    @property
    def headers(self):
        """(dict) response headers; triggers the request if needed"""
        self._get_response()
        return self._headers

    @property
    def content(self):
        """Response body as read; triggers the request if needed"""
        self._get_response()
        return self._content

    @property
    def text(self):
        """
        :returns: (str) content
        """
        self._get_response()
        return '%s' % self._content

    @property
    def headers_to_decode(self):
        """Lower-cased names of the headers whose values get decoded"""
        return self._headers_to_decode

    @headers_to_decode.setter
    def headers_to_decode(self, header_keys):
        # Assignment accumulates: new keys are merged with the existing ones
        merged = self._headers_to_decode + [k.lower() for k in header_keys]
        self._headers_to_decode = list(set(merged))

    @property
    def header_prefices(self):
        """Lower-cased name prefixes of the headers whose values get decoded
        """
        return self._header_prefices

    @header_prefices.setter
    def header_prefices(self, header_key_prefices):
        # Assignment accumulates: new prefixes are merged with existing ones
        merged = self._header_prefices + [
            p.lower() for p in header_key_prefices]
        self._header_prefices = list(set(merged))

    @property
    def json(self):
        """
        :returns: (dict) squeezed from json-formated content

        :raises ClientError: if the content cannot be parsed as JSON
        """
        self._get_response()
        try:
            #  Round trip through dumps, so that the json string contains no
            #  line breaks
            normalized = dumps(loads(self._content))
            #  Escape control characters and parse to python object
            return loads(normalized.encode('unicode_escape'))
        except ValueError as err:
            raise ClientError('Response not formated in JSON - %s' % err)
361
362


363
class SilentEvent(Thread):
    """Thread-run method(*args, **kwargs)

    The outcome of the call is kept on the thread object instead of being
    propagated: see the `value` and `exception` properties.
    """

    def __init__(self, method, *args, **kwargs):
        """
        :param method: the callable to run in the thread

        :param args: positional arguments for method

        :param kwargs: keyword arguments for method
        """
        # Name the class explicitly: super(self.__class__, self) resolves to
        # the subclass when this class is subclassed and recurses forever
        super(SilentEvent, self).__init__()
        self.method, self.args, self.kwargs = method, args, kwargs

    @property
    def exception(self):
        """The exception raised by method, or False if there is none"""
        return getattr(self, '_exception', False)

    @property
    def value(self):
        """The value returned by method, or None if it has not returned"""
        return getattr(self, '_value', None)

    def run(self):
        """Call method, store its result and log and store any exception"""
        try:
            self._value = self.method(*(self.args), **(self.kwargs))
        except Exception as e:
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
                self,
                type(e),
                e.status if isinstance(e, ClientError) else '',
                e))
            self._exception = e

388

389
class Client(Logged):
    """Base HTTP client: collects headers and url parameters, issues
    requests through RequestManager / ResponseManager and turns unexpected
    status codes into ClientError.
    """

    service_type = ''
    # Upper bound for the adaptive thread limit used by async_run
    MAX_THREADS = 1
    DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
    # Passed to ResponseManager as connection_retry_limit
    CONNECTION_RETRY_LIMIT = 0

    def __init__(self, endpoint_url, token, base_url=None):
        """
        :param endpoint_url: (str) the url all request paths are appended to

        :param token: (str) value for the X-Auth-Token header

        :param base_url: (str) used only if endpoint_url is empty
        """
        #  BW compatibility - keep base_url for some time
        endpoint_url = endpoint_url or base_url
        assert endpoint_url, 'No endpoint_url for client %s' % self
        self.endpoint_url, self.base_url = endpoint_url, endpoint_url
        self.token = token
        #  Headers and params are consumed (reset) by every request
        self.headers, self.params = dict(), dict()
        self.poolsize = None
        self.request_headers_to_quote = []
        self.request_header_prefices_to_quote = []
        self.response_headers = []
        self.response_header_prefices = []

    @staticmethod
    def _unquote_header_keys(headers, prefices):
        """Rename, in place, the header keys that start with prefices to
        their unquoted, utf-8 decoded form.

        :param headers: (dict) modified in place

        :param prefices: (str or tuple of str) lower-case key prefixes
        """
        new_keys = dict()
        for k in headers:
            if k.lower().startswith(prefices):
                new_keys[k] = unquote(k).decode('utf-8')
        #  Rename in a second pass, so the dict is not resized while iterated
        for old, new in new_keys.items():
            headers[new] = headers.pop(old)

    @staticmethod
    def _quote_header_keys(headers, prefices):
        """Rename, in place, the header keys that start with prefices to
        their utf-8 encoded, quoted form.

        :param headers: (dict) modified in place

        :param prefices: (str or tuple of str) lower-case key prefixes
        """
        new_keys = dict()
        for k in headers:
            if k.lower().startswith(prefices):
                new_keys[k] = quote(k.encode('utf-8'))
        #  Rename in a second pass, so the dict is not resized while iterated
        for old, new in new_keys.items():
            headers[new] = headers.pop(old)

    def _init_thread_limit(self, limit=1):
        """Reset the adaptive thread limit and its timing statistics"""
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
        self._thread_limit = limit
        self._elapsed_old = 0.0
        self._elapsed_new = 0.0

    def _watch_thread_limit(self, threadlist):
        """Adapt the thread limit and, if threadlist has reached it, join
        all the threads in it.

        The limit grows (up to MAX_THREADS) when the previous average join
        time was not shorter than the latest one, otherwise it shrinks
        (down to 1).

        :param threadlist: (list) running threads

        :returns: (list) an empty list if the threads were joined, else
            threadlist
        """
        self._thread_limit = getattr(self, '_thread_limit', 1)
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
        recvlog.debug('# running threads: %s' % len(threadlist))
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
                self._thread_limit < self.MAX_THREADS):
            self._thread_limit += 1
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
            self._thread_limit -= 1

        self._elapsed_old = self._elapsed_new
        if len(threadlist) >= self._thread_limit:
            self._elapsed_new = 0.0
            for thread in threadlist:
                begin_time = time()
                thread.join()
                self._elapsed_new += time() - begin_time
            self._elapsed_new = self._elapsed_new / len(threadlist)
            return []
        return threadlist

    def async_run(self, method, kwarg_list):
        """Fire threads of operations

        :param method: the method to run in each thread

        :param kwarg_list: (list of dicts) the arguments to pass in each method
            call

        :returns: (list) the results of each method call w.r. to the order of
            kwarg_list

        :raises: the exception of the first thread found to have failed
        """
        flying, results = {}, {}
        self._init_thread_limit()
        for index, kwargs in enumerate(kwarg_list):
            #  Joins the running threads if the thread limit is reached
            self._watch_thread_limit(list(flying.values()))
            flying[index] = SilentEvent(method=method, **kwargs)
            flying[index].start()
            unfinished = {}
            for key, thread in flying.items():
                if thread.is_alive():
                    unfinished[key] = thread
                elif thread.exception:
                    raise thread.exception
                else:
                    results[key] = thread.value
            flying = unfinished
        sendlog.info('- - - wait for threads to finish')
        for key, thread in flying.items():
            if thread.is_alive():
                thread.join()
            if thread.exception:
                raise thread.exception
            results[key] = thread.value
        #  Results are keyed by position in kwarg_list: sort by key to honour
        #  the documented order (dict order follows neither the position nor
        #  anything else a caller could rely on)
        return [results[key] for key in sorted(results)]

    def set_header(self, name, value, iff=True):
        """Set a header 'name':'value'"""
        if value is not None and iff:
            self.headers['%s' % name] = '%s' % value

    def set_param(self, name, value=None, iff=True):
        """Set a url parameter 'name' to the string form of value, if iff"""
        #  NOTE(review): with the default value=None the literal string
        #  'None' is stored - confirm this is what callers expect
        if iff:
            self.params[name] = '%s' % value

    def request(
            self, method, path,
            async_headers=None, async_params=None,
            **kwargs):
        """Commit an HTTP request to endpoint_url/path

        Requests are committed to and performed by Request/ResponseManager
        These classes perform a lazy http request. Present method, by default,
        enforces them to perform the http call. Hint: call present method with
        success=None to get a non-performed ResponseManager object.

        :param method: (str) http method name

        :param path: (str) path to append to endpoint_url

        :param async_headers: (dict) headers for this call only, applied on
            top of the headers set with set_header

        :param async_params: (dict) url parameters for this call only,
            applied on top of the parameters set with set_param

        :param kwargs: success (int or collection of ints, default 200),
            data (request body), json (object to send as a json body)

        :returns: (ResponseManager)

        :raises ClientError: if success is not None and the response status
            code is not in it
        """
        assert isinstance(method, str) or isinstance(method, unicode)
        assert method
        assert isinstance(path, str) or isinstance(path, unicode)
        try:
            headers = dict(self.headers)
            headers.update(async_headers or {})
            params = dict(self.params)
            params.update(async_params or {})
            success = kwargs.pop('success', 200)
            data = kwargs.pop('data', None)
            headers.setdefault('X-Auth-Token', self.token)
            if 'json' in kwargs:
                data = dumps(kwargs.pop('json'))
                headers.setdefault('Content-Type', 'application/json')
            if data:
                headers.setdefault('Content-Length', '%s' % len(data))
            plog = ('\t[%s]' % self) if self.LOG_PID else ''
            sendlog.debug('\n\nCMT %s@%s%s', method, self.endpoint_url, plog)
            req = RequestManager(
                method, self.endpoint_url, path,
                data=data, headers=headers, params=params)
            req.headers_to_quote = self.request_headers_to_quote
            req.header_prefices = self.request_header_prefices_to_quote
            r = ResponseManager(
                req,
                poolsize=self.poolsize,
                connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
            r.headers_to_decode = self.response_headers
            r.header_prefices = self.response_header_prefices
            r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
                self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
            r._token = headers['X-Auth-Token']
        finally:
            #  Accumulated headers and params are good for one request only
            self.headers = dict()
            self.params = dict()

        if success is not None:
            # Success can either be an int or a collection
            success = (success,) if isinstance(success, int) else success
            #  Reading status_code is what actually performs the request
            if r.status_code not in success:
                log.debug(u'Client caught error %s (%s)' % (r, type(r)))
                status_msg = getattr(r, 'status', '')
                try:
                    message = u'%s %s\n' % (status_msg, r.text)
                except Exception:
                    #  e.g., a body that cannot be decoded: report the
                    #  response object instead of its text
                    message = u'%s %s\n' % (status_msg, r)
                status = getattr(r, 'status_code', getattr(r, 'status', 0))
                raise ClientError(message, status=status)
        return r

    def delete(self, path, **kwargs):
        """Shortcut for request('delete', path, **kwargs)"""
        return self.request('delete', path, **kwargs)

    def get(self, path, **kwargs):
        """Shortcut for request('get', path, **kwargs)"""
        return self.request('get', path, **kwargs)

    def head(self, path, **kwargs):
        """Shortcut for request('head', path, **kwargs)"""
        return self.request('head', path, **kwargs)

    def post(self, path, **kwargs):
        """Shortcut for request('post', path, **kwargs)"""
        return self.request('post', path, **kwargs)

    def put(self, path, **kwargs):
        """Shortcut for request('put', path, **kwargs)"""
        return self.request('put', path, **kwargs)

    def copy(self, path, **kwargs):
        """Shortcut for request('copy', path, **kwargs)"""
        return self.request('copy', path, **kwargs)

    def move(self, path, **kwargs):
        """Shortcut for request('move', path, **kwargs)"""
        return self.request('move', path, **kwargs)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
579
580
581
582
583


class Waiter(object):
    """Mix-in for polling the status of an item until it changes"""

    def _wait(
            self, item_id, wait_status, get_status,
            delay=1, max_wait=100, wait_cb=None, wait_for_status=False):
        """Wait while the item is still in wait_status or to reach it

        :param item_id: identifier handed over to get_status

        :param wait_status: (str)

        :param get_status: (method(self, item_id)) if called, returns
            (status, progress %) If no way to tell progress, return None

        :param delay: time interval between retries

        :param max_wait: upper bound for the wait counter, which follows the
            reported progress or, without one, grows by one per retry

        :param wait_cb: (method(total steps)) returns a generator for
            reporting progress or timeouts i.e., for a progress bar

        :param wait_for_status: (bool) wait FOR (True) or wait WHILE (False)

        :returns: (str) the new mode if successful, (bool) False if timed out
        """
        status, progress = get_status(self, item_id)

        if wait_cb:
            wait_gen = wait_cb(max_wait // delay)
            next(wait_gen)

        #  Nothing to wait for: the item is already in the wanted state
        if wait_for_status ^ (status != wait_status):
            return status
        old_wait = total_wait = 0

        while (wait_for_status ^ (status == wait_status)) and (
                total_wait <= max_wait):
            if wait_cb:
                try:
                    #  Advance the reporter by the steps gained since the
                    #  previous poll
                    for _ in range(total_wait - old_wait):
                        next(wait_gen)
                except Exception:
                    break
            old_wait = total_wait
            total_wait = progress or total_wait + 1
            sleep(delay)
            status, progress = get_status(self, item_id)

        if total_wait < max_wait and wait_cb:
            try:
                #  Finished early: run the reporter up to max_wait more
                #  steps; a generator that ends sooner just stops the loop
                for _ in range(max_wait):
                    next(wait_gen)
            except Exception:
                pass
        return status if (wait_for_status ^ (status != wait_status)) else False

    def wait_for(
            self, item_id, target_status, get_status,
            delay=1, max_wait=100, wait_cb=None):
        """Wait until the item reaches target_status

        :returns: (str) the status if reached, (bool) False if timed out
        """
        return self._wait(
            item_id, target_status, get_status, delay, max_wait, wait_cb,
            wait_for_status=True)

    def wait_while(
            self, item_id, target_status, get_status,
            delay=1, max_wait=100, wait_cb=None):
        """Wait for as long as the item stays in target_status

        :returns: (str) the new status if changed, (bool) False if timed out
        """
        return self._wait(
            item_id, target_status, get_status, delay, max_wait, wait_cb,
            wait_for_status=False)