__init__.py 19.5 KB
Newer Older
1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
Giorgos Verigakis's avatar
Giorgos Verigakis committed
2
3
4
5
6
7
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
Stavros Sachtouris's avatar
Stavros Sachtouris committed
8
#      copyright notice, self.list of conditions and the following
Giorgos Verigakis's avatar
Giorgos Verigakis committed
9
10
11
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
Stavros Sachtouris's avatar
Stavros Sachtouris committed
12
#      copyright notice, self.list of conditions and the following
Giorgos Verigakis's avatar
Giorgos Verigakis committed
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

34
from urllib2 import quote, unquote
35
from urlparse import urlparse
36
from threading import Thread
37
from json import dumps, loads
38
from time import time
39
from httplib import ResponseNotReady, HTTPException
40
41
from time import sleep
from random import random
42
from logging import getLogger
43
44

from objpool.http import PooledHTTPConnection
45

Giorgos Verigakis's avatar
Giorgos Verigakis committed
46

47
48
TIMEOUT = 60.0   # seconds
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49

50
51
52
log = getLogger(__name__)
sendlog = getLogger('%s.send' % __name__)
recvlog = getLogger('%s.recv' % __name__)
53

54
55
56
57
58
59

def _encode(v):
    if v and isinstance(v, unicode):
        return quote(v.encode('utf-8'))
    return v

60

Giorgos Verigakis's avatar
Giorgos Verigakis committed
61
class ClientError(Exception):
62
    def __init__(self, message, status=0, details=None):
63
        log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
64
65
66
            message,
            status,
            details))
67
        try:
68
            message += '' if message and message[-1] == '\n' else '\n'
69
            serv_stat, sep, new_msg = message.partition('{')
70
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71
            json_msg = loads(new_msg)
72
            key = json_msg.keys()[0]
73
            serv_stat = serv_stat.strip()
74

75
            json_msg = json_msg[key]
