__init__.py 16.2 KB
Newer Older
1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
Giorgos Verigakis's avatar
Giorgos Verigakis committed
2
3
4
5
6
7
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
Stavros Sachtouris's avatar
Stavros Sachtouris committed
8
#      copyright notice, this list of conditions and the following
Giorgos Verigakis's avatar
Giorgos Verigakis committed
9
10
11
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
Stavros Sachtouris's avatar
Stavros Sachtouris committed
12
#      copyright notice, this list of conditions and the following
Giorgos Verigakis's avatar
Giorgos Verigakis committed
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

34
from urllib2 import quote, unquote
35
from urlparse import urlparse
36
from threading import Thread
37
from json import dumps, loads
38
from time import time
39
from httplib import ResponseNotReady, HTTPException
40
41
from time import sleep
from random import random
42
from logging import getLogger
43
44

from objpool.http import PooledHTTPConnection
45

Giorgos Verigakis's avatar
Giorgos Verigakis committed
46

47
48
# Budget (in seconds) of accumulated back-off sleeps that
# RequestManager.perform spends waiting for a response before giving up
TIMEOUT = 60.0   # seconds
# HTTP verbs accepted by RequestManager (compared after upper-casing)
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49

50
51
52
# Module loggers: "log" for general messages, "<module>.send" for what is
# sent (requests) and "<module>.recv" for what is received (responses)
log = getLogger(__name__)
sendlog = getLogger('%s.send' % __name__)
recvlog = getLogger('%s.recv' % __name__)
53

54
55
56
57
58
59

def _encode(v):
    if v and isinstance(v, unicode):
        return quote(v.encode('utf-8'))
    return v

60

Giorgos Verigakis's avatar
Giorgos Verigakis committed
61
class ClientError(Exception):
    """Client-side error, enriched with the server feedback when available

    The constructor makes a best-effort attempt to read ``message`` as
    ``<server status> {"<error key>": {"message": ..., "code": ...,
    "details": ...}}``. If this succeeds, the exception message becomes
    ``<server status> <error key> (<message>)``, ``code`` replaces ``status``
    and non-empty server ``details`` are appended to the given ones. If it
    fails, the message is kept as provided, terminated by a single newline.

    :param message: (str or unicode) the error message, optionally containing
        a JSON error object; any other object is converted to a string

    :param status: (int) status code, stored in self.status; values that are
        not int are stored as 0

    :param details: (list) extra information, stored in self.details; the
        list passed by the caller is copied, never modified
    """

    def __init__(self, message, status=0, details=None):
        log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
            message,
            status,
            details))
        # Accept None, wrapped exceptions etc. instead of failing with an
        # AttributeError while the error itself is being constructed
        if not isinstance(message, (type(''), type(u''))):
            message = '%s' % (message,)
        # Work on a copy, server details must not leak into the caller's list
        if isinstance(details, list):
            details = list(details)
        try:
            if not message.endswith('\n'):
                message += '\n'
            serv_stat, sep, new_msg = message.partition('{')
            # message ends with a newline, therefore a non-empty remainder
            # ends with it too; drop it to isolate the JSON text
            new_msg = sep + (new_msg[:-1] if new_msg.endswith('\n') else '')
            json_msg = loads(new_msg)
            # list(...)[0] is the first key both on python 2 and python 3
            key = list(json_msg)[0]
            serv_stat = serv_stat.strip()

            json_msg = json_msg[key]
            if 'message' in json_msg:
                message = '%s %s (%s)\n' % (
                    serv_stat, key, json_msg['message'])
            else:
                message = '%s %s' % (serv_stat, key)
            status = json_msg.get('code', status)
            if 'details' in json_msg:
                if not details:
                    details = []
                if not isinstance(details, list):
                    details = [details]
                if json_msg['details']:
                    details.append(json_msg['details'])
        except Exception:
            # Best effort: the message does not carry the expected JSON
            # object, so keep whatever has been assembled so far
            pass
        finally:
            # At most one trailing newline
            while message.endswith('\n\n'):
                message = message[:-1]
            super(ClientError, self).__init__(message)
            self.status = status if isinstance(status, int) else 0
            self.details = details if details else []
Giorgos Verigakis's avatar
Giorgos Verigakis committed
97

98

99
100
101
102
class Logged(object):
    """Base class carrying the logging switches used by its subclasses"""

    # LOG_TOKEN: show the X-Auth-Token header in the logs
    # LOG_DATA: log the transferred data, not just their size
    # LOG_PID: tag the log lines with the logging object
    # All switches are off by default
    LOG_TOKEN = LOG_DATA = LOG_PID = False
104
105
106


