Commit 02cab3e7 authored by Michael Hanselmann's avatar Michael Hanselmann

ganeti.http: Split HTTP server and client into separate files

This includes a large rewrite of the HTTP server code. The handling of
OpenSSL errors had some problems that were hard to fix with its
structure. When preparing all of this, I realized that actually HTTP
is a message protocol and that the same code can be used on both the
server and client side to parse requests/responses, with only a few
differences. There are still a few TODOs in the code, but none should
be a show stopper. Many pylint warnings have been fixed, too.

The old code will be removed once all users have been migrated.

Reviewed-by: amishchenko
parent 84f2756e
......@@ -101,7 +101,9 @@ rapi_PYTHON = \
lib/rapi/rlib2.py
http_PYTHON = \
lib/http/__init__.py
lib/http/__init__.py \
lib/http/client.py \
lib/http/server.py
docsgml = \
......
......@@ -18,7 +18,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""HTTP server module.
"""HTTP module.
"""
......@@ -30,10 +30,8 @@ import OpenSSL
import os
import select
import socket
import sys
import time
import signal
import logging
import errno
import threading
......@@ -103,6 +101,14 @@ class SocketClosed(socket.error):
pass
class HttpError(Exception):
"""Internal exception for HTTP errors.
This should only be used for internal error reporting.
"""
class _HttpClientError(Exception):
"""Internal exception for HTTP client errors.
......@@ -330,6 +336,34 @@ def SocketOperation(poller, sock, op, arg1, timeout):
raise
def ShutdownConnection(poller, sock, close_timeout, write_timeout, msgreader,
force):
"""Closes the connection.
"""
poller = select.poll()
#print msgreader.peer_will_close, force
if msgreader and msgreader.peer_will_close and not force:
# Wait for peer to close
try:
# Check whether it's actually closed
if not SocketOperation(poller, sock, SOCKOP_RECV, 1, close_timeout):
return
except (socket.error, HttpError, HttpSocketTimeout):
# Ignore errors at this stage
pass
# Close the connection from our side
try:
SocketOperation(poller, sock, SOCKOP_SHUTDOWN, socket.SHUT_RDWR,
write_timeout)
except HttpSocketTimeout:
raise HttpError("Timeout while shutting down connection")
except socket.error, err:
raise HttpError("Error while shutting down connection: %s" % err)
class HttpSslParams(object):
"""Data class for SSL key and certificate.
......@@ -1475,3 +1509,362 @@ class _SSLFileObject(object):
except OpenSSL.SSL.Error, err:
self._ConnectionLost()
raise socket.error(err.args)
class HttpMessage(object):
"""Data structure for HTTP message.
"""
def __init__(self):
self.start_line = None
self.headers = None
self.body = None
self.decoded_body = None
class HttpClientToServerStartLine(object):
"""Data structure for HTTP request start line.
"""
def __init__(self, method, path, version):
self.method = method
self.path = path
self.version = version
def __str__(self):
return "%s %s %s" % (self.method, self.path, self.version)
class HttpServerToClientStartLine(object):
"""Data structure for HTTP response start line.
"""
def __init__(self, version, code, reason):
self.version = version
self.code = code
self.reason = reason
def __str__(self):
return "%s %s %s" % (self.version, self.code, self.reason)
class HttpMessageWriter(object):
"""Writes an HTTP message to a socket.
"""
def __init__(self, sock, msg, write_timeout):
self._msg = msg
self._PrepareMessage()
buf = self._FormatMessage()
poller = select.poll()
while buf:
# Send only 4 KB at a time
data = buf[:4096]
sent = SocketOperation(poller, sock, SOCKOP_SEND, data,
write_timeout)
# Remove sent bytes
buf = buf[sent:]
assert not buf, "Message wasn't sent completely"
def _PrepareMessage(self):
"""Prepares the HTTP message by setting mandatory headers.
"""
# RFC2616, section 4.3: "The presence of a message-body in a request is
# signaled by the inclusion of a Content-Length or Transfer-Encoding header
# field in the request's message-headers."
if self._msg.body:
self._msg.headers[HTTP_CONTENT_LENGTH] = len(self._msg.body)
def _FormatMessage(self):
"""Serializes the HTTP message into a string.
"""
buf = StringIO()
# Add start line
buf.write(str(self._msg.start_line))
buf.write("\r\n")
# Add headers
if self._msg.start_line.version != HTTP_0_9:
for name, value in self._msg.headers.iteritems():
buf.write("%s: %s\r\n" % (name, value))
buf.write("\r\n")
# Add message body if needed
if self.HasMessageBody():
buf.write(self._msg.body)
elif self._msg.body:
logging.warning("Ignoring message body")
return buf.getvalue()
def HasMessageBody(self):
"""Checks whether the HTTP message contains a body.
Can be overriden by subclasses.
"""
return bool(self._msg.body)
class HttpMessageReader(object):
"""Reads HTTP message from socket.
"""
# Length limits
START_LINE_LENGTH_MAX = None
HEADER_LENGTH_MAX = None
# Parser state machine
PS_START_LINE = "start-line"
PS_HEADERS = "headers"
PS_BODY = "entity-body"
PS_COMPLETE = "complete"
def __init__(self, sock, msg, read_timeout):
self.sock = sock
self.msg = msg
self.poller = select.poll()
self.start_line_buffer = None
self.header_buffer = StringIO()
self.body_buffer = StringIO()
self.parser_status = self.PS_START_LINE
self.content_length = None
self.peer_will_close = None
buf = ""
eof = False
while self.parser_status != self.PS_COMPLETE:
data = SocketOperation(self.poller, sock, SOCKOP_RECV, 4096,
read_timeout)
if data:
buf += data
else:
eof = True
# Do some parsing and error checking while more data arrives
buf = self._ContinueParsing(buf, eof)
# Must be done only after the buffer has been evaluated
# TODO: Connection-length < len(data read) and connection closed
if (eof and
self.parser_status in (self.PS_START_LINE,
self.PS_HEADERS)):
raise HttpError("Connection closed prematurely")
# Parse rest
buf = self._ContinueParsing(buf, True)
assert self.parser_status == self.PS_COMPLETE
assert not buf, "Parser didn't read full response"
msg.body = self.body_buffer.getvalue()
# TODO: Content-type, error handling
if msg.body:
msg.decoded_body = HttpJsonConverter().Decode(msg.body)
else:
msg.decoded_body = None
if msg.decoded_body:
logging.debug("Message body: %s", msg.decoded_body)
def _ContinueParsing(self, buf, eof):
"""Main function for HTTP message state machine.
@type buf: string
@param buf: Receive buffer
@type eof: bool
@param eof: Whether we've reached EOF on the socket
@rtype: string
@return: Updated receive buffer
"""
if self.parser_status == self.PS_START_LINE:
# Expect start line
while True:
idx = buf.find("\r\n")
# RFC2616, section 4.1: "In the interest of robustness, servers SHOULD
# ignore any empty line(s) received where a Request-Line is expected.
# In other words, if the server is reading the protocol stream at the
# beginning of a message and receives a CRLF first, it should ignore
# the CRLF."
if idx == 0:
# TODO: Limit number of CRLFs for safety?
buf = buf[:2]
continue
if idx > 0:
self.start_line_buffer = buf[:idx]
self._CheckStartLineLength(len(self.start_line_buffer))
# Remove status line, including CRLF
buf = buf[idx + 2:]
self.msg.start_line = self.ParseStartLine(self.start_line_buffer)
self.parser_status = self.PS_HEADERS
else:
# Check whether incoming data is getting too large, otherwise we just
# fill our read buffer.
self._CheckStartLineLength(len(buf))
break
# TODO: Handle messages without headers
if self.parser_status == self.PS_HEADERS:
# Wait for header end
idx = buf.find("\r\n\r\n")
if idx >= 0:
self.header_buffer.write(buf[:idx + 2])
self._CheckHeaderLength(self.header_buffer.tell())
# Remove headers, including CRLF
buf = buf[idx + 4:]
self._ParseHeaders()
self.parser_status = self.PS_BODY
else:
# Check whether incoming data is getting too large, otherwise we just
# fill our read buffer.
self._CheckHeaderLength(len(buf))
if self.parser_status == self.PS_BODY:
# TODO: Implement max size for body_buffer
self.body_buffer.write(buf)
buf = ""
# Check whether we've read everything
#
# RFC2616, section 4.4: "When a message-body is included with a message,
# the transfer-length of that body is determined by one of the following
# [...] 5. By the server closing the connection. (Closing the connection
# cannot be used to indicate the end of a request body, since that would
# leave no possibility for the server to send back a response.)"
if (eof or
self.content_length is None or
(self.content_length is not None and
self.body_buffer.tell() >= self.content_length)):
self.parser_status = self.PS_COMPLETE
return buf
def _CheckStartLineLength(self, length):
"""Limits the start line buffer size.
@type length: int
@param length: Buffer size
"""
if (self.START_LINE_LENGTH_MAX is not None and
length > self.START_LINE_LENGTH_MAX):
raise HttpError("Start line longer than %d chars" %
self.START_LINE_LENGTH_MAX)
def _CheckHeaderLength(self, length):
"""Limits the header buffer size.
@type length: int
@param length: Buffer size
"""
if (self.HEADER_LENGTH_MAX is not None and
length > self.HEADER_LENGTH_MAX):
raise HttpError("Headers longer than %d chars" % self.HEADER_LENGTH_MAX)
def ParseStartLine(self, start_line):
"""Parses the start line of a message.
Must be overriden by subclass.
@type start_line: string
@param start_line: Start line string
"""
raise NotImplementedError()
def _WillPeerCloseConnection(self):
"""Evaluate whether peer will close the connection.
@rtype: bool
@return: Whether peer will close the connection
"""
# RFC2616, section 14.10: "HTTP/1.1 defines the "close" connection option
# for the sender to signal that the connection will be closed after
# completion of the response. For example,
#
# Connection: close
#
# in either the request or the response header fields indicates that the
# connection SHOULD NOT be considered `persistent' (section 8.1) after the
# current request/response is complete."
hdr_connection = self.msg.headers.get(HTTP_CONNECTION, None)
if hdr_connection:
hdr_connection = hdr_connection.lower()
# An HTTP/1.1 server is assumed to stay open unless explicitly closed.
if self.msg.start_line.version == HTTP_1_1:
return (hdr_connection and "close" in hdr_connection)
# Some HTTP/1.0 implementations have support for persistent connections,
# using rules different than HTTP/1.1.
# For older HTTP, Keep-Alive indicates persistent connection.
if self.msg.headers.get(HTTP_KEEP_ALIVE):
return False
# At least Akamai returns a "Connection: Keep-Alive" header, which was
# supposed to be sent by the client.
if hdr_connection and "keep-alive" in hdr_connection:
return False
return True
def _ParseHeaders(self):
"""Parses the headers.
This function also adjusts internal variables based on header values.
RFC2616, section 4.3: "The presence of a message-body in a request is
signaled by the inclusion of a Content-Length or Transfer-Encoding header
field in the request's message-headers."
"""
# Parse headers
self.header_buffer.seek(0, 0)
self.msg.headers = mimetools.Message(self.header_buffer, 0)
self.peer_will_close = self._WillPeerCloseConnection()
# Do we have a Content-Length header?
hdr_content_length = self.msg.headers.get(HTTP_CONTENT_LENGTH, None)
if hdr_content_length:
try:
self.content_length = int(hdr_content_length)
except ValueError:
self.content_length = None
if self.content_length is not None and self.content_length < 0:
self.content_length = None
# if the connection remains open and a content-length was not provided,
# then assume that the connection WILL close.
if self.content_length is None:
self.peer_will_close = True
#
#
# Copyright (C) 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""HTTP client module.
"""
import BaseHTTPServer
import cgi
import logging
import OpenSSL
import os
import select
import socket
import sys
import time
import signal
import errno
import threading
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import utils
from ganeti import http
HTTP_CLIENT_THREADS = 10
class HttpClientRequest(object):
def __init__(self, host, port, method, path, headers=None, post_data=None,
ssl_params=None, ssl_verify_peer=False):
"""Describes an HTTP request.
@type host: string
@param host: Hostname
@type port: int
@param port: Port
@type method: string
@param method: Method name
@type path: string
@param path: Request path
@type headers: dict or None
@param headers: Additional headers to send
@type post_data: string or None
@param post_data: Additional data to send
@type ssl_params: HttpSslParams
@param ssl_params: SSL key and certificate
@type ssl_verify_peer: bool
@param ssl_verify_peer: Whether to compare our certificate with server's
certificate
"""
if post_data is not None:
assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \
"Only POST and GET requests support sending data"
assert path.startswith("/"), "Path must start with slash (/)"
# Request attributes
self.host = host
self.port = port
self.ssl_params = ssl_params
self.ssl_verify_peer = ssl_verify_peer
self.method = method
self.path = path
self.headers = headers
self.post_data = post_data
self.success = None
self.error = None
# Raw response
self.response = None
# Response attributes
self.resp_version = None
self.resp_status_code = None
self.resp_reason = None
self.resp_headers = None
self.resp_body = None
class _HttpClientToServerMessageWriter(http.HttpMessageWriter):
pass
class _HttpServerToClientMessageReader(http.HttpMessageReader):
# Length limits
START_LINE_LENGTH_MAX = 512
HEADER_LENGTH_MAX = 4096
def ParseStartLine(self, start_line):
"""Parses the status line sent by the server.
"""
# Empty lines are skipped when reading
assert start_line
try:
[version, status, reason] = start_line.split(None, 2)
except ValueError:
try:
[version, status] = start_line.split(None, 1)
reason = ""
except ValueError:
version = http.HTTP_0_9
if version:
version = version.upper()
# The status code is a three-digit number
try:
status = int(status)
if status < 100 or status > 999:
status = -1
except ValueError:
status = -1
if status == -1:
raise http.HttpError("Invalid status code (%r)" % start_line)
return http.HttpServerToClientStartLine(version, status, reason)
class HttpClientRequestExecutor(http.HttpSocketBase):
# Default headers
DEFAULT_HEADERS = {
http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION,
# TODO: For keep-alive, don't send "Connection: close"
http.HTTP_CONNECTION: "close",
}
# Timeouts in seconds for socket layer
# TODO: Soft timeout instead of only socket timeout?
# TODO: Make read timeout configurable per OpCode?
CONNECT_TIMEOUT = 5
WRITE_TIMEOUT = 10
READ_TIMEOUT = None
CLOSE_TIMEOUT = 1
def __init__(self, req):
"""Initializes the HttpClientRequestExecutor class.
@type req: HttpClientRequest
@param req: Request object
"""
http.HttpSocketBase.__init__(self)
self.request = req
self.poller = select.poll()
try:
# TODO: Implement connection caching/keep-alive
self.sock = self._CreateSocket(req.ssl_params,
req.ssl_verify_peer)
# Disable Python's timeout
self.sock.settimeout(None)
# Operate in non-blocking mode
self.sock.setblocking(0)
response_msg_reader = None
response_msg = None
force_close = True
self._Connect()
try: