__init__.py 14.8 KB
Newer Older
1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
Giorgos Verigakis's avatar
Giorgos Verigakis committed
2
3
4
5
6
7
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
Stavros Sachtouris's avatar
Stavros Sachtouris committed
8
#      copyright notice, self.list of conditions and the following
Giorgos Verigakis's avatar
Giorgos Verigakis committed
9
10
11
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
Stavros Sachtouris's avatar
Stavros Sachtouris committed
12
#      copyright notice, self.list of conditions and the following
Giorgos Verigakis's avatar
Giorgos Verigakis committed
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

Stavros Sachtouris's avatar
Stavros Sachtouris committed
34
from urllib2 import quote
35
from urlparse import urlparse
36
from threading import Thread
37
from json import dumps, loads
38
from time import time
39
40
41
42
43
from httplib import ResponseNotReady
from time import sleep
from random import random

from objpool.http import PooledHTTPConnection
44

Stavros Sachtouris's avatar
Stavros Sachtouris committed
45
from kamaki.logger import add_file_logger, get_log_filename
Giorgos Verigakis's avatar
Giorgos Verigakis committed
46

47
48
TIMEOUT = 60.0   # seconds
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49

Stavros Sachtouris's avatar
Stavros Sachtouris committed
50
51
52
53
log = add_file_logger(__name__)
log.debug('Logging location: %s' % get_log_filename())
sendlog = add_file_logger('%s.send' % __name__)
recvlog = add_file_logger('%s.recv' % __name__)
54

55
56
57
58
59
60

def _encode(v):
    if v and isinstance(v, unicode):
        return quote(v.encode('utf-8'))
    return v

61

Giorgos Verigakis's avatar
Giorgos Verigakis committed
62
class ClientError(Exception):
63
    def __init__(self, message, status=0, details=None):
64
        log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
65
66
67
            message,
            status,
            details))
68
        try:
69
            message += '' if message and message[-1] == '\n' else '\n'
70
            serv_stat, sep, new_msg = message.partition('{')
71
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
72
            json_msg = loads(new_msg)
73
            key = json_msg.keys()[0]
74
            serv_stat = serv_stat.strip()
75

76
            json_msg = json_msg[key]
77
78
79
80
81
82
            message = '%s %s (%s)\n' % (
                serv_stat,
                key,
                json_msg['message']) if (
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
            status = json_msg.get('code', status)
83
84
85
            if 'details' in json_msg:
                if not details:
                    details = []
86
                if not isinstance(details, list):
87
88
89
                    details = [details]
                if json_msg['details']:
                    details.append(json_msg['details'])
90
        except Exception:
91
            pass
92
        finally:
93
94
            while message.endswith('\n\n'):
                message = message[:-1]
95
96
97
            super(ClientError, self).__init__(message)
            self.status = status if isinstance(status, int) else 0
            self.details = details if details else []
Giorgos Verigakis's avatar
Giorgos Verigakis committed
98

99

100
101
102
103
104
105
106
class Logged(object):

    LOG_TOKEN = False
    LOG_DATA = False


class RequestManager(Logged):
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
    """Handle http request information"""

    def _connection_info(self, url, path, params={}):
        """ Set self.url to scheme://netloc/?params
        :param url: (str or unicode) The service url

        :param path: (str or unicode) The service path (url/path)

        :param params: (dict) Parameters to add to final url

        :returns: (scheme, netloc)
        """
        url = _encode(url) if url else 'http://127.0.0.1/'
        url += '' if url.endswith('/') else '/'
        if path:
            url += _encode(path[1:] if path.startswith('/') else path)
123
124
        delim = '?'
        for key, val in params.items():
125
            val = _encode(val)
126
127
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
            delim = '&'
128
129
130
131
132
133
134
135
136
137
138
139
140
141
        parsed = urlparse(url)
        self.url = url
        self.path = parsed.path or '/'
        if parsed.query:
            self.path += '?%s' % parsed.query
        return (parsed.scheme, parsed.netloc)

    def __init__(
            self, method, url, path,
            data=None, headers={}, params={}):
        method = method.upper()
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
        if headers:
            assert isinstance(headers, dict)
142
        self.headers = dict(headers)
143
144
145
        self.method, self.data = method, data
        self.scheme, self.netloc = self._connection_info(url, path, params)

146
    def log(self):
147
        sendlog.info('%s %s://%s%s\t[%s]' % (
148
149
150
151
152
            self.method,
            self.scheme,
            self.netloc,
            self.path,
            self))
153
        for key, val in self.headers.items():
154
            if (not self.LOG_TOKEN) and key.lower() == 'x-auth-token':
155
                continue
156
            sendlog.info('  %s: %s\t[%s]' % (key, val, self))
157
        if self.data:
158
159
            sendlog.info('data size:%s\t[%s]' % (len(self.data), self))
            if self.LOG_DATA:
160
                sendlog.info(self.data)
161
        else:
162
            sendlog.info('data size:0\t[%s]' % self)
163
        sendlog.info('')
164

165
166
167
168
169
170
171
172
173
174
175
    def perform(self, conn):
        """
        :param conn: (httplib connection object)

        :returns: (HTTPResponse)
        """
        conn.request(
            method=str(self.method.upper()),
            url=str(self.path),
            headers=self.headers,
            body=self.data)
176
        self.log()
177
        keep_trying = TIMEOUT
Stavros Sachtouris's avatar
Stavros Sachtouris committed
178
        while keep_trying > 0:
179
180
181
            try:
                return conn.getresponse()
            except ResponseNotReady:
Stavros Sachtouris's avatar
Stavros Sachtouris committed
182
183
184
                wait = 0.03 * random()
                sleep(wait)
                keep_trying -= wait
185
        logmsg = 'Kamaki Timeout %s %s\t[%s]' % (self.method, self.path, self)
186
        recvlog.debug(logmsg)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
187
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
188
189


190
class ResponseManager(Logged):
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
    """Manage the http request and handle the response data, headers, etc."""

    def __init__(self, request, poolsize=None):
        """
        :param request: (RequestManager)
        """
        self.request = request
        self._request_performed = False
        self.poolsize = poolsize

    def _get_response(self):
        if self._request_performed:
            return

        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
        try:
            with PooledHTTPConnection(
                    self.request.netloc, self.request.scheme,
                    **pool_kw) as connection:
210
211
                self.request.LOG_TOKEN = self.LOG_TOKEN
                self.request.LOG_DATA = self.LOG_DATA
212
                r = self.request.perform(connection)
213
214
                recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
                    self, r, self.request))