class RequestManager(Logged):
    """Handle http request information"""

    def _connection_info(self, url, path, params=None):
        """Assemble the request URL and keep it in self.url and self.path

        self.url is set to the complete URL (url/path?params) and self.path
        to its path part followed by the query string, if any.

        :param url: (str or unicode) The service url, defaults to
            http://127.0.0.1/ if empty

        :param path: (str or unicode) The service path (url/path)

        :param params: (dict) Parameters to add to final url

        :returns: (scheme, netloc)
        """
        url = _encode(str(url)) if url else 'http://127.0.0.1/'
        url += '' if url.endswith('/') else '/'
        if path:
            url += _encode(path[1:] if path.startswith('/') else path)
        delim = '?'
        for key, val in (params or {}).items():
            val = _encode(u'%s' % val)
            # A parameter with an empty value is appended as a bare key
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
            delim = '&'
        parsed = urlparse(url)
        self.url = url
        self.path = parsed.path or '/'
        if parsed.query:
            self.path += '?%s' % parsed.query
        return (parsed.scheme, parsed.netloc)

    def __init__(
            self, method, url, path,
            data=None, headers=None, params=None):
        """
        :param method: (str) one of HTTP_METHODS, case insensitive

        :param url: (str or unicode) The service url

        :param path: (str or unicode) The service path (url/path)

        :param data: the request body, if any

        :param headers: (dict) request headers, copied to self.headers

        :param params: (dict) Parameters to add to final url

        :raises AssertionError: if method is not in HTTP_METHODS or headers
            is not a dict
        """
        method = method.upper()
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
        if headers:
            assert isinstance(headers, dict)
        self.headers = dict(headers or {})
        self.method, self.data = method, data
        self.scheme, self.netloc = self._connection_info(url, path, params)

    def dump_log(self):
        """Log the request line, the headers and the data (size) to sendlog

        The value of X-Auth-Token is hidden unless LOG_TOKEN is set, the data
        are logged only if LOG_DATA is set and each line is tagged with this
        object if LOG_PID is set.
        """
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
        sendlog.info('- -  -   -     -        -             -')
        sendlog.info('%s %s://%s%s%s' % (
            self.method, self.scheme, self.netloc, self.path, plog))
        for key, val in self.headers.items():
            show = (key.lower() != 'x-auth-token') or self.LOG_TOKEN
            sendlog.info('  %s: %s%s' % (key, val if show else '', plog))
        if self.data:
            sendlog.info('data size:%s%s' % (len(self.data), plog))
            if self.LOG_DATA:
                sendlog.info(self.data)
        else:
            sendlog.info('data size:0%s' % plog)
        sendlog.info('')

    def perform(self, conn):
        """Send the request through conn and wait for the response

        :param conn: (httplib connection object)

        :returns: (HTTPResponse)

        :raises ClientError: if the response is still not ready after about
            TIMEOUT seconds of accumulated waiting
        """
        conn.request(
            method=str(self.method.upper()),
            url=str(self.path),
            headers=self.headers,
            body=self.data)
        self.dump_log()
        keep_trying = TIMEOUT
        while keep_trying > 0:
            try:
                return conn.getresponse()
            except ResponseNotReady:
                # Back off for a short random interval and charge it to the
                # waiting budget
                wait = 0.03 * random()
                sleep(wait)
                keep_trying -= wait
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
        logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
        recvlog.debug(logmsg)
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
188
189


