client.py 10.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#
#

# Copyright (C) 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""HTTP client module.

"""

import os
import select
import socket
import errno
import threading

from ganeti import workerpool
from ganeti import http


HTTP_CLIENT_THREADS = 10


class HttpClientRequest(object):
  """Describes an HTTP request and, once executed, holds its result.

  """
  def __init__(self, host, port, method, path, headers=None, post_data=None,
               ssl_params=None, ssl_verify_peer=False):
    """Describes an HTTP request.

    @type host: string
    @param host: Hostname
    @type port: int
    @param port: Port
    @type method: string
    @param method: Method name
    @type path: string
    @param path: Request path, must start with a slash
    @type headers: dict or None
    @param headers: Additional headers to send
    @type post_data: string or None
    @param post_data: Additional data to send, only accepted for POST and
        PUT requests
    @type ssl_params: HttpSslParams
    @param ssl_params: SSL key and certificate
    @type ssl_verify_peer: bool
    @param ssl_verify_peer: Whether to compare our certificate with
        server's certificate

    """
    if post_data is not None:
      # The message names the methods that are actually accepted (POST/PUT)
      assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \
        "Only POST and PUT requests support sending data"

    assert path.startswith("/"), "Path must start with slash (/)"

    # Request attributes
    self.host = host
    self.port = port
    self.ssl_params = ssl_params
    self.ssl_verify_peer = ssl_verify_peer
    self.method = method
    self.path = path
    self.headers = headers
    self.post_data = post_data

    # Outcome; both stay None until the request has been executed
    self.success = None
    self.error = None

    # Raw response
    self.response = None

    # Response attributes
    self.resp_version = None
    self.resp_status_code = None
    self.resp_reason = None
    self.resp_headers = None
    self.resp_body = None


class _HttpClientToServerMessageWriter(http.HttpMessageWriter):
  """Message writer for requests going from the client to the server.

  Adds nothing to L{http.HttpMessageWriter}; it only provides a dedicated
  name for the client-side writer.

  """


class _HttpServerToClientMessageReader(http.HttpMessageReader):
  """Message reader for responses sent by the server to the client.

  """
  # Length limits
  START_LINE_LENGTH_MAX = 512
  HEADER_LENGTH_MAX = 4096

  def ParseStartLine(self, start_line):
    """Parses the status line sent by the server.

    @type start_line: string
    @param start_line: Non-empty status line, e.g. "HTTP/1.0 200 OK"
    @return: L{http.HttpServerToClientStartLine} built from the upper-cased
        version, the numeric status code and the reason (empty string if the
        server sent none)
    @raise http.HttpError: If the status code is missing or is not a number
        between 100 and 999

    """
    # Empty lines are skipped when reading
    assert start_line

    # Defaults for status lines with fewer than two fields; without them the
    # code below would fail with an UnboundLocalError instead of HttpError
    status = None
    reason = ""

    try:
      [version, status, reason] = start_line.split(None, 2)
    except ValueError:
      try:
        [version, status] = start_line.split(None, 1)
        reason = ""
      except ValueError:
        version = http.HTTP_0_9

    if version:
      version = version.upper()

    # The status code is a three-digit number
    try:
      status = int(status)
      if status < 100 or status > 999:
        status = -1
    except (TypeError, ValueError):
      # TypeError: no status field at all (status is still None)
      status = -1

    if status == -1:
      raise http.HttpError("Invalid status code (%r)" % start_line)

    return http.HttpServerToClientStartLine(version, status, reason)


134
class HttpClientRequestExecutor(http.HttpBase):
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
  # Default headers
  DEFAULT_HEADERS = {
    http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION,
    # TODO: For keep-alive, don't send "Connection: close"
    http.HTTP_CONNECTION: "close",
    }

  # Timeouts in seconds for socket layer
  # TODO: Soft timeout instead of only socket timeout?
  # TODO: Make read timeout configurable per OpCode?
  CONNECT_TIMEOUT = 5
  WRITE_TIMEOUT = 10
  READ_TIMEOUT = None
  CLOSE_TIMEOUT = 1

  def __init__(self, req):
    """Initializes the HttpClientRequestExecutor class.

    @type req: HttpClientRequest
    @param req: Request object

    """
157
    http.HttpBase.__init__(self)
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
    self.request = req

    try:
      # TODO: Implement connection caching/keep-alive
      self.sock = self._CreateSocket(req.ssl_params,
                                     req.ssl_verify_peer)

      # Disable Python's timeout
      self.sock.settimeout(None)

      # Operate in non-blocking mode
      self.sock.setblocking(0)

      response_msg_reader = None
      response_msg = None
      force_close = True

      self._Connect()
      try:
        self._SendRequest()
        (response_msg_reader, response_msg) = self._ReadResponse()

        # Only wait for server to close if we didn't have any exception.
        force_close = False
      finally:
        # TODO: Keep-alive is not supported, always close connection
        force_close = True
185
186
187
        http.ShutdownConnection(self.sock, self.CLOSE_TIMEOUT,
                                self.WRITE_TIMEOUT, response_msg_reader,
                                force_close)
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236

      self.sock.close()
      self.sock = None

      req.response = response_msg

      req.resp_version = req.response.start_line.version
      req.resp_status_code = req.response.start_line.code
      req.resp_reason = req.response.start_line.reason
      req.resp_headers = req.response.headers
      req.resp_body = req.response.body

      req.success = True
      req.error = None

    except http.HttpError, err:
      req.success = False
      req.error = str(err)

  def _Connect(self):
    """Non-blocking connect to host with timeout.

    """
    connected = False
    while True:
      try:
        connect_error = self.sock.connect_ex((self.request.host,
                                              self.request.port))
      except socket.gaierror, err:
        raise http.HttpError("Connection failed: %s" % str(err))

      if connect_error == errno.EINTR:
        # Mask signals
        pass

      elif connect_error == 0:
        # Connection established
        connected = True
        break

      elif connect_error == errno.EINPROGRESS:
        # Connection started
        break

      raise http.HttpError("Connection failed (%s: %s)" %
                             (connect_error, os.strerror(connect_error)))

    if not connected:
      # Wait for connection
237
238
      event = http.WaitForSocketCondition(self.sock, select.POLLOUT,
                                          self.CONNECT_TIMEOUT)
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
      if event is None:
        raise http.HttpError("Timeout while connecting to server")

      # Get error code
      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
      if connect_error != 0:
        raise http.HttpError("Connection failed (%s: %s)" %
                               (connect_error, os.strerror(connect_error)))

    # Enable TCP keep-alive
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

    # If needed, Linux specific options are available to change the TCP
    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
    # TCP_KEEPINTVL.

255
256
    # Do the secret SSL handshake
    if self.using_ssl:
Iustin Pop's avatar
Iustin Pop committed
257
      self.sock.set_connect_state() # pylint: disable-msg=E1103
258
      try:
259
        http.Handshake(self.sock, self.WRITE_TIMEOUT)
260
261
262
      except http.HttpSessionHandshakeUnexpectedEOF:
        raise http.HttpError("Server closed connection during SSL handshake")

263
264
265
266
267
268
269
270
271
272
  def _SendRequest(self):
    """Sends request to server.

    """
    # Headers
    send_headers = self.DEFAULT_HEADERS.copy()

    if self.request.headers:
      send_headers.update(self.request.headers)

273
274
    send_headers[http.HTTP_HOST] = "%s:%s" % (self.request.host,
                                              self.request.port)
275
276
277
278
279
280
281
282
283

    # Response message
    msg = http.HttpMessage()

    # Combine request line. We only support HTTP/1.0 (no chunked transfers and
    # no keep-alive).
    # TODO: For keep-alive, change to HTTP/1.1
    msg.start_line = \
      http.HttpClientToServerStartLine(method=self.request.method.upper(),
284
285
                                       path=self.request.path,
                                       version=http.HTTP_1_0)
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
    msg.headers = send_headers
    msg.body = self.request.post_data

    try:
      _HttpClientToServerMessageWriter(self.sock, msg, self.WRITE_TIMEOUT)
    except http.HttpSocketTimeout:
      raise http.HttpError("Timeout while sending request")
    except socket.error, err:
      raise http.HttpError("Error sending request: %s" % err)

  def _ReadResponse(self):
    """Read response from server.

    """
    response_msg = http.HttpMessage()

    try:
      response_msg_reader = \
        _HttpServerToClientMessageReader(self.sock, response_msg,
                                         self.READ_TIMEOUT)
    except http.HttpSocketTimeout:
      raise http.HttpError("Timeout while reading response")
    except socket.error, err:
      raise http.HttpError("Error reading response: %s" % err)

    return (response_msg_reader, response_msg)


class _HttpClientPendingRequest(object):
  """Data class for pending requests.

  """
  def __init__(self, request):
    self.request = request

    # Thread synchronization
    self.done = threading.Event()


class HttpClientWorker(workerpool.BaseWorker):
  """Worker executing queued HTTP client requests.

  """
  def RunTask(self, pend_req):
    """Executes the request of a pending entry and signals completion.

    @type pend_req: _HttpClientPendingRequest
    @param pend_req: Pending request; its C{done} event is set whether or
        not execution raised an exception

    """
    try:
      request = pend_req.request
      HttpClientRequestExecutor(request)
    finally:
      pend_req.done.set()


class HttpClientWorkerPool(workerpool.WorkerPool):
  """Worker pool of L{HttpClientWorker} instances.

  """
  def __init__(self, manager):
    """Sets up the pool with C{HTTP_CLIENT_THREADS} workers.

    @param manager: Object creating this pool; stored as C{self.manager}

    """
    num_workers = HTTP_CLIENT_THREADS
    workerpool.WorkerPool.__init__(self, num_workers, HttpClientWorker)
    self.manager = manager


class HttpClientManager(object):
  """Manages HTTP requests.

  """
  def __init__(self):
    self._wpool = HttpClientWorkerPool(self)

  def __del__(self):
    # _wpool is missing if __init__ failed before assigning it
    if getattr(self, "_wpool", None) is not None:
      self.Shutdown()

  def ExecRequests(self, requests):
    """Execute HTTP requests.

    This function can be called from multiple threads at the same time.

    @type requests: List of HttpClientRequest instances
    @param requests: The requests to execute
    @rtype: List of HttpClientRequest instances
    @return: The list of requests passed in

    """
    # _HttpClientPendingRequest is used for internal thread synchronization
    pending = [_HttpClientPendingRequest(req) for req in requests]

    # Entries actually handed to the worker pool
    queued = []

    try:
      # Add requests to queue
      for pend_req in pending:
        self._wpool.AddTask(pend_req)
        queued.append(pend_req)

    finally:
      # In case of an exception we should still wait for the rest, otherwise
      # another thread from the worker pool could modify the request object
      # after we returned. Only queued entries are waited for: an entry that
      # never reached the pool is never marked as done, so waiting for it
      # would block forever.
      for pend_req in queued:
        pend_req.done.wait()

    # Return original list
    return requests

  def Shutdown(self):
    """Quiesces the worker pool and terminates its workers.

    """
    self._wpool.Quiesce()
    self._wpool.TerminateWorkers()