215
                self._request_performed = True
216
                self._status_code, self._status = r.status, r.reason
217
                recvlog.info(
218
                    '%d %s\t[p: %s]' % (self.status_code, self.status, self))
219
220
                self._headers = dict()
                for k, v in r.getheaders():
221
                    if (not self.LOG_TOKEN) and k.lower() == 'x-auth-token':
222
                        continue
223
                    self._headers[k] = v
224
                    recvlog.info('  %s: %s\t[p: %s]' % (k, v, self))
225
                self._content = r.read()
226
                recvlog.info('data size: %s\t[p: %s]' % (
227
228
                    len(self._content) if self._content else 0,
                    self))
229
                if self.LOG_DATA and self._content:
230
                    recvlog.info('%s\t[p: %s]' % (self._content, self))
231
232
233
234
235
236
        except Exception as err:
            from traceback import format_stack
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
            raise ClientError(
                'Failed while http-connecting to %s (%s)' % (
                    self.request.url,
Stavros Sachtouris's avatar
Stavros Sachtouris committed
237
                    err))
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275

    @property
    def status_code(self):
        self._get_response()
        return self._status_code

    @property
    def status(self):
        self._get_response()
        return self._status

    @property
    def headers(self):
        self._get_response()
        return self._headers

    @property
    def content(self):
        self._get_response()
        return self._content

    @property
    def text(self):
        """
        :returns: (str) content
        """
        self._get_response()
        return '%s' % self._content

    @property
    def json(self):
        """
        :returns: (dict) squeezed from json-formated content
        """
        self._get_response()
        try:
            return loads(self._content)
        except ValueError as err:
Stavros Sachtouris's avatar
Stavros Sachtouris committed
276
            raise ClientError('Response not formated in JSON - %s' % err)
277
278


279
class SilentEvent(Thread):
280
    """Thread-run method(*args, **kwargs)"""
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
    def __init__(self, method, *args, **kwargs):
        super(self.__class__, self).__init__()
        self.method = method
        self.args = args
        self.kwargs = kwargs

    @property
    def exception(self):
        return getattr(self, '_exception', False)

    @property
    def value(self):
        return getattr(self, '_value', None)

    def run(self):
        try:
            self._value = self.method(*(self.args), **(self.kwargs))
        except Exception as e:
299
300
301
302
303
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
                self,
                type(e),
                e.status if isinstance(e, ClientError) else '',
                e))
304
305
            self._exception = e

306

Giorgos Verigakis's avatar
Giorgos Verigakis committed
307
class Client(object):
308