190
class ResponseManager(Logged):
    """Manage the http request and handle the response data, headers, etc."""

    def __init__(self, request, poolsize=None, connection_retry_limit=0):
        """
        :param request: (RequestManager) the request to perform, lazily

        :param poolsize: (int) the size of the connection pool

        :param connection_retry_limit: (int) extra connection attempts to
            make after the first one fails with an HTTPException
        """
        self.request = request
        self.poolsize = poolsize
        self._request_performed = False
        self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit

    def _get_response(self):
        """Perform the request, unless already performed, and keep the result

        Status code, status, headers and content are stored on this object.
        The X-Auth-Token response header is stored only if LOG_TOKEN is set.

        :raises ClientError: if all connection attempts fail with an
            HTTPException, or as soon as any other exception is raised
        """
        if self._request_performed:
            return

        pool_kw = dict()
        if self.poolsize:
            pool_kw['size'] = self.poolsize
        for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
            try:
                with PooledHTTPConnection(
                        self.request.netloc, self.request.scheme,
                        **pool_kw) as connection:
                    # The request logs with the switches of its response
                    self.request.LOG_TOKEN = self.LOG_TOKEN
                    self.request.LOG_DATA = self.LOG_DATA
                    self.request.LOG_PID = self.LOG_PID
                    resp = self.request.perform(connection)

                    plog = ''
                    if self.LOG_PID:
                        recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
                            self, resp, self.request))
                        plog = '\t[%s]' % self

                    # Must be set before the properties are read below,
                    # otherwise they would perform the request again
                    self._request_performed = True
                    self._status_code, self._status = (
                        resp.status, unquote(resp.reason))
                    recvlog.info(
                        '%d %s%s' % (self.status_code, self.status, plog))

                    self._headers = dict()
                    for name, value in resp.getheaders():
                        if self.LOG_TOKEN or name.lower() != 'x-auth-token':
                            value = unquote(value)
                            self._headers[name] = value
                            recvlog.info(
                                '  %s: %s%s' % (name, value, plog))

                    self._content = resp.read()
                    size = len(self._content) if self._content else 0
                    recvlog.info('data size: %s%s' % (size, plog))
                    if self.LOG_DATA and self._content:
                        recvlog.info('%s%s' % (self._content, plog))
                    sendlog.info('-             -        -     -   -  - -')
                break
            except HTTPException as err:
                # Try again, unless this was the last allowed attempt
                if retries >= self.CONNECTION_TRY_LIMIT:
                    raise ClientError(
                        'Connection to %s failed %s times (%s: %s )' % (
                            self.request.url, retries, type(err), err))
            except Exception as err:
                from traceback import format_stack
                recvlog.debug(
                    '\n'.join(['%s' % type(err)] + format_stack()))
                raise ClientError(
                    'Failed while http-connecting to %s (%s)' % (
                        self.request.url, err))

    @property
    def status_code(self):
        """
        :returns: the status code of the response
        """
        self._get_response()
        return self._status_code

    @property
    def status(self):
        """
        :returns: the unquoted reason phrase of the response
        """
        self._get_response()
        return self._status

    @property
    def headers(self):
        """
        :returns: (dict) the unquoted response headers
        """
        self._get_response()
        return self._headers

    @property
    def content(self):
        """
        :returns: the response body, as read from the connection
        """
        self._get_response()
        return self._content

    @property
    def text(self):
        """
        :returns: (str) content
        """
        self._get_response()
        return '%s' % self._content

    @property
    def json(self):
        """
        :returns: (dict) squeezed from json-formated content

        :raises ClientError: if the content cannot be parsed as JSON
        """
        self._get_response()
        try:
            return loads(self._content)
        except ValueError as err:
            raise ClientError('Response not formated in JSON - %s' % err)
298
299


300
class SilentEvent(Thread):
    """Thread-run method(*args, **kwargs)

    The outcome is kept on the thread instead of being propagated: the
    returned value is available as ``value`` and an exception raised by the
    method as ``exception``.
    """

    def __init__(self, method, *args, **kwargs):
        """
        :param method: the callable to run in the thread

        :param args: positional arguments for method

        :param kwargs: keyword arguments for method
        """
        # Name the class explicitly: super(self.__class__, self) recurses
        # endlessly as soon as SilentEvent is subclassed
        super(SilentEvent, self).__init__()
        self.method = method
        self.args = args
        self.kwargs = kwargs

    @property
    def exception(self):
        """
        :returns: the exception raised by the method, False if there is none
        """
        return getattr(self, '_exception', False)

    @property
    def value(self):
        """
        :returns: what the method returned, None if nothing is stored
        """
        return getattr(self, '_value', None)

    def run(self):
        """Call the method, store its result or log and store its exception"""
        try:
            self._value = self.method(*(self.args), **(self.kwargs))
        except Exception as e:
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
                self,
                type(e),
                e.status if isinstance(e, ClientError) else '',
                e))
            self._exception = e

327