76
77
78
79
80
81
            message = '%s %s (%s)\n' % (
                serv_stat,
                key,
                json_msg['message']) if (
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
            status = json_msg.get('code', status)
82
83
84
            if 'details' in json_msg:
                if not details:
                    details = []
85
                if not isinstance(details, list):
86
87
88
                    details = [details]
                if json_msg['details']:
                    details.append(json_msg['details'])
89
        except Exception:
90
            pass
91
        finally:
92
93
            while message.endswith('\n\n'):
                message = message[:-1]
94
95
96
            super(ClientError, self).__init__(message)
            self.status = status if isinstance(status, int) else 0
            self.details = details if details else []
Giorgos Verigakis's avatar
Giorgos Verigakis committed
97

98

99
100
101
102
class Logged(object):

    LOG_TOKEN = False
    LOG_DATA = False
103
    LOG_PID = False
104
    _token = None
105
106
107


class RequestManager(Logged):
108
109
110
111
112
113
114
115
116
117
118
119
    """Handle http request information"""

    def _connection_info(self, url, path, params={}):
        """ Set self.url to scheme://netloc/?params
        :param url: (str or unicode) The service url

        :param path: (str or unicode) The service path (url/path)

        :param params: (dict) Parameters to add to final url

        :returns: (scheme, netloc)
        """
120
        url = _encode(str(url)) if url else 'http://127.0.0.1/'
121
122
123
        url += '' if url.endswith('/') else '/'
        if path:
            url += _encode(path[1:] if path.startswith('/') else path)
124
125
        delim = '?'
        for key, val in params.items():
Stavros Sachtouris's avatar
Stavros Sachtouris committed
126
            val = '' if val in (None, False) else _encode(u'%s' % val)
127
128
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
            delim = '&'
129
130
131
132
133
134
135
136
137
138
139
140
141
142
        parsed = urlparse(url)
        self.url = url
        self.path = parsed.path or '/'
        if parsed.query:
            self.path += '?%s' % parsed.query
        return (parsed.scheme, parsed.netloc)

    def __init__(
            self, method, url, path,
            data=None, headers={}, params={}):
        method = method.upper()
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
        if headers:
            assert isinstance(headers, dict)
143
        self.headers = dict(headers)
144
145
146
        self.method, self.data = method, data
        self.scheme, self.netloc = self._connection_info(url, path, params)

147
    def dump_log(self):
148
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
149
150
151
        sendlog.info('- -  -   -     -        -             -')
        sendlog.info('%s %s://%s%s%s' % (
            self.method, self.scheme, self.netloc, self.path, plog))
152
        for key, val in self.headers.items():
153
154
155
            if key.lower() in ('x-auth-token', ) and not self.LOG_TOKEN:
                self._token, val = val, '...'
            sendlog.info('  %s: %s%s' % (key, val, plog))
156
        if self.data:
157
            sendlog.info('data size:%s%s' % (len(self.data), plog))
158
            if self.LOG_DATA:
159
160
                sendlog.info(self.data.replace(self._token, '...') if (
                    self._token) else self.data)
161
        else:
162
            sendlog.info('data size:0%s' % plog)
163

164
165
166
167
168
169
    def perform(self, conn):
        """
        :param conn: (httplib connection object)

        :returns: (HTTPResponse)
        """
170
        self.dump_log()
171
172
173
174
175
        conn.request(
            method=str(self.method.upper()),
            url=str(self.path),
            headers=self.headers,
            body=self.data)
176
        sendlog.info('')
177
        keep_trying = TIMEOUT
Stavros Sachtouris's avatar
Stavros Sachtouris committed
178
        while keep_trying > 0:
179
180
181
            try:
                return conn.getresponse()
            except ResponseNotReady:
Stavros Sachtouris's avatar
Stavros Sachtouris committed
182
183
184
                wait = 0.03 * random()
                sleep(wait)
                keep_trying -= wait
185
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
186
        logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
187
        recvlog.debug(logmsg)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
188
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
189
190


191
class ResponseManager(Logged):
192
193
    """Manage the http request and handle the response data, headers, etc."""

194
    def __init__(self, request, poolsize=None, connection_retry_limit=0):
195
196
        """
        :param request: (RequestManager)
197
198
199
200

        :param poolsize: (int) the size of the connection pool

        :param connection_retry_limit: (int)
201
        """
202
        self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
203
204
205
206
207
208
209
210
211
        self.request = request
        self._request_performed = False
        self.poolsize = poolsize

    def _get_response(self):
        if self._request_performed:
            return

        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
212
213
214
215
216
217
218
        for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
            try:
                with PooledHTTPConnection(
                        self.request.netloc, self.request.scheme,
                        **pool_kw) as connection:
                    self.request.LOG_TOKEN = self.LOG_TOKEN
                    self.request.LOG_DATA = self.LOG_DATA
219
                    self.request.LOG_PID = self.LOG_PID
220
                    r = self.request.perform(connection)
221
222
223
224
225
                    plog = ''
                    if self.LOG_PID:
                        recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
                            self, r, self.request))
                        plog = '\t[%s]' % self
226
227
228
229
                    self._request_performed = True
                    self._status_code, self._status = r.status, unquote(
                        r.reason)
                    recvlog.info(
230
231
                        '%d %s%s' % (
                            self.status_code, self.status, plog))
232
233
                    self._headers = dict()
                    for k, v in r.getheaders():
234
235
236
                        if k.lower in ('x-auth-token', ) and (
                                not self.LOG_TOKEN):
                            self._token, v = v, '...'
237
238
                        v = unquote(v)
                        self._headers[k] = v
239
                        recvlog.info('  %s: %s%s' % (k, v, plog))
240
                    self._content = r.read()
241
242
                    recvlog.info('data size: %s%s' % (
                        len(self._content) if self._content else 0, plog))
243
                    if self.LOG_DATA and self._content:
244
245
246
                        data = '%s%s' % (self._content, plog)
                        if self._token:
                            data = data.replace(self._token, '...')
247
248
                        recvlog.info(data)
                    recvlog.info('-             -        -     -   -  - -')
