__init__.py 15.2 KB
Newer Older
1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
Giorgos Verigakis's avatar
Giorgos Verigakis committed
2
3
4
5
6
7
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
Stavros Sachtouris's avatar
Stavros Sachtouris committed
8
#      copyright notice, this list of conditions and the following
Giorgos Verigakis's avatar
Giorgos Verigakis committed
9
10
11
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
Stavros Sachtouris's avatar
Stavros Sachtouris committed
12
#      copyright notice, this list of conditions and the following
Giorgos Verigakis's avatar
Giorgos Verigakis committed
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

Stavros Sachtouris's avatar
Stavros Sachtouris committed
34
from urllib2 import quote
35
from urlparse import urlparse
36
from threading import Thread
37
from json import dumps, loads
38
from time import time
39
40
41
42
43
from httplib import ResponseNotReady
from time import sleep
from random import random

from objpool.http import PooledHTTPConnection
44

45
from kamaki import logger
Giorgos Verigakis's avatar
Giorgos Verigakis committed
46

47
# Single log file every logger configured below writes to
LOG_FILE = logger.get_log_filename()
# Sleep budget (seconds) RequestManager.perform spends waiting for a response
TIMEOUT = 60.0   # seconds
# HTTP verbs accepted by RequestManager
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']


def _file_logger(name):
    """Attach a LOG_FILE file handler to logger *name* and return the logger"""
    logger.add_file_logger(name, __name__, filename=LOG_FILE)
    return logger.get_logger(name)


# Request meta-data (method, url, headers) and request bodies
sendlog = _file_logger('clients.send')
sendlog.debug('Logging location: %s' % LOG_FILE)
datasendlog = _file_logger('data.send')

# Response meta-data (status, headers) and response bodies
recvlog = _file_logger('clients.recv')
datarecvlog = _file_logger('data.recv')

# Every ClientError instantiation is traced here
clienterrorlog = _file_logger('ClientError')

67
68
69
70
71
72

def _encode(v):
    if v and isinstance(v, unicode):
        return quote(v.encode('utf-8'))
    return v

73

Giorgos Verigakis's avatar
Giorgos Verigakis committed
74
class ClientError(Exception):
    """Client-side error, optionally unpacked from a JSON server reply

    When the message contains a JSON object shaped like
    ``<status text> {"<error name>": {"message": .., "code": ..,
    "details": ..}}`` the exception message is rebuilt as
    ``<status text> <error name> (<message>)``, ``code`` overrides the
    status and non-empty ``details`` are appended to the details list.
    Any other message is kept as it is (with a single trailing newline).
    """

    def __init__(self, message, status=0, details=None):
        """
        :param message: error text, possibly embedding a JSON error object;
            non-string values are converted to their string form

        :param status: (int) status code; anything but an int is stored as 0

        :param details: (list or any) extra information; a list passed by
            the caller is copied, never modified in place
        """
        clienterrorlog.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
            message,
            status,
            details))

        # Coerce to text up-front: the string handling below used to blow up
        # with AttributeError/TypeError on None or exception objects
        message = '' if message is None else '%s' % (message,)
        if not message.endswith('\n'):
            message += '\n'

        try:
            serv_stat, sep, payload = message.partition('{')
            # message always ends with '\n' here, so a non-empty payload does
            # too: drop that newline and put the opening brace back
            payload = sep + payload[:-1]
            json_msg = loads(payload)
            key = next(iter(json_msg))
            serv_stat = serv_stat.strip()

            json_msg = json_msg[key]
            if 'message' in json_msg:
                message = '%s %s (%s)\n' % (
                    serv_stat, key, json_msg['message'])
            else:
                message = '%s %s' % (serv_stat, key)
            status = json_msg.get('code', status)

            if 'details' in json_msg:
                if not details:
                    details = []
                elif isinstance(details, list):
                    # Work on a copy so the caller's list stays untouched
                    details = list(details)
                else:
                    details = [details]
                if json_msg['details']:
                    details.append(json_msg['details'])
        except Exception:
            # Deliberate best effort: a message that is not a well formed
            # server error is simply reported as it was given
            pass

        # Collapse a run of trailing newlines into a single one
        while message.endswith('\n\n'):
            message = message[:-1]
        super(ClientError, self).__init__(message)
        self.status = status if isinstance(status, int) else 0
        self.details = details if details else []
Giorgos Verigakis's avatar
Giorgos Verigakis committed
110

111

112
113
114
115
116
117
118
class Logged(object):
    """Base class carrying the switches that control what gets logged"""

    # LOG_TOKEN: when false, 'x-auth-token' headers are kept out of the logs
    # LOG_DATA: when false, request/response bodies are not logged
    LOG_TOKEN, LOG_DATA = False, False


