#
#

# Copyright (C) 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""HTTP client module.

"""

# pylint: disable-msg=E1103

# E1103: %s %r has no %r member (but some types could not be
# inferred), since _socketobject could be ssl or not and pylint
# doesn't parse that

import os
import select
import socket
import errno
import threading

from ganeti import workerpool
from ganeti import http


# Passed to the worker pool constructor by HttpClientWorkerPool as the
# number of HTTP client worker threads
HTTP_CLIENT_THREADS = 10


class HttpClientRequest(object):
  """Describes an HTTP request and, once executed, its outcome.

  The request attributes are filled in by the constructor. The result
  attributes (C{success}, C{error}, C{response} and the C{resp_*}
  attributes) start out as C{None} and are set by the code executing the
  request.

  """
  def __init__(self, host, port, method, path, headers=None, post_data=None,
               ssl_params=None, ssl_verify_peer=False):
    """Describes an HTTP request.

    @type host: string
    @param host: Hostname
    @type port: int
    @param port: Port
    @type method: string
    @param method: Method name
    @type path: string
    @param path: Request path, must start with a slash
    @type headers: dict or None
    @param headers: Additional headers to send
    @type post_data: string or None
    @param post_data: Additional data to send, only allowed for POST and
        PUT requests
    @type ssl_params: HttpSslParams
    @param ssl_params: SSL key and certificate
    @type ssl_verify_peer: bool
    @param ssl_verify_peer: Whether to compare our certificate with
        server's certificate

    """
    if post_data is not None:
      # The message must name the methods which are actually checked
      assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \
        "Only POST and PUT requests support sending data"

    assert path.startswith("/"), "Path must start with slash (/)"

    # Request attributes
    self.host = host
    self.port = port
    self.ssl_params = ssl_params
    self.ssl_verify_peer = ssl_verify_peer
    self.method = method
    self.path = path
    self.headers = headers
    self.post_data = post_data

    # Outcome; None until the request has been executed
    self.success = None
    self.error = None

    # Raw response
    self.response = None

    # Response attributes
    self.resp_version = None
    self.resp_status_code = None
    self.resp_reason = None
    self.resp_headers = None
    self.resp_body = None


class _HttpClientToServerMessageWriter(http.HttpMessageWriter):
  """Writer for messages sent from the client to the server.

  Adds nothing to L{http.HttpMessageWriter}; it only provides a
  client-specific name.

  """


class _HttpServerToClientMessageReader(http.HttpMessageReader):
  """Reader for messages sent from the server to the client.

  """
  # Length limits
  START_LINE_LENGTH_MAX = 512
  HEADER_LENGTH_MAX = 4096

  def ParseStartLine(self, start_line):
    """Parses the status line sent by the server.

    @type start_line: string
    @param start_line: Status line, must not be empty
    @rtype: http.HttpServerToClientStartLine
    @return: Start line object holding the upper-cased version, the
        numeric status code and the reason phrase (empty if the server
        sent none)
    @raise http.HttpError: If the status code is missing or is not a
        number between 100 and 999

    """
    # Empty lines are skipped when reading
    assert start_line

    fields = start_line.split(None, 2)

    if len(fields) == 3:
      (version, status, reason) = fields
    elif len(fields) == 2:
      (version, status) = fields
      reason = ""
    else:
      # A line without status code can't be turned into a start line;
      # report it as a protocol error instead of failing later on an
      # unbound variable
      raise http.HttpError("Invalid status code (%r)" % start_line)

    version = version.upper()

    # The status code is a three-digit number
    try:
      status = int(status)
      if status < 100 or status > 999:
        status = -1
    except ValueError:
      status = -1

    if status == -1:
      raise http.HttpError("Invalid status code (%r)" % start_line)

    return http.HttpServerToClientStartLine(version, status, reason)


141
class HttpClientRequestExecutor(http.HttpBase):
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
  # Default headers
  DEFAULT_HEADERS = {
    http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION,
    # TODO: For keep-alive, don't send "Connection: close"
    http.HTTP_CONNECTION: "close",
    }

  # Timeouts in seconds for socket layer
  # TODO: Soft timeout instead of only socket timeout?
  # TODO: Make read timeout configurable per OpCode?
  CONNECT_TIMEOUT = 5
  WRITE_TIMEOUT = 10
  READ_TIMEOUT = None
  CLOSE_TIMEOUT = 1

  def __init__(self, req):
    """Initializes the HttpClientRequestExecutor class.

    @type req: HttpClientRequest
    @param req: Request object

    """
164
    http.HttpBase.__init__(self)
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
    self.request = req

    try:
      # TODO: Implement connection caching/keep-alive
      self.sock = self._CreateSocket(req.ssl_params,
                                     req.ssl_verify_peer)

      # Disable Python's timeout
      self.sock.settimeout(None)

      # Operate in non-blocking mode
      self.sock.setblocking(0)

      response_msg_reader = None
      response_msg = None
      force_close = True

      self._Connect()
      try:
        self._SendRequest()
        (response_msg_reader, response_msg) = self._ReadResponse()

        # Only wait for server to close if we didn't have any exception.
        force_close = False
      finally:
        # TODO: Keep-alive is not supported, always close connection
        force_close = True
192
193
194
        http.ShutdownConnection(self.sock, self.CLOSE_TIMEOUT,
                                self.WRITE_TIMEOUT, response_msg_reader,
                                force_close)
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243

      self.sock.close()
      self.sock = None

      req.response = response_msg

      req.resp_version = req.response.start_line.version
      req.resp_status_code = req.response.start_line.code
      req.resp_reason = req.response.start_line.reason
      req.resp_headers = req.response.headers
      req.resp_body = req.response.body

      req.success = True
      req.error = None

    except http.HttpError, err:
      req.success = False
      req.error = str(err)

  def _Connect(self):
    """Non-blocking connect to host with timeout.

    """
    connected = False
    while True:
      try:
        connect_error = self.sock.connect_ex((self.request.host,
                                              self.request.port))
      except socket.gaierror, err:
        raise http.HttpError("Connection failed: %s" % str(err))

      if connect_error == errno.EINTR:
        # Mask signals
        pass

      elif connect_error == 0:
        # Connection established
        connected = True
        break

      elif connect_error == errno.EINPROGRESS:
        # Connection started
        break

      raise http.HttpError("Connection failed (%s: %s)" %
                             (connect_error, os.strerror(connect_error)))

    if not connected:
      # Wait for connection
244
245
      event = http.WaitForSocketCondition(self.sock, select.POLLOUT,
                                          self.CONNECT_TIMEOUT)
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
      if event is None:
        raise http.HttpError("Timeout while connecting to server")

      # Get error code
      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
      if connect_error != 0:
        raise http.HttpError("Connection failed (%s: %s)" %
                               (connect_error, os.strerror(connect_error)))

    # Enable TCP keep-alive
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

    # If needed, Linux specific options are available to change the TCP
    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
    # TCP_KEEPINTVL.

262
263
    # Do the secret SSL handshake
    if self.using_ssl:
Iustin Pop's avatar
Iustin Pop committed
264
      self.sock.set_connect_state() # pylint: disable-msg=E1103
265
      try:
266
        http.Handshake(self.sock, self.WRITE_TIMEOUT)
267
268
269
      except http.HttpSessionHandshakeUnexpectedEOF:
        raise http.HttpError("Server closed connection during SSL handshake")

270
271
272
273
274
275
276
277
278
279
  def _SendRequest(self):
    """Sends request to server.

    """
    # Headers
    send_headers = self.DEFAULT_HEADERS.copy()

    if self.request.headers:
      send_headers.update(self.request.headers)

280
281
    send_headers[http.HTTP_HOST] = "%s:%s" % (self.request.host,
                                              self.request.port)
282
283
284
285
286
287
288
289
290

    # Response message
    msg = http.HttpMessage()

    # Combine request line. We only support HTTP/1.0 (no chunked transfers and
    # no keep-alive).
    # TODO: For keep-alive, change to HTTP/1.1
    msg.start_line = \
      http.HttpClientToServerStartLine(method=self.request.method.upper(),
291
292
                                       path=self.request.path,
                                       version=http.HTTP_1_0)
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
    msg.headers = send_headers
    msg.body = self.request.post_data

    try:
      _HttpClientToServerMessageWriter(self.sock, msg, self.WRITE_TIMEOUT)
    except http.HttpSocketTimeout:
      raise http.HttpError("Timeout while sending request")
    except socket.error, err:
      raise http.HttpError("Error sending request: %s" % err)

  def _ReadResponse(self):
    """Read response from server.

    """
    response_msg = http.HttpMessage()

    try:
      response_msg_reader = \
        _HttpServerToClientMessageReader(self.sock, response_msg,
                                         self.READ_TIMEOUT)
    except http.HttpSocketTimeout:
      raise http.HttpError("Timeout while reading response")
    except socket.error, err:
      raise http.HttpError("Error reading response: %s" % err)

    return (response_msg_reader, response_msg)


class _HttpClientPendingRequest(object):
  """Data class for pending requests.

  """
  def __init__(self, request):
    self.request = request

    # Thread synchronization
    self.done = threading.Event()


class HttpClientWorker(workerpool.BaseWorker):
  """Worker executing pending HTTP client requests.

  """
  def RunTask(self, pend_req):
    """Executes one pending request and signals its completion.

    @type pend_req: _HttpClientPendingRequest
    @param pend_req: Wrapper holding the request and its completion event

    """
    try:
      HttpClientRequestExecutor(pend_req.request)
    finally:
      # Signal completion even on failure so that waiters are released
      pend_req.done.set()


class HttpClientWorkerPool(workerpool.WorkerPool):
  """Worker pool using L{HttpClientWorker} as its worker class.

  """
  def __init__(self, manager):
    """Initializes the pool and remembers its manager.

    C{HTTP_CLIENT_THREADS} and L{HttpClientWorker} are handed to the
    base class constructor.

    @param manager: Object stored as C{self.manager}

    """
    workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS, HttpClientWorker)
    self.manager = manager


class HttpClientManager(object):
  """Manages HTTP requests.

  Requests are executed by the workers of an L{HttpClientWorkerPool}
  owned by this object.

  """
  def __init__(self):
    """Creates the worker pool used for executing requests.

    """
    self._wpool = HttpClientWorkerPool(self)

  def __del__(self):
    """Shuts down the worker pool when the manager is destroyed.

    """
    self.Shutdown()

  def ExecRequests(self, requests):
    """Execute HTTP requests.

    This function can be called from multiple threads at the same time.

    @type requests: List of HttpClientRequest instances
    @param requests: The requests to execute
    @rtype: List of HttpClientRequest instances
    @return: The list of requests passed in

    """
    # _HttpClientPendingRequest is used for internal thread synchronization
    pending = [_HttpClientPendingRequest(req) for req in requests]

    # Requests which were actually handed to the worker pool
    queued = []

    try:
      # Add requests to queue
      for pend_req in pending:
        self._wpool.AddTask(pend_req)
        queued.append(pend_req)

    finally:
      # In case of an exception we should still wait for the rest, otherwise
      # another thread from the worker pool could modify the request object
      # after we returned.

      # Only wait for requests which were queued; the event of a request
      # which never reached the pool is never set and waiting for it would
      # block forever.
      for pend_req in queued:
        pend_req.done.wait()

    # Return original list
    return requests

  def Shutdown(self):
    """Quiesces the worker pool and terminates its workers.

    """
    self._wpool.Quiesce()
    self._wpool.TerminateWorkers()