249
250
251
252
253
254
255
256
257
258
259
260
261
                break
            except Exception as err:
                if isinstance(err, HTTPException):
                    if retries >= self.CONNECTION_TRY_LIMIT:
                        raise ClientError(
                            'Connection to %s failed %s times (%s: %s )' % (
                                self.request.url, retries, type(err), err))
                else:
                    from traceback import format_stack
                    recvlog.debug(
                        '\n'.join(['%s' % type(err)] + format_stack()))
                    raise ClientError(
                        'Failed while http-connecting to %s (%s)' % (
262
                            self.request.url, err))
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300

    @property
    def status_code(self):
        self._get_response()
        return self._status_code

    @property
    def status(self):
        self._get_response()
        return self._status

    @property
    def headers(self):
        self._get_response()
        return self._headers

    @property
    def content(self):
        self._get_response()
        return self._content

    @property
    def text(self):
        """
        :returns: (str) content
        """
        self._get_response()
        return '%s' % self._content

    @property
    def json(self):
        """
        :returns: (dict) squeezed from json-formated content
        """
        self._get_response()
        try:
            return loads(self._content)
        except ValueError as err:
Stavros Sachtouris's avatar
Stavros Sachtouris committed
301
            raise ClientError('Response not formated in JSON - %s' % err)
302
303


304
class SilentEvent(Thread):
305
    """Thread-run method(*args, **kwargs)"""
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
    def __init__(self, method, *args, **kwargs):
        super(self.__class__, self).__init__()
        self.method = method
        self.args = args
        self.kwargs = kwargs

    @property
    def exception(self):
        return getattr(self, '_exception', False)

    @property
    def value(self):
        return getattr(self, '_value', None)

    def run(self):
        try:
            self._value = self.method(*(self.args), **(self.kwargs))
        except Exception as e:
324
325
326
327
328
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
                self,
                type(e),
                e.status if isinstance(e, ClientError) else '',
                e))
329
330
            self._exception = e

331

332
class Client(Logged):
333

334
    MAX_THREADS = 1
335
    DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
336
    CONNECTION_RETRY_LIMIT = 0
337

338
    def __init__(self, base_url, token):
339
        assert base_url, 'No base_url for client %s' % self
Giorgos Verigakis's avatar
Giorgos Verigakis committed
340
        self.base_url = base_url
341
        self.token = token
342
        self.headers, self.params = dict(), dict()
343

344
    def _init_thread_limit(self, limit=1):
345
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
346
347
348
349
350
        self._thread_limit = limit
        self._elapsed_old = 0.0
        self._elapsed_new = 0.0

    def _watch_thread_limit(self, threadlist):
351
352
353
        self._thread_limit = getattr(self, '_thread_limit', 1)
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
354
        recvlog.debug('# running threads: %s' % len(threadlist))
355
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
356
                self._thread_limit < self.MAX_THREADS):
357
            self._thread_limit += 1
358
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
359
360
361
362
363
364
365
366
367
368
369
370
371
            self._thread_limit -= 1

        self._elapsed_old = self._elapsed_new
        if len(threadlist) >= self._thread_limit:
            self._elapsed_new = 0.0
            for thread in threadlist:
                begin_time = time()
                thread.join()
                self._elapsed_new += time() - begin_time
            self._elapsed_new = self._elapsed_new / len(threadlist)
            return []
        return threadlist

372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
    def async_run(self, method, kwarg_list):
        """Fire threads of operations

        :param method: the method to run in each thread

        :param kwarg_list: (list of dicts) the arguments to pass in each method
            call

        :returns: (list) the results of each method call w.r. to the order of
            kwarg_list
        """
        flying, results = {}, {}
        self._init_thread_limit()
        for index, kwargs in enumerate(kwarg_list):
            self._watch_thread_limit(flying.values())
            flying[index] = SilentEvent(method=method, **kwargs)
            flying[index].start()
            unfinished = {}
            for key, thread in flying.items():
                if thread.isAlive():
                    unfinished[key] = thread
                elif thread.exception:
                    raise thread.exception
                else:
                    results[key] = thread.value
            flying = unfinished
        sendlog.info('- - - wait for threads to finish')
        for key, thread in flying.items():
            if thread.isAlive():
                thread.join()
402
            if thread.exception:
403
                raise thread.exception
404
            results[key] = thread.value
405
406
        return results.values()

407
    def _raise_for_status(self, r):
408
        log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
409
        status_msg = getattr(r, 'status', None) or ''
410
        try:
411
            message = '%s %s\n' % (status_msg, r.text)
412
        except:
413
414
415
            message = '%s %s\n' % (status_msg, r)
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
        raise ClientError(message, status=status)
Giorgos Verigakis's avatar
Giorgos Verigakis committed
416

417
    def set_header(self, name, value, iff=True):
