__init__.py 14.8 KB
Newer Older
1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
Giorgos Verigakis's avatar
Giorgos Verigakis committed
2
3
4
5
6
7
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
Stavros Sachtouris's avatar
Stavros Sachtouris committed
8
#      copyright notice, this list of conditions and the following
Giorgos Verigakis's avatar
Giorgos Verigakis committed
9
10
11
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
Stavros Sachtouris's avatar
Stavros Sachtouris committed
12
#      copyright notice, this list of conditions and the following
Giorgos Verigakis's avatar
Giorgos Verigakis committed
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

Stavros Sachtouris's avatar
Stavros Sachtouris committed
34
from urllib2 import quote
35
from urlparse import urlparse
36
from threading import Thread
37
from json import dumps, loads
38
from time import time
39
40
41
42
43
from httplib import ResponseNotReady
from time import sleep
from random import random

from objpool.http import PooledHTTPConnection
44

45
from kamaki.clients.utils import logger
Giorgos Verigakis's avatar
Giorgos Verigakis committed
46

47
# When False, 'X-Auth-Token' headers are skipped in the request/response logs
LOG_TOKEN = False
# All the loggers below write to this single file
DEBUG_LOG = logger.get_log_filename()

# Request meta-data (method, url, headers)
logger.add_file_logger('clients.send', __name__, filename=DEBUG_LOG)
sendlog = logger.get_logger('clients.send')
sendlog.debug('Logging location: %s', DEBUG_LOG)

# Request bodies
logger.add_file_logger('data.send', __name__, filename=DEBUG_LOG)
datasendlog = logger.get_logger('data.send')

# Response meta-data (status, headers)
logger.add_file_logger('clients.recv', __name__, filename=DEBUG_LOG)
recvlog = logger.get_logger('clients.recv')

# Response bodies
logger.add_file_logger('data.recv', __name__, filename=DEBUG_LOG)
datarecvlog = logger.get_logger('data.recv')

# Errors raised as ClientError
logger.add_file_logger('ClientError', __name__, filename=DEBUG_LOG)
clienterrorlog = logger.get_logger('ClientError')

# Methods accepted by RequestManager (compared after upper-casing)
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']


def _encode(v):
    if v and isinstance(v, unicode):
        return quote(v.encode('utf-8'))
    return v

74

Giorgos Verigakis's avatar
Giorgos Verigakis committed
75
class ClientError(Exception):
    """Error raised when a request fails or the response is not acceptable

    If the message contains a JSON object (everything from the first '{'
    onwards), the constructor attempts to extract the error name (the first
    key of the object) and the 'message', 'code' and 'details' fields stored
    under that key, and uses them to build the final message, status and
    details. If that attempt fails, the message is kept as given.

    Attributes:
        status: (int) the status code, 0 if not an int
        details: the collected details, [] if there are none
    """

    def __init__(self, message, status=0, details=None):
        """
        :param message: (str or unicode) error description, optionally
            followed by a JSON-formatted error report

        :param status: (int) status code, used unless the JSON report
            contains a 'code' field

        :param details: (list or any) extra information; the details of the
            JSON report (if any) are appended to it
        """
        clienterrorlog.debug(
            'msg[%s], sts[%s], dtl[%s]' % (message, status, details))

        # The string handling below (also in the finally clause) needs text;
        # without this, a None or an exception object would make the
        # constructor itself fail with AttributeError
        if message is None:
            message = ''
        elif not isinstance(message, (str, unicode)):
            message = '%s' % (message,)

        # Work on a copy, so that the list of the caller is never modified
        if isinstance(details, list):
            details = list(details)

        try:
            message += '' if message and message[-1] == '\n' else '\n'
            serv_stat, sep, tail = message.partition('{')
            # tail is either empty (no '{') or ends with the trailing '\n'
            json_msg = loads(sep + tail[:-1])
            key = json_msg.keys()[0]
            serv_stat = serv_stat.strip()

            json_msg = json_msg[key]
            if 'message' in json_msg:
                message = '%s %s (%s)\n' % (
                    serv_stat, key, json_msg['message'])
            else:
                message = '%s %s' % (serv_stat, key)
            status = json_msg.get('code', status)
            if 'details' in json_msg:
                if not details:
                    details = []
                if not isinstance(details, list):
                    details = [details]
                if json_msg['details']:
                    details.append(json_msg['details'])
        except Exception:
            # Best effort: the message is not a JSON error report (or not in
            # the expected layout), so keep whatever has been resolved so far
            pass
        finally:
            # Keep one trailing newline at most
            while message.endswith('\n\n'):
                message = message[:-1]
            super(ClientError, self).__init__(message)
            self.status = status if isinstance(status, int) else 0
            self.details = details if details else []