class RequestManager(Logged):
    """Handle http request information"""

    def _connection_info(self, url, path, params=None):
        """Build the request URL; set self.url (the full URL) and self.path
        (the path plus the query string, as sent on the connection)

        :param url: (str or unicode) The service url, defaults to
            http://127.0.0.1/ if empty

        :param path: (str or unicode) The service path (url/path)

        :param params: (dict) Parameters to add to final url; a parameter
            with a false value is added as a bare key (no '=value' part)

        :returns: (scheme, netloc)
        """
        url = _encode(url) if url else 'http://127.0.0.1/'
        url += '' if url.endswith('/') else '/'
        if path:
            # url already ends with '/', so drop a leading one from path
            url += _encode(path[1:] if path.startswith('/') else path)

        delim = '?'
        for key, val in (params or {}).items():
            val = _encode(val)
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
            delim = '&'

        parsed = urlparse(url)
        self.url = url
        self.path = parsed.path or '/'
        if parsed.query:
            self.path += '?%s' % parsed.query
        return (parsed.scheme, parsed.netloc)

    def __init__(
            self, method, url, path,
            data=None, headers=None, params=None):
        """
        :param method: (str) one of HTTP_METHODS, case insensitive

        :param url: (str or unicode) The service url

        :param path: (str or unicode) The service path (url/path)

        :param data: request body, if any

        :param headers: (dict) request headers; copied, not kept by reference

        :param params: (dict) url parameters

        :raises AssertionError: on an unknown method or non-dict headers
        """
        method = method.upper()
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
        if headers:
            assert isinstance(headers, dict)
        # None (the default) stands for "no headers"
        self.headers = dict(headers or {})
        self.method, self.data = method, data
        self.scheme, self.netloc = self._connection_info(url, path, params)

    def log(self):
        """Log the request line, headers and data size (and the data itself
        if LOG_DATA is set); X-Auth-Token is skipped unless LOG_TOKEN is set
        """
        sendlog.info('%s %s://%s%s\t[%s]' % (
            self.method,
            self.scheme,
            self.netloc,
            self.path,
            self))
        for key, val in self.headers.items():
            if (not self.LOG_TOKEN) and key.lower() == 'x-auth-token':
                continue
            sendlog.info('  %s: %s\t[%s]' % (key, val, self))
        if self.data:
            sendlog.info('data size:%s\t[%s]' % (len(self.data), self))
            if self.LOG_DATA:
                datasendlog.info(self.data)
        else:
            sendlog.info('data size:0\t[%s]' % self)
        sendlog.info('')

    def perform(self, conn):
        """Send the request over conn and wait for the response

        :param conn: (httplib connection object)

        :returns: (HTTPResponse)

        :raises ClientError: if the response is still not ready after a
            total of TIMEOUT seconds has been spent sleeping between attempts
        """
        conn.request(
            method=str(self.method.upper()),
            url=str(self.path),
            headers=self.headers,
            body=self.data)
        self.log()
        keep_trying = TIMEOUT
        while keep_trying > 0:
            try:
                return conn.getresponse()
            except ResponseNotReady:
                # Back off for a short random interval (< 0.03 s) and charge
                # it to the remaining time budget
                wait = 0.03 * random()
                sleep(wait)
                keep_trying -= wait
        logmsg = 'Kamaki Timeout %s %s\t[%s]' % (self.method, self.path, self)
        recvlog.debug(logmsg)
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
200
201


