__init__.py 21.8 KB
Newer Older
1
# Copyright 2011-2014 GRNET S.A. All rights reserved.
Giorgos Verigakis's avatar
Giorgos Verigakis committed
2
3
4
5
6
7
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
Stavros Sachtouris's avatar
Stavros Sachtouris committed
8
#      copyright notice, this list of conditions and the following
Giorgos Verigakis's avatar
Giorgos Verigakis committed
9
10
11
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
Stavros Sachtouris's avatar
Stavros Sachtouris committed
12
#      copyright notice, this list of conditions and the following
Giorgos Verigakis's avatar
Giorgos Verigakis committed
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

34
from urllib2 import quote, unquote
35
from urlparse import urlparse
36
from threading import Thread
37
from json import dumps, loads
38
from time import time
39
from httplib import ResponseNotReady, HTTPException
40
41
from time import sleep
from random import random
42
from logging import getLogger
43
44

from objpool.http import PooledHTTPConnection
45

Giorgos Verigakis's avatar
Giorgos Verigakis committed
46

47
48
# Budget of accumulated sleep time that RequestManager.perform spends polling
# for a response before it gives up with a ClientError
TIMEOUT = 60.0   # seconds
# HTTP verbs accepted by RequestManager (the method is upper-cased first)
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49

50
51
52
# Module logger plus two child loggers: '<module>.send' is written to when a
# request is dumped, '<module>.recv' when a response is received
log = getLogger(__name__)
sendlog = getLogger('%s.send' % __name__)
recvlog = getLogger('%s.recv' % __name__)
53

54
55
56
57
58
59

def _encode(v):
    if v and isinstance(v, unicode):
        return quote(v.encode('utf-8'))
    return v

60

Giorgos Verigakis's avatar
Giorgos Verigakis committed
61
class ClientError(Exception):
    """Client-side error, optionally built from a server error reply.

    If ``message`` looks like ``<status text> {"<key>": {...}}``, the JSON
    part is parsed and the error is rebuilt from it:

    * the message becomes ``<status text> <key> (<message>)`` when the
      inner object has a ``message`` field, else ``<status text> <key>``
    * ``status`` is replaced by the inner ``code`` field, if any
    * a non-empty inner ``details`` field is appended to ``details``

    Parsing is best effort: if it fails, the message is used as given,
    terminated by a single newline.

    :param message: (str) error text; non-string values are stringified

    :param status: (int) status code; anything that is not an int is
        stored as 0

    :param details: (list) extra information; stored as ``[]`` if empty
    """

    def __init__(self, message, status=0, details=None):
        log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
            message,
            status,
            details))
        if not hasattr(message, 'partition'):
            # A non-string message (None, an exception object, ...) would
            # otherwise raise AttributeError from inside this constructor
            message = '%s' % (message,)
        if not message.endswith('\n'):
            message += '\n'
        try:
            serv_stat, brace, body = message.partition('{')
            # body always ends with the newline ensured above; drop it
            error = loads(brace + body[:-1])
            key = next(iter(error))
            serv_stat = serv_stat.strip()
            error = error[key]
            if 'message' in error:
                message = '%s %s (%s)\n' % (serv_stat, key, error['message'])
            else:
                message = '%s %s' % (serv_stat, key)
            status = error.get('code', status)
            if 'details' in error:
                # Build a fresh list, so that a list owned by the caller is
                # never modified behind their back
                if not details:
                    details = []
                elif isinstance(details, list):
                    details = list(details)
                else:
                    details = [details]
                if error['details']:
                    details.append(error['details'])
        except Exception:
            # Best effort: no usable JSON error was embedded in the message
            pass
        # Collapse a run of trailing newlines to a single one
        while message.endswith('\n\n'):
            message = message[:-1]
        super(ClientError, self).__init__(message)
        self.status = status if isinstance(status, int) else 0
        self.details = details if details else []
Giorgos Verigakis's avatar
Giorgos Verigakis committed
97

98