328
class Client(Logged):
    """Base client: commits HTTP requests to paths under a base url"""

    # Upper bound for the self-adjusting thread limit
    MAX_THREADS = 7
    DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
    # Extra connection attempts, handed to ResponseManager
    CONNECTION_RETRY_LIMIT = 0

    def __init__(self, base_url, token):
        """
        :param base_url: (str) the url all request paths are relative to

        :param token: the value of the X-Auth-Token header of the requests

        :raises AssertionError: if base_url is empty
        """
        assert base_url, 'No base_url for client %s' % self
        self.base_url = base_url
        self.token = token
        self.headers, self.params = dict(), dict()

    def _init_thread_limit(self, limit=1):
        """Reset the thread limit and the elapsed time measurements

        :param limit: (int) the initial thread limit, a positive integer
        """
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
        self._thread_limit = limit
        self._elapsed_old = 0.0
        self._elapsed_new = 0.0

    def _watch_thread_limit(self, threadlist):
        """Adjust the thread limit and join the threads if it is reached

        The limit is increased by one (up to MAX_THREADS) if the previous
        average join time was not lower than the latest one and decreased by
        one (down to 1) if it was not higher.

        :param threadlist: (list) the running threads

        :returns: (list) an empty list if the threads were joined, otherwise
            threadlist itself
        """
        self._thread_limit = getattr(self, '_thread_limit', 1)
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
        recvlog.debug('# running threads: %s' % len(threadlist))
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
                self._thread_limit < self.MAX_THREADS):
            self._thread_limit += 1
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
            self._thread_limit -= 1

        self._elapsed_old = self._elapsed_new
        if len(threadlist) >= self._thread_limit:
            self._elapsed_new = 0.0
            for thread in threadlist:
                begin_time = time()
                thread.join()
                self._elapsed_new += time() - begin_time
            # Average time spent waiting for a thread to finish
            self._elapsed_new = self._elapsed_new / len(threadlist)
            return []
        return threadlist

    def _raise_for_status(self, r):
        """Raise a ClientError describing response r

        :param r: a response-like object; its status, text and status_code
            attributes are used when available

        :raises ClientError: always
        """
        log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
        status_msg = getattr(r, 'status', None) or ''
        try:
            message = '%s %s\n' % (status_msg, r.text)
        except Exception:
            # No usable text, fall back to the object itself. Interrupts
            # (KeyboardInterrupt, SystemExit) are deliberately not caught
            message = '%s %s\n' % (status_msg, r)
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
        raise ClientError(message, status=status)

    def set_header(self, name, value, iff=True):
        """Set a header 'name':'value'

        The header is set only if value is not None and iff is true. Headers
        are discarded after the next call of request.
        """
        if value is not None and iff:
            self.headers[name] = unicode(value)

    def set_param(self, name, value=None, iff=True):
        """Set a url parameter 'name':'value', if iff is true

        Parameters are discarded after the next call of request.
        """
        if iff:
            self.params[name] = unicode(value)

    def request(
            self, method, path,
            async_headers=None, async_params=None,
            **kwargs):
        """Commit an HTTP request to base_url/path
        Requests are commited to and performed by Request/ResponseManager
        These classes perform a lazy http request. Present method, by default,
        enforces them to perform the http call. Hint: call present method with
        success=None to get a non-performed ResponseManager object.

        :param method: (str or unicode) the HTTP method

        :param path: (str or unicode) the path, relative to base_url

        :param async_headers: (dict) headers for this request only, they
            override the headers set with set_header

        :param async_params: (dict) url parameters for this request only,
            they override the parameters set with set_param

        :param kwargs: success: (int or collection of int, default 200) the
            acceptable status codes, None to skip the check;
            data: the request body;
            json: an object to be serialized as the JSON request body

        :returns: (ResponseManager)

        :raises ClientError: if the status code is not in success
        """
        assert isinstance(method, str) or isinstance(method, unicode)
        assert method
        assert isinstance(path, str) or isinstance(path, unicode)
        try:
            headers = dict(self.headers)
            headers.update(async_headers or {})
            params = dict(self.params)
            params.update(async_params or {})
            success = kwargs.pop('success', 200)
            data = kwargs.pop('data', None)
            headers.setdefault('X-Auth-Token', self.token)
            if 'json' in kwargs:
                data = dumps(kwargs.pop('json'))
                headers.setdefault('Content-Type', 'application/json')
            if data:
                headers.setdefault('Content-Length', '%s' % len(data))

            plog = ('\t[%s]' % self) if self.LOG_PID else ''
            sendlog.debug('\n\nCMT %s@%s%s', method, self.base_url, plog)
            req = RequestManager(
                method, self.base_url, path,
                data=data, headers=headers, params=params)
            r = ResponseManager(
                req, connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
            r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
                self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
        finally:
            # Headers and parameters are valid for a single request
            self.headers = dict()
            self.params = dict()

        if success is not None:
            # Success can either be an int or a collection
            success = (success,) if isinstance(success, int) else success
            if r.status_code not in success:
                self._raise_for_status(r)
        return r

    def delete(self, path, **kwargs):
        """Commit a DELETE request, see request"""
        return self.request('delete', path, **kwargs)

    def get(self, path, **kwargs):
        """Commit a GET request, see request"""
        return self.request('get', path, **kwargs)

    def head(self, path, **kwargs):
        """Commit a HEAD request, see request"""
        return self.request('head', path, **kwargs)

    def post(self, path, **kwargs):
        """Commit a POST request, see request"""
        return self.request('post', path, **kwargs)

    def put(self, path, **kwargs):
        """Commit a PUT request, see request"""
        return self.request('put', path, **kwargs)

    def copy(self, path, **kwargs):
        """Commit a COPY request, see request"""
        return self.request('copy', path, **kwargs)

    def move(self, path, **kwargs):
        """Commit a MOVE request, see request"""
        return self.request('move', path, **kwargs)