202
class ResponseManager(Logged):
    """Manage the http request and handle the response data, headers, etc.

    The request is lazy: it is performed the first time one of the
    properties (status_code, status, headers, content, text, json) is read.
    """

    def __init__(self, request, poolsize=None):
        """
        :param request: (RequestManager)

        :param poolsize: if true, passed as ``size`` to PooledHTTPConnection
        """
        self.request = request
        self._request_performed = False
        self.poolsize = poolsize

    def _get_response(self):
        """Perform the request (once) and cache status, headers and content

        :raises ClientError: if anything fails while connecting, sending the
            request or reading the response
        """
        if self._request_performed:
            return

        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
        try:
            with PooledHTTPConnection(
                    self.request.netloc, self.request.scheme,
                    **pool_kw) as connection:
                # The request logs with the same privacy settings as self
                self.request.LOG_TOKEN = self.LOG_TOKEN
                self.request.LOG_DATA = self.LOG_DATA
                r = self.request.perform(connection)
                recvlog.info('[resp: %s] <-- [req: %s]\n' % (r, self.request))
                self._request_performed = True
                self._status_code, self._status = r.status, r.reason
                recvlog.info(
                    '%d %s\t[p: %s]' % (self._status_code, self._status, self))
                self._headers = dict()
                for k, v in r.getheaders():
                    # Always keep the header: LOG_TOKEN controls what is
                    # logged, not what is handed back to the caller
                    self._headers[k] = v
                    if (not self.LOG_TOKEN) and k.lower() == 'x-auth-token':
                        continue
                    recvlog.info('  %s: %s\t[p: %s]' % (k, v, self))
                self._content = r.read()
                recvlog.info('data size: %s\t[p: %s]' % (
                    len(self._content) if self._content else 0,
                    self))
                if self.LOG_DATA and self._content:
                    datarecvlog.info('%s\t[p: %s]' % (self._content, self))
        except Exception as err:
            from traceback import format_stack
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
            raise ClientError(
                'Failed while http-connecting to %s (%s)' % (
                    self.request.url,
                    err))

    @property
    def status_code(self):
        """
        :returns: the status attribute of the http response
        """
        self._get_response()
        return self._status_code

    @property
    def status(self):
        """
        :returns: the reason attribute of the http response
        """
        self._get_response()
        return self._status

    @property
    def headers(self):
        """
        :returns: (dict) response headers
        """
        self._get_response()
        return self._headers

    @property
    def content(self):
        """
        :returns: the response body, as read from the connection
        """
        self._get_response()
        return self._content

    @property
    def text(self):
        """
        :returns: (str) content
        """
        self._get_response()
        return '%s' % self._content

    @property
    def json(self):
        """
        :returns: (dict) squeezed from json-formatted content

        :raises ClientError: if the content cannot be parsed as JSON
        """
        self._get_response()
        try:
            return loads(self._content)
        except ValueError as err:
            raise ClientError('Response not formated in JSON - %s' % err)
288
289


290
class SilentEvent(Thread):
    """Thread-run method(*args, **kwargs)

    The outcome stays on the thread object instead of propagating: ``value``
    holds what the method returned, ``exception`` the Exception it raised.
    """

    def __init__(self, method, *args, **kwargs):
        """
        :param method: the callable to run in the thread

        :param args: positional arguments for method

        :param kwargs: keyword arguments for method
        """
        # Name the class explicitly: super(self.__class__, self) recurses
        # without end as soon as SilentEvent is subclassed
        super(SilentEvent, self).__init__()
        self.method = method
        self.args = args
        self.kwargs = kwargs

    @property
    def exception(self):
        """
        :returns: the Exception raised by method, False if there was none
        """
        return getattr(self, '_exception', False)

    @property
    def value(self):
        """
        :returns: the value returned by method, None if it has not returned
        """
        return getattr(self, '_value', None)

    def run(self):
        """Call method, store its result or the Exception it raised"""
        try:
            self._value = self.method(*(self.args), **(self.kwargs))
        except Exception as e:
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
                self,
                type(e),
                e.status if isinstance(e, ClientError) else '',
                e))
            self._exception = e

317

Giorgos Verigakis's avatar
Giorgos Verigakis committed
318
class Client(object):
    """Base HTTP client: gathers headers/parameters and commits requests
    to base_url, authenticated with token
    """

    # Upper bound for the adaptive thread limit of _watch_thread_limit
    MAX_THREADS = 7
    DATE_FORMATS = [
        '%a %b %d %H:%M:%S %Y',
        '%A, %d-%b-%y %H:%M:%S GMT',
        '%a, %d %b %Y %H:%M:%S GMT']
    # Logging switches, handed over to every ResponseManager request() makes
    LOG_TOKEN = False
    LOG_DATA = False

    def __init__(self, base_url, token):
        """
        :param base_url: (str) the url all request paths are relative to

        :param token: (str) sent as X-Auth-Token unless a header overrides it
        """
        self.base_url = base_url
        self.token = token
        self.headers, self.params = dict(), dict()

    def _init_thread_limit(self, limit=1):
        """Reset the thread limit and the elapsed-time statistics

        :param limit: (int) a positive number of threads
        """
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
        self._thread_limit = limit
        self._elapsed_old = 0.0
        self._elapsed_new = 0.0

    def _watch_thread_limit(self, threadlist):
        """Adapt the thread limit and join the threads once it is reached

        The limit grows (up to MAX_THREADS) when the average join time of
        the last joined batch did not exceed the one before it, otherwise
        it shrinks (down to 1).

        :param threadlist: (list) of threads

        :returns: (list) empty if the threads were joined, else threadlist
        """
        self._thread_limit = getattr(self, '_thread_limit', 1)
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
        recvlog.debug('# running threads: %s' % len(threadlist))
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
                self._thread_limit < self.MAX_THREADS):
            self._thread_limit += 1
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
            self._thread_limit -= 1

        self._elapsed_old = self._elapsed_new
        if len(threadlist) >= self._thread_limit:
            self._elapsed_new = 0.0
            for thread in threadlist:
                begin_time = time()
                thread.join()
                self._elapsed_new += time() - begin_time
            # Average time spent waiting for a thread of this batch
            self._elapsed_new = self._elapsed_new / len(threadlist)
            return []
        return threadlist

    def _raise_for_status(self, r):
        """Raise a ClientError describing response r

        :param r: a response; its status, text and status_code attributes
            are used when available

        :raises ClientError: always
        """
        clienterrorlog.debug('raise err from [%s] of type[%s]' % (r, type(r)))
        status_msg = getattr(r, 'status', None) or ''
        try:
            message = '%s %s\n' % (status_msg, r.text)
        except Exception:
            # r has no usable text: fall back to r itself. Only Exception is
            # caught, so KeyboardInterrupt/SystemExit are not swallowed
            message = '%s %s\n' % (status_msg, r)
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
        raise ClientError(message, status=status)

    def set_header(self, name, value, iff=True):
        """Set a header 'name':'value', if value is not None and iff holds"""
        if value is not None and iff:
            self.headers[name] = value

    def set_param(self, name, value=None, iff=True):
        """Set a url parameter 'name':'value', if iff holds"""
        if iff:
            self.params[name] = value

    def request(
            self, method, path,
            async_headers=None, async_params=None,
            **kwargs):
        """Commit an HTTP request to base_url/path
        Requests are committed to and performed by Request/ResponseManager
        These classes perform a lazy http request. Present method, by default,
        enforces them to perform the http call. Hint: call present method with
        success=None to get a non-performed ResponseManager object.

        :param method: (str or unicode) the http method

        :param path: (str or unicode) path, relative to base_url

        :param async_headers: (dict) headers for this request only, applied
            on top of the headers set with set_header

        :param async_params: (dict) parameters for this request only, applied
            on top of the parameters set with set_param

        :param kwargs: success (an int or a collection of accepted status
            codes, default 200), data (request body) and json (an object to
            be sent as the JSON-encoded body)

        :returns: (ResponseManager)

        :raises ClientError: if success is not None and the response status
            code is not one of the accepted ones
        """
        assert isinstance(method, str) or isinstance(method, unicode)
        assert method
        assert isinstance(path, str) or isinstance(path, unicode)
        try:
            headers = dict(self.headers)
            headers.update(async_headers or {})
            params = dict(self.params)
            params.update(async_params or {})
            success = kwargs.pop('success', 200)
            data = kwargs.pop('data', None)
            headers.setdefault('X-Auth-Token', self.token)
            if 'json' in kwargs:
                data = dumps(kwargs.pop('json'))
                headers.setdefault('Content-Type', 'application/json')
            if data:
                headers.setdefault('Content-Length', '%s' % len(data))

            sendlog.debug('\n\nCMT %s@%s\t[%s]', method, self.base_url, self)
            req = RequestManager(
                method, self.base_url, path,
                data=data, headers=headers, params=params)
            r = ResponseManager(req)
            r.LOG_TOKEN, r.LOG_DATA = self.LOG_TOKEN, self.LOG_DATA
        finally:
            # Headers and parameters are good for one request only
            self.headers = dict()
            self.params = dict()

        if success is not None:
            # Success can either be an int or a collection
            success = (success,) if isinstance(success, int) else success
            # Reading status_code is what triggers the actual http call
            if r.status_code not in success:
                self._raise_for_status(r)
        return r

    def delete(self, path, **kwargs):
        """Shortcut for request('delete', path, **kwargs)"""
        return self.request('delete', path, **kwargs)

    def get(self, path, **kwargs):
        """Shortcut for request('get', path, **kwargs)"""
        return self.request('get', path, **kwargs)

    def head(self, path, **kwargs):
        """Shortcut for request('head', path, **kwargs)"""
        return self.request('head', path, **kwargs)

    def post(self, path, **kwargs):
        """Shortcut for request('post', path, **kwargs)"""
        return self.request('post', path, **kwargs)

    def put(self, path, **kwargs):
        """Shortcut for request('put', path, **kwargs)"""
        return self.request('put', path, **kwargs)

    def copy(self, path, **kwargs):
        """Shortcut for request('copy', path, **kwargs)"""
        return self.request('copy', path, **kwargs)

    def move(self, path, **kwargs):
        """Shortcut for request('move', path, **kwargs)"""
        return self.request('move', path, **kwargs)