99
100
101
102
class Logged(object):
    """Logging switches shared by the request, response and client classes"""

    # LOG_TOKEN: log the X-Auth-Token value instead of masking it as '...'
    # LOG_DATA: log the transferred data as well, not just its size
    # LOG_PID: tag log lines with the string form of the logging object
    LOG_TOKEN = LOG_DATA = LOG_PID = False
    # Last token seen in the headers; scrubbed from logged data when set
    _token = None
105
106
107


class RequestManager(Logged):
    """Handle http request information"""

    def __init__(
            self, method, url, path,
            data=None, headers=None, params=None):
        """
        :param method: (str) one of HTTP_METHODS, case insensitive

        :param url: (str or unicode) The service url

        :param path: (str or unicode) The service path (url/path)

        :param data: the request body, if any

        :param headers: (dict) request headers; a copy is stored

        :param params: (dict) Parameters to add to final url
        """
        method = method.upper()
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
        if headers:
            assert isinstance(headers, dict)
        self.headers = dict(headers or {})
        # Header values must be quoted only once, even if perform is retried
        self._headers_encoded = False
        self.method, self.data = method, data
        self.scheme, self.netloc = self._connection_info(url, path, params)

    def _connection_info(self, url, path, params=None):
        """Build the request url and store it in self.url and self.path

        self.url is the whole url (query included), self.path is the path
        followed by the query string, if any.

        :param url: (str or unicode) The service url

        :param path: (str or unicode) The service path (url/path)

        :param params: (dict) Parameters to add to final url

        :returns: (scheme, netloc)
        """
        url = url or 'http://127.0.0.1/'
        if not url.endswith('/'):
            url += '/'
        if path:
            url += _encode(path[1:] if path.startswith('/') else path)
        delim = '?'
        for key, val in (params or {}).items():
            # NOTE(review): "in" compares by equality, so a value of 0 is
            # also treated as "no value" here - confirm this is intended
            val = quote('' if val in (None, False) else '%s' % _encode(val))
            # A parameter without a value is rendered as a bare key
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
            delim = '&'
        parsed = urlparse(url)
        self.url = _encode(u'%s' % url)
        self.path = _encode((u'%s' % parsed.path) if parsed.path else '/')
        if parsed.query:
            self.path += '?%s' % parsed.query
        return (_encode(parsed.scheme), _encode(parsed.netloc))

    def dump_log(self):
        """Log the request line, the headers and the data size (or data)"""
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
        sendlog.info('- -  -   -     -        -             -')
        sendlog.info('%s %s://%s%s%s' % (
            self.method, self.scheme, self.netloc, self.path, plog))
        for key, val in self.headers.items():
            if key.lower() in ('x-auth-token', ) and not self.LOG_TOKEN:
                # Remember the token, so that it is scrubbed from the data
                self._token, val = val, '...'
            sendlog.info('  %s: %s%s' % (key, val, plog))
        if self.data:
            sendlog.info('data size: %s%s' % (len(self.data), plog))
            if self.LOG_DATA:
                sendlog.info(self.data.replace(self._token, '...') if (
                    self._token) else self.data)
        else:
            sendlog.info('data size: 0%s' % plog)

    def _encode_headers(self):
        """Percent-quote all header values, once per request object

        perform may run more than once for the same request (connection
        retries); quoting again would turn every '%' into '%25' and corrupt
        the values, so the work is done on the first call only.
        """
        if getattr(self, '_headers_encoded', False):
            return
        self.headers = dict(
            (k, quote('' if v is None else '%s' % v))
            for k, v in self.headers.items())
        self._headers_encoded = True

    def perform(self, conn):
        """Send the request and wait for the response

        :param conn: (httplib connection object)

        :returns: (HTTPResponse)

        :raises ClientError: if no response is ready before the TIMEOUT
            budget is spent
        """
        self._encode_headers()
        self.dump_log()
        conn.request(
            method=self.method.upper(),
            url=('%s' % self.path) or '',
            headers=self.headers,
            body=self.data)
        sendlog.info('')
        keep_trying = TIMEOUT
        while keep_trying > 0:
            try:
                return conn.getresponse()
            except ResponseNotReady:
                # Back off for a short, random interval and charge it to
                # the timeout budget
                wait = 0.03 * random()
                sleep(wait)
                keep_trying -= wait
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
        logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
        recvlog.debug(logmsg)
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
196
197