Giorgos Verigakis's avatar
Giorgos Verigakis committed
109

110

111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class RequestManager(object):
    """Handle http request information

    Attributes (set by the constructor):
        method: (str) upper-cased http method
        data: the request body, as given
        headers: (dict) a copy of the given headers
        url: (str) the full url, query string included
        path: (str) the path and query string part of the url
        scheme, netloc: as parsed from the full url
    """

    def _connection_info(self, url, path, params=None):
        """Build the full url, set self.url and self.path

        :param url: (str or unicode) The service url, defaults to
            'http://127.0.0.1/' if empty

        :param path: (str or unicode) The service path (url/path)

        :param params: (dict) Parameters to add to final url. A parameter
            with a false value is added as a bare key (no '=value' part)

        :returns: (scheme, netloc)
        """
        url = _encode(url) if url else 'http://127.0.0.1/'
        url += '' if url.endswith('/') else '/'
        if path:
            # Exactly one '/' between url and path
            url += _encode(path[1:] if path.startswith('/') else path)

        delim = '?'
        for key, val in (params or {}).items():
            val = _encode(val)
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
            delim = '&'

        parsed = urlparse(url)
        self.url = url
        self.path = parsed.path or '/'
        if parsed.query:
            self.path += '?%s' % parsed.query
        return (parsed.scheme, parsed.netloc)

    def __init__(
            self, method, url, path,
            data=None, headers=None, params=None):
        """
        :param method: (str) one of HTTP_METHODS, case insensitive

        :param url: (str or unicode) The service url

        :param path: (str or unicode) The service path (url/path)

        :param data: the request body

        :param headers: (dict) request headers (copied, not referenced)

        :param params: (dict) url parameters

        :raises AssertionError: if method is not in HTTP_METHODS or if
            non-empty headers are not a dict
        """
        method = method.upper()
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
        if headers:
            assert isinstance(headers, dict)
        self.headers = dict(headers or {})
        self.method, self.data = method, data
        self.scheme, self.netloc = self._connection_info(url, path, params)

    def perform(self, conn):
        """Send the request and wait for the response

        :param conn: (httplib connection object)

        :returns: (HTTPResponse)

        :raises ClientError: if the response is still not ready after the
            waiting budget is spent
        """
        conn.request(
            method=str(self.method.upper()),
            url=str(self.path),
            headers=self.headers,
            body=self.data)
        # Budget of 60 seconds of accumulated sleeping time. The time spent
        # inside getresponse itself is not counted.
        keep_trying = 60.0
        while keep_trying > 0:
            try:
                return conn.getresponse()
            except ResponseNotReady:
                # Short random pause before asking again
                wait = 0.03 * random()
                sleep(wait)
                keep_trying -= wait
        # recvlog is a logger object, it must not be called directly
        recvlog.debug(
            'Kamaki Timeout %s %s\t[%s]' % (self.method, self.path, self))
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213


class ResponseManager(object):
    """Manage the http request and handle the response data, headers, etc.

    The request is performed lazily: it is sent the first time any of the
    properties is accessed, and the results are kept for later accesses.
    """

    def __init__(self, request, poolsize=None):
        """
        :param request: (RequestManager)

        :param poolsize: (int) if set, it is passed as the size of the
            connection pool
        """
        self.request = request
        self._request_performed = False
        self.poolsize = poolsize

    def _get_response(self):
        """Perform the request (once) and store status, headers and content

        :raises ClientError: if anything fails while connecting, sending the
            request or reading the response
        """
        if self._request_performed:
            return

        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
        try:
            with PooledHTTPConnection(
                    self.request.netloc, self.request.scheme,
                    **pool_kw) as connection:
                r = self.request.perform(connection)
                headers = dict(r.getheaders())
                content = r.read()
                status_code, status = r.status, r.reason
        except Exception as err:
            from traceback import format_stack
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
            raise ClientError(
                'Failed while http-connecting to %s (%s)' % (
                    self.request.url,
                    err))

        # Publish the results only when the whole response has been read, so
        # that a failure can never leave a half-populated object marked as
        # performed
        self._headers = headers
        self._content = content
        self._status_code = status_code
        self._status = status
        self._request_performed = True

    @property
    def status_code(self):
        """
        :returns: the status of the response (e.g. 200)
        """
        self._get_response()
        return self._status_code

    @property
    def status(self):
        """
        :returns: the reason phrase of the response
        """
        self._get_response()
        return self._status

    @property
    def headers(self):
        """
        :returns: (dict) the response headers
        """
        self._get_response()
        return self._headers

    @property
    def content(self):
        """
        :returns: the response body, as read from the connection
        """
        self._get_response()
        return self._content

    @property
    def text(self):
        """
        :returns: (str) content
        """
        self._get_response()
        return '%s' % self._content

    @property
    def json(self):
        """
        :returns: (dict) squeezed from json-formatted content

        :raises ClientError: if the content is not valid JSON
        """
        self._get_response()
        try:
            return loads(self._content)
        except ValueError as err:
            raise ClientError('Response not formated in JSON - %s' % err)