418
        """Set a header 'name':'value'"""
419
        if value is not None and iff:
420
            self.headers[name] = unicode(value)
421
422
423

    def set_param(self, name, value=None, iff=True):
        if iff:
424
            self.params[name] = unicode(value)
425

426
    def request(
427
428
            self, method, path,
            async_headers=dict(), async_params=dict(),
429
            **kwargs):
430
431
432
433
434
435
        """Commit an HTTP request to base_url/path
        Requests are commited to and performed by Request/ResponseManager
        These classes perform a lazy http request. Present method, by default,
        enforces them to perform the http call. Hint: call present method with
        success=None to get a non-performed ResponseManager object.
        """
436
437
438
        assert isinstance(method, str) or isinstance(method, unicode)
        assert method
        assert isinstance(path, str) or isinstance(path, unicode)
439
        try:
440
441
442
443
            headers = dict(self.headers)
            headers.update(async_headers)
            params = dict(self.params)
            params.update(async_params)
444
445
            success = kwargs.pop('success', 200)
            data = kwargs.pop('data', None)
446
            headers.setdefault('X-Auth-Token', self.token)
447
            if 'json' in kwargs:
448
                data = dumps(kwargs.pop('json'))
449
                headers.setdefault('Content-Type', 'application/json')
450
            if data:
451
452
                headers.setdefault('Content-Length', '%s' % len(data))

453
            plog = ('\t[%s]' % self) if self.LOG_PID else ''
454
            sendlog.debug('\n\nCMT %s@%s%s', method, self.base_url, plog)
455
456
457
            req = RequestManager(
                method, self.base_url, path,
                data=data, headers=headers, params=params)
458
            #  req.log()
459
460
            r = ResponseManager(
                req, connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
461
462
            r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
                self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
463
            r._token = headers['X-Auth-Token']
464
        finally:
465
466
            self.headers = dict()
            self.params = dict()
467
468

        if success is not None:
469
            # Success can either be an int or a collection
470
471
472
            success = (success,) if isinstance(success, int) else success
            if r.status_code not in success:
                self._raise_for_status(r)
473
        return r
Giorgos Verigakis's avatar
Giorgos Verigakis committed
474

Giorgos Verigakis's avatar
Giorgos Verigakis committed
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
    def delete(self, path, **kwargs):
        return self.request('delete', path, **kwargs)

    def get(self, path, **kwargs):
        return self.request('get', path, **kwargs)

    def head(self, path, **kwargs):
        return self.request('head', path, **kwargs)

    def post(self, path, **kwargs):
        return self.request('post', path, **kwargs)

    def put(self, path, **kwargs):
        return self.request('put', path, **kwargs)

490
491
492
493
494
    def copy(self, path, **kwargs):
        return self.request('copy', path, **kwargs)

    def move(self, path, **kwargs):
        return self.request('move', path, **kwargs)
Stavros Sachtouris's avatar
Stavros Sachtouris committed
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552


class Waiter(object):

    def _wait(
            self, item_id, current_status, get_status,
            delay=1, max_wait=100, wait_cb=None):
        """Wait for item while its status is current_status

        :param server_id: integer (str or int)

        :param current_status: (str)

        :param get_status: (method(self, item_id)) if called, returns
            (status, progress %) If no way to tell progress, return None

        :param delay: time interval between retries

        :param wait_cb: (method(total steps)) returns a generator for
            reporting progress or timeouts i.e., for a progress bar

        :returns: (str) the new mode if successful, (bool) False if timed out
        """
        status, progress = get_status(self, item_id)

        if wait_cb:
            wait_gen = wait_cb(max_wait // delay)
            wait_gen.next()

        if status != current_status:
            if wait_cb:
                try:
                    wait_gen.next()
                except Exception:
                    pass
            return status
        old_wait = total_wait = 0

        while status == current_status and total_wait <= max_wait:
            if wait_cb:
                try:
                    for i in range(total_wait - old_wait):
                        wait_gen.next()
                except Exception:
                    break
            old_wait = total_wait
            total_wait = progress or total_wait + 1
            sleep(delay)
            status, progress = get_status(self, item_id)

        if total_wait < max_wait:
            if wait_cb:
                try:
                    for i in range(max_wait):
                        wait_gen.next()
                except:
                    pass
        return status if status != current_status else False