198
class ResponseManager(Logged):
    """Manage the http request and handle the response data, headers, etc."""

    def __init__(self, request, poolsize=None, connection_retry_limit=0):
        """
        :param request: (RequestManager)

        :param poolsize: (int) the size of the connection pool

        :param connection_retry_limit: (int) how many extra attempts are
            made when the connection fails with an HTTPException
        """
        self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
        self.request = request
        # The request is performed lazily, the first time a property is read
        self._request_performed = False
        self.poolsize = poolsize
        self._headers_to_decode, self._header_prefices = [], []

    def _get_headers_to_decode(self, headers):
        """Pick the response header keys whose values must be decoded

        :param headers: (iterable of (key, value) pairs)

        :returns: (list) lower-cased keys that are either listed in
            headers_to_decode or start with one of header_prefices
        """
        keys = set(k.lower() for k, v in headers)
        encodable = list(keys.intersection(self.headers_to_decode))
        prefixes = tuple(self.header_prefices)
        return encodable + [
            k for k in keys.difference(encodable) if k.startswith(prefixes)]

    def _get_response(self):
        """Perform the request, unless already done, and keep the results

        :raises ClientError: if the connection keeps failing with an
            HTTPException until CONNECTION_TRY_LIMIT attempts are spent
        """
        if self._request_performed:
            return

        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
        for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
            try:
                with PooledHTTPConnection(
                        self.request.netloc, self.request.scheme,
                        **pool_kw) as connection:
                    self.request.LOG_TOKEN = self.LOG_TOKEN
                    self.request.LOG_DATA = self.LOG_DATA
                    self.request.LOG_PID = self.LOG_PID
                    r = self.request.perform(connection)
                    plog = ''
                    if self.LOG_PID:
                        recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
                            self, r, self.request))
                        plog = '\t[%s]' % self
                    self._request_performed = True
                    self._status_code, self._status = r.status, unquote(
                        r.reason)
                    recvlog.info(
                        '%d %s%s' % (
                            self._status_code, self._status, plog))
                    self._headers = dict()

                    r_headers = r.getheaders()
                    enc_headers = self._get_headers_to_decode(r_headers)
                    for k, v in r_headers:
                        key = k.lower()
                        if key in enc_headers:
                            v = unquote(v).decode('utf-8')
                        shown = v
                        if key in ('x-auth-token', ) and not self.LOG_TOKEN:
                            # Mask the token in the log only: the stored
                            # header keeps its value. (This check used to
                            # compare the unbound "k.lower" and never
                            # matched, so the token was logged in clear.)
                            self._token, shown = v, '...'
                        self._headers[k] = v
                        recvlog.info('  %s: %s%s' % (k, shown, plog))
                    self._content = r.read()
                    recvlog.info('data size: %s%s' % (
                        len(self._content) if self._content else 0, plog))
                    if self.LOG_DATA and self._content:
                        data = '%s%s' % (self._content, plog)
                        if self._token:
                            data = data.replace(self._token, '...')
                        recvlog.info(data)
                    recvlog.info('-             -        -     -   -  - -')
                break
            except HTTPException as err:
                # Connection level failure: try again, if attempts are left
                if retries >= self.CONNECTION_TRY_LIMIT:
                    raise ClientError(
                        'Connection to %s failed %s times (%s: %s )' % (
                            self.request.url, retries, type(err), err))
            except Exception as err:
                from traceback import format_stack
                recvlog.debug(
                    '\n'.join(['%s' % type(err)] + format_stack()))
                raise

    @property
    def status_code(self):
        """(int) the response status code"""
        self._get_response()
        return self._status_code

    @property
    def status(self):
        """The unquoted response reason phrase"""
        self._get_response()
        return self._status

    @property
    def headers(self):
        """(dict) the response headers"""
        self._get_response()
        return self._headers

    @property
    def content(self):
        """The response body, as read from the connection"""
        self._get_response()
        return self._content

    @property
    def text(self):
        """
        :returns: (str) content
        """
        self._get_response()
        return '%s' % self._content

    @property
    def headers_to_decode(self):
        """(list) lower-cased header keys whose values are decoded"""
        return self._headers_to_decode

    @headers_to_decode.setter
    def headers_to_decode(self, header_keys):
        # Assignment adds to the current keys, it does not replace them
        self._headers_to_decode = list(set(self._headers_to_decode).union(
            k.lower() for k in header_keys))

    @property
    def header_prefices(self):
        """(list) lower-cased key prefixes of headers that are decoded"""
        return self._header_prefices

    @header_prefices.setter
    def header_prefices(self, header_key_prefices):
        # Assignment adds to the current prefixes, it does not replace them
        self._header_prefices = list(set(self._header_prefices).union(
            p.lower() for p in header_key_prefices))

    @property
    def json(self):
        """
        :returns: (dict) squeezed from json-formated content

        :raises ClientError: if the content is not valid JSON
        """
        self._get_response()
        try:
            return loads(self._content)
        except ValueError as err:
            raise ClientError('Response not formated in JSON - %s' % err)