309
310
311
312
313
    MAX_THREADS = 7
    DATE_FORMATS = [
        '%a %b %d %H:%M:%S %Y',
        '%A, %d-%b-%y %H:%M:%S GMT',
        '%a, %d %b %Y %H:%M:%S GMT']
314
315
    LOG_TOKEN = False
    LOG_DATA = False
316

317
    def __init__(self, base_url, token):
Giorgos Verigakis's avatar
Giorgos Verigakis committed
318
        self.base_url = base_url
319
        self.token = token
320
        self.headers, self.params = dict(), dict()
321

322
    def _init_thread_limit(self, limit=1):
323
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
324
325
326
327
328
        self._thread_limit = limit
        self._elapsed_old = 0.0
        self._elapsed_new = 0.0

    def _watch_thread_limit(self, threadlist):
329
330
331
        self._thread_limit = getattr(self, '_thread_limit', 1)
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
332
        recvlog.debug('# running threads: %s' % len(threadlist))
333
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
334
                self._thread_limit < self.MAX_THREADS):
335
            self._thread_limit += 1
336
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
337
338
339
340
341
342
343
344
345
346
347
348
349
            self._thread_limit -= 1

        self._elapsed_old = self._elapsed_new
        if len(threadlist) >= self._thread_limit:
            self._elapsed_new = 0.0
            for thread in threadlist:
                begin_time = time()
                thread.join()
                self._elapsed_new += time() - begin_time
            self._elapsed_new = self._elapsed_new / len(threadlist)
            return []
        return threadlist

350
    def _raise_for_status(self, r):
351
        log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
352
        status_msg = getattr(r, 'status', None) or ''
353
        try:
354
            message = '%s %s\n' % (status_msg, r.text)
355
        except:
356
357
358
            message = '%s %s\n' % (status_msg, r)
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
        raise ClientError(message, status=status)
Giorgos Verigakis's avatar
Giorgos Verigakis committed
359

360
    def set_header(self, name, value, iff=True):
361
        """Set a header 'name':'value'"""
362
        if value is not None and iff:
363
            self.headers[name] = value
364
365
366

    def set_param(self, name, value=None, iff=True):
        if iff:
367
            self.params[name] = value
368

369
    def request(
370
371
            self, method, path,
            async_headers=dict(), async_params=dict(),
372
            **kwargs):
373
374
375
376
377
378
        """Commit an HTTP request to base_url/path
        Requests are commited to and performed by Request/ResponseManager
        These classes perform a lazy http request. Present method, by default,
        enforces them to perform the http call. Hint: call present method with
        success=None to get a non-performed ResponseManager object.
        """
379
380
381
        assert isinstance(method, str) or isinstance(method, unicode)
        assert method
        assert isinstance(path, str) or isinstance(path, unicode)
382
        try:
383
384
385
386
            headers = dict(self.headers)
            headers.update(async_headers)
            params = dict(self.params)
            params.update(async_params)
387
388
            success = kwargs.pop('success', 200)
            data = kwargs.pop('data', None)
389
            headers.setdefault('X-Auth-Token', self.token)
390
            if 'json' in kwargs:
391
                data = dumps(kwargs.pop('json'))
392
                headers.setdefault('Content-Type', 'application/json')
393
            if data:
394
395
                headers.setdefault('Content-Length', '%s' % len(data))

396
            sendlog.debug('\n\nCMT %s@%s\t[%s]', method, self.base_url, self)
397
398
399
            req = RequestManager(
                method, self.base_url, path,
                data=data, headers=headers, params=params)
400
            #  req.log()
401
            r = ResponseManager(req)
402
            r.LOG_TOKEN, r.LOG_DATA = self.LOG_TOKEN, self.LOG_DATA
403
        finally:
404
405
            self.headers = dict()
            self.params = dict()
406
407

        if success is not None:
408
            # Success can either be an int or a collection
409
410
411
            success = (success,) if isinstance(success, int) else success
            if r.status_code not in success:
                self._raise_for_status(r)
412
        return r
Giorgos Verigakis's avatar
Giorgos Verigakis committed
413

Giorgos Verigakis's avatar
Giorgos Verigakis committed
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
    def delete(self, path, **kwargs):
        return self.request('delete', path, **kwargs)

    def get(self, path, **kwargs):
        return self.request('get', path, **kwargs)

    def head(self, path, **kwargs):
        return self.request('head', path, **kwargs)

    def post(self, path, **kwargs):
        return self.request('post', path, **kwargs)

    def put(self, path, **kwargs):
        return self.request('put', path, **kwargs)

429
430
431
432
433
    def copy(self, path, **kwargs):
        return self.request('copy', path, **kwargs)

    def move(self, path, **kwargs):
        return self.request('move', path, **kwargs)