254
255


256
class SilentEvent(Thread):
    """Thread-run method(*args, **kwargs)

    Exceptions raised by the method do not propagate: they are logged and
    kept in the exception property, while the returned value (if any) is
    kept in the value property.
    """

    def __init__(self, method, *args, **kwargs):
        """
        :param method: the callable to run in the thread

        :param args: positional arguments for method

        :param kwargs: keyword arguments for method
        """
        # Name the class explicitly: super(self.__class__, self) resolves to
        # this very constructor when called through a subclass
        super(SilentEvent, self).__init__()
        self.method = method
        self.args = args
        self.kwargs = kwargs

    @property
    def exception(self):
        """
        :returns: the exception raised by the method, False if none
        """
        return getattr(self, '_exception', False)

    @property
    def value(self):
        """
        :returns: the value returned by the method, None if not available
        """
        return getattr(self, '_value', None)

    def run(self):
        """Call the method, keep the returned value or the exception"""
        try:
            self._value = self.method(*(self.args), **(self.kwargs))
        except Exception as e:
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
                self,
                type(e),
                e.status if isinstance(e, ClientError) else '',
                e))
            self._exception = e

283

Giorgos Verigakis's avatar
Giorgos Verigakis committed
284
class Client(object):
    """Basic http client for a service reachable at base_url

    Headers and params set with set_header / set_param are kept on the
    instance and are cleared after each request.
    """

    def __init__(self, base_url, token):
        """
        :param base_url: (str) the url all request paths are appended to

        :param token: (str) sent as 'X-Auth-Token', unless such a header is
            explicitly set
        """
        self.base_url = base_url
        self.token = token
        self.headers, self.params = dict(), dict()
        self.DATE_FORMATS = [
            '%a %b %d %H:%M:%S %Y',
            '%A, %d-%b-%y %H:%M:%S GMT',
            '%a, %d %b %Y %H:%M:%S GMT']
        self.MAX_THREADS = 7

    def _init_thread_limit(self, limit=1):
        """Reset the thread limit and the elapsed-time measurements

        :param limit: (int) a positive number of threads
        """
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
        self._thread_limit = limit
        self._elapsed_old = 0.0
        self._elapsed_new = 0.0

    def _watch_thread_limit(self, threadlist):
        """Adjust the thread limit and join the threads if it is reached

        The limit is increased by one (up to MAX_THREADS) if the average
        join time of the latest batch was not longer than the one of the
        batch before it, otherwise it is decreased by one (down to 1).

        :param threadlist: (list) threads that have been started

        :returns: (list) an empty list if the threads were joined, otherwise
            threadlist itself
        """
        self._thread_limit = getattr(self, '_thread_limit', 1)
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
        recvlog.debug('# running threads: %s' % len(threadlist))
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
                self._thread_limit < self.MAX_THREADS):
            self._thread_limit += 1
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
            self._thread_limit -= 1

        self._elapsed_old = self._elapsed_new
        if len(threadlist) >= self._thread_limit:
            # Limit reached: wait for all threads, measure the average wait
            self._elapsed_new = 0.0
            for thread in threadlist:
                begin_time = time()
                thread.join()
                self._elapsed_new += time() - begin_time
            self._elapsed_new = self._elapsed_new / len(threadlist)
            return []
        return threadlist

    def _raise_for_status(self, r):
        """Raise a ClientError describing response r

        :param r: a response-like object; its status, text and status_code
            attributes are used when they are available

        :raises ClientError: always
        """
        clienterrorlog.debug('raise err from [%s] of type[%s]' % (r, type(r)))
        status_msg = getattr(r, 'status', None) or ''
        try:
            message = '%s %s\n' % (status_msg, r.text)
        except Exception:
            # r has no usable text, fall back to its string representation.
            # Not a bare except: interrupts and exits must not be swallowed
            message = '%s %s\n' % (status_msg, r)
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
        raise ClientError(message, status=status)

    def set_header(self, name, value, iff=True):
        """Set a header 'name':'value'

        Nothing is set if value is None or if iff is false
        """
        if value is not None and iff:
            self.headers[name] = value

    def set_param(self, name, value=None, iff=True):
        """Set a url parameter 'name':'value', unless iff is false"""
        if iff:
            self.params[name] = value

    def request(
            self, method, path,
            async_headers=None, async_params=None,
            **kwargs):
        """In threaded/asynchronous requests, headers and params are not safe

        Therefore, the standard self.set_header/param system can be used only
        for headers and params that are common for all requests. All other
        params and headers should be passed as

        @param async_headers
        @param async_params

        E.g. in most queries the 'X-Auth-Token' header might be the same for
        all, but the 'Range' header might be different from request to request.

        :param method: (str or unicode) a non-empty http method name

        :param path: (str or unicode) appended to base_url

        :param kwargs: the following keys are used:
            success: (int or collection) acceptable status code(s), 200 by
            default, None to accept any status;
            data: the request body;
            json: an object to serialize as a JSON request body

        :returns: (ResponseManager)

        :raises ClientError: if the request fails or if the status code of
            the response is not among the acceptable ones
        """
        assert isinstance(method, str) or isinstance(method, unicode)
        assert method
        assert isinstance(path, str) or isinstance(path, unicode)
        try:
            # Per-request values take precedence over the instance-wide ones
            headers = dict(self.headers)
            headers.update(async_headers or {})
            params = dict(self.params)
            params.update(async_params or {})
            success = kwargs.pop('success', 200)
            data = kwargs.pop('data', None)
            headers.setdefault('X-Auth-Token', self.token)
            if 'json' in kwargs:
                data = dumps(kwargs.pop('json'))
                headers.setdefault('Content-Type', 'application/json')
            if data:
                headers.setdefault('Content-Length', '%s' % len(data))

            req = RequestManager(
                method, self.base_url, path,
                data=data, headers=headers, params=params)
            sendlog.info('commit a %s @ %s\t[%s]', method, self.base_url, self)
            sendlog.info('\tpath: %s\t[%s]', req.path, self)
            for key, val in req.headers.items():
                if (not LOG_TOKEN) and key.lower() == 'x-auth-token':
                    continue
                sendlog.info('\t%s: %s [%s]', key, val, self)
            if data:
                datasendlog.info(data)
            sendlog.info('END HTTP request commit\t[%s]', self)

            r = ResponseManager(req)
            # The request is actually performed at the first access below
            recvlog.info('%d %s', r.status_code, r.status)
            for key, val in r.headers.items():
                if (not LOG_TOKEN) and key.lower() == 'x-auth-token':
                    continue
                recvlog.info('%s: %s', key, val)
            if r.content:
                datarecvlog.info(r.content)
        finally:
            # Instance-wide headers and params are good for one request only
            self.headers = dict()
            self.params = dict()

        if success is not None:
            # Success can either be an int or a collection
            success = (success,) if isinstance(success, int) else success
            if r.status_code not in success:
                self._raise_for_status(r)
        return r

    def delete(self, path, **kwargs):
        """Shortcut for request('delete', path, **kwargs)"""
        return self.request('delete', path, **kwargs)

    def get(self, path, **kwargs):
        """Shortcut for request('get', path, **kwargs)"""
        return self.request('get', path, **kwargs)

    def head(self, path, **kwargs):
        """Shortcut for request('head', path, **kwargs)"""
        return self.request('head', path, **kwargs)

    def post(self, path, **kwargs):
        """Shortcut for request('post', path, **kwargs)"""
        return self.request('post', path, **kwargs)

    def put(self, path, **kwargs):
        """Shortcut for request('put', path, **kwargs)"""
        return self.request('put', path, **kwargs)

    def copy(self, path, **kwargs):
        """Shortcut for request('copy', path, **kwargs)"""
        return self.request('copy', path, **kwargs)

    def move(self, path, **kwargs):
        """Shortcut for request('move', path, **kwargs)"""
        return self.request('move', path, **kwargs)