341
342


343
class SilentEvent(Thread):
    """Thread-run method(*args, **kwargs)

    The outcome is kept on the thread instead of being propagated: the
    return value is available as ``value`` and a raised exception as
    ``exception``.
    """

    def __init__(self, method, *args, **kwargs):
        """
        :param method: the callable to run in the thread

        :param args: positional arguments for method

        :param kwargs: keyword arguments for method
        """
        # Name the class explicitly: super(self.__class__, self) recurses
        # for ever as soon as this class is subclassed
        super(SilentEvent, self).__init__()
        self.method = method
        self.args = args
        self.kwargs = kwargs

    @property
    def exception(self):
        """The exception raised by method, or False if there was none"""
        return getattr(self, '_exception', False)

    @property
    def value(self):
        """The value returned by method, or None if not (yet) available"""
        return getattr(self, '_value', None)

    def run(self):
        """Call method and store either its result or its exception"""
        try:
            self._value = self.method(*(self.args), **(self.kwargs))
        except Exception as e:
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
                self,
                type(e),
                e.status if isinstance(e, ClientError) else '',
                e))
            self._exception = e

370

371
class Client(Logged):
    """Base client: collects headers and parameters and commits requests"""

    service_type = ''
    # Upper bound for the self-adjusting thread limit used by async_run
    MAX_THREADS = 1
    DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
    # Extra attempts per request when the connection fails
    CONNECTION_RETRY_LIMIT = 0

    def __init__(self, base_url, token):
        """
        :param base_url: (str) the service url; must not be empty

        :param token: sent as X-Auth-Token, unless a request overrides it
        """
        assert base_url, 'No base_url for client %s' % self
        self.base_url = base_url
        self.token = token
        self.headers, self.params = dict(), dict()
        self.poolsize = None
        self.headers_to_decode, self.header_prefices = [], []

    def _init_thread_limit(self, limit=1):
        """Reset the thread limit and the elapsed time measurements"""
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
        self._thread_limit = limit
        self._elapsed_old = 0.0
        self._elapsed_new = 0.0

    def _watch_thread_limit(self, threadlist):
        """Adjust the thread limit and join threadlist if it is reached

        The limit grows (up to MAX_THREADS) when the previous average join
        time was not shorter than the latest one, and shrinks (down to 1)
        when it was not longer.

        :param threadlist: the threads currently running

        :returns: (list) empty if the threads were joined, else threadlist
        """
        self._thread_limit = getattr(self, '_thread_limit', 1)
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
        recvlog.debug('# running threads: %s' % len(threadlist))
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
                self._thread_limit < self.MAX_THREADS):
            self._thread_limit += 1
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
            self._thread_limit -= 1

        self._elapsed_old = self._elapsed_new
        if len(threadlist) >= self._thread_limit:
            self._elapsed_new = 0.0
            for thread in threadlist:
                begin_time = time()
                thread.join()
                self._elapsed_new += time() - begin_time
            self._elapsed_new = self._elapsed_new / len(threadlist)
            return []
        return threadlist

    def async_run(self, method, kwarg_list):
        """Fire threads of operations

        :param method: the method to run in each thread

        :param kwarg_list: (list of dicts) the arguments to pass in each method
            call

        :returns: (list) the results of each method call w.r. to the order of
            kwarg_list

        :raises: the exception of the first failed thread that is noticed
        """
        flying, results = {}, {}
        self._init_thread_limit()
        for index, kwargs in enumerate(kwarg_list):
            self._watch_thread_limit(flying.values())
            flying[index] = SilentEvent(method=method, **kwargs)
            flying[index].start()
            unfinished = {}
            for key, thread in flying.items():
                if thread.is_alive():
                    unfinished[key] = thread
                elif thread.exception:
                    raise thread.exception
                else:
                    results[key] = thread.value
            flying = unfinished
        sendlog.info('- - - wait for threads to finish')
        for key, thread in flying.items():
            if thread.is_alive():
                thread.join()
            if thread.exception:
                raise thread.exception
            results[key] = thread.value
        # Results are collected as threads finish; sort on the kwarg_list
        # index to return them in the documented order
        return [results[key] for key in sorted(results)]

    def _raise_for_status(self, r):
        """Raise a ClientError built from the status and text of r"""
        log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
        status_msg = getattr(r, 'status', None) or ''
        try:
            message = '%s %s\n' % (status_msg, r.text)
        except Exception:
            # No usable text: fall back to the string form of r itself
            message = '%s %s\n' % (status_msg, r)
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
        raise ClientError(message, status=status)

    def set_header(self, name, value, iff=True):
        """Set a header 'name':'value'"""
        if value is not None and iff:
            self.headers['%s' % name] = '%s' % value

    def set_param(self, name, value=None, iff=True):
        """Set a url parameter name=value for the next request, if iff"""
        if iff:
            self.params[name] = '%s' % value

    def request(
            self, method, path,
            async_headers=None, async_params=None,
            **kwargs):
        """Commit an HTTP request to base_url/path
        Requests are commited to and performed by Request/ResponseManager
        These classes perform a lazy http request. Present method, by default,
        enforces them to perform the http call. Hint: call present method with
        success=None to get a non-performed ResponseManager object.

        :param method: (str) the http method

        :param path: (str) the path, relative to base_url

        :param async_headers: (dict) headers for this request only; they
            override the headers collected with set_header

        :param async_params: (dict) parameters for this request only; they
            override the parameters collected with set_param

        :param kwargs: success (int or collection of ints, default 200),
            data (the request body), json (serialized and used as data)

        :returns: (ResponseManager)

        :raises ClientError: if the status code is not a success code
        """
        assert isinstance(method, str) or isinstance(method, unicode)
        assert method
        assert isinstance(path, str) or isinstance(path, unicode)
        try:
            headers = dict(self.headers)
            headers.update(async_headers or {})
            params = dict(self.params)
            params.update(async_params or {})
            success = kwargs.pop('success', 200)
            data = kwargs.pop('data', None)
            headers.setdefault('X-Auth-Token', self.token)
            if 'json' in kwargs:
                data = dumps(kwargs.pop('json'))
                headers.setdefault('Content-Type', 'application/json')
            if data:
                headers.setdefault('Content-Length', '%s' % len(data))

            plog = ('\t[%s]' % self) if self.LOG_PID else ''
            sendlog.debug('\n\nCMT %s@%s%s', method, self.base_url, plog)
            req = RequestManager(
                method, self.base_url, path,
                data=data, headers=headers, params=params)
            r = ResponseManager(
                req,
                poolsize=self.poolsize,
                connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
            r.headers_to_decode = self.headers_to_decode
            r.header_prefices = self.header_prefices
            r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
                self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
            r._token = headers['X-Auth-Token']
        finally:
            # Collected headers and parameters are good for one request only
            self.headers = dict()
            self.params = dict()

        if success is not None:
            # Success can either be an int or a collection
            success = (success,) if isinstance(success, int) else success
            # Reading the status code is what performs the lazy request
            if r.status_code not in success:
                self._raise_for_status(r)
        return r

    def delete(self, path, **kwargs):
        """Commit a DELETE request (see request)"""
        return self.request('delete', path, **kwargs)

    def get(self, path, **kwargs):
        """Commit a GET request (see request)"""
        return self.request('get', path, **kwargs)

    def head(self, path, **kwargs):
        """Commit a HEAD request (see request)"""
        return self.request('head', path, **kwargs)

    def post(self, path, **kwargs):
        """Commit a POST request (see request)"""
        return self.request('post', path, **kwargs)

    def put(self, path, **kwargs):
        """Commit a PUT request (see request)"""
        return self.request('put', path, **kwargs)

    def copy(self, path, **kwargs):
        """Commit a COPY request (see request)"""
        return self.request('copy', path, **kwargs)

    def move(self, path, **kwargs):
        """Commit a MOVE request (see request)"""
        return self.request('move', path, **kwargs)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
540
541
542
543
544


class Waiter(object):
    """Poll the status of an item until it changes or until it is reached"""

    def _wait(
            self, item_id, wait_status, get_status,
            delay=1, max_wait=100, wait_cb=None, wait_for_status=False):
        """Wait while the item is still in wait_status or to reach it

        :param item_id: passed to get_status as is

        :param wait_status: (str)

        :param get_status: (method(self, item_id)) if called, returns
            (status, progress %) If no way to tell progress, return None

        :param delay: time interval between retries

        :param max_wait: limit for the wait counter, which is set to the
            reported progress or, if none is reported, grows by one per retry

        :param wait_cb: (method(total steps)) returns a generator for
            reporting progress or timeouts i.e., for a progress bar

        :param wait_for_status: (bool) wait FOR (True) or wait WHILE (False)

        :returns: (str) the new mode if successful, (bool) False if timed out
        """
        status, progress = get_status(self, item_id)

        if wait_cb:
            wait_gen = wait_cb(max_wait // delay)
            next(wait_gen)

        if wait_for_status ^ (status != wait_status):
            # Nothing to wait for: the condition is already met
            return status
        old_wait = total_wait = 0

        while (wait_for_status ^ (status == wait_status)) and (
                total_wait <= max_wait):
            if wait_cb:
                try:
                    # Report as many steps as were made since the last retry
                    for i in range(total_wait - old_wait):
                        next(wait_gen)
                except Exception:
                    break
            old_wait = total_wait
            total_wait = progress or total_wait + 1
            sleep(delay)
            status, progress = get_status(self, item_id)

        if total_wait < max_wait:
            if wait_cb:
                try:
                    # Drive the progress generator through the steps left
                    for i in range(max_wait):
                        next(wait_gen)
                except Exception:
                    pass
        return status if (wait_for_status ^ (status != wait_status)) else False

    def wait_for(
            self, item_id, target_status, get_status,
            delay=1, max_wait=100, wait_cb=None):
        """Wait until the item reaches target_status

        Parameters are as in _wait.

        :returns: (str) the status if it was reached, (bool) False if timed
            out
        """
        return self._wait(
            item_id, target_status, get_status, delay, max_wait, wait_cb,
            wait_for_status=True)

    def wait_while(
            self, item_id, target_status, get_status,
            delay=1, max_wait=100, wait_cb=None):
        """Wait for as long as the item remains in target_status

        Parameters are as in _wait.

        :returns: (str) the new status if it changed, (bool) False if timed
            out
        """
        return self._wait(
            item_id, target_status, get_status, delay, max_wait, wait_cb,
            wait_for_status=False)