From 02cab3e7536e27274df63de4718ab39aa8ca7de8 Mon Sep 17 00:00:00 2001
From: Michael Hanselmann <hansmi@google.com>
Date: Thu, 4 Dec 2008 15:23:38 +0000
Subject: [PATCH] ganeti.http: Split HTTP server and client into separate files

This includes a large rewrite of the HTTP server code. The handling of
OpenSSL errors had some problems that were hard to fix with its
structure. When preparing all of this, I realized that actually HTTP
is a message protocol and that the same code can be used on both the
server and client side to parse requests/responses, with only a few
differences. There are still a few TODOs in the code, but none should
be a show stopper. Many pylint warnings have been fixed, too.

The old code will be removed once all users have been migrated.

Reviewed-by: amishchenko
---
 Makefile.am          |   4 +-
 lib/http/__init__.py | 399 ++++++++++++++++++++++++++++++++++-
 lib/http/client.py   | 388 ++++++++++++++++++++++++++++++++++
 lib/http/server.py   | 484 +++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 1271 insertions(+), 4 deletions(-)
 create mode 100644 lib/http/client.py
 create mode 100644 lib/http/server.py

diff --git a/Makefile.am b/Makefile.am
index 9483ed586..e427760cd 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -101,7 +101,9 @@ rapi_PYTHON = \
 	lib/rapi/rlib2.py
 
 http_PYTHON = \
-	lib/http/__init__.py
+	lib/http/__init__.py \
+	lib/http/client.py \
+	lib/http/server.py
 
 
 docsgml = \
diff --git a/lib/http/__init__.py b/lib/http/__init__.py
index b4f8c3b34..6eb1eb01e 100644
--- a/lib/http/__init__.py
+++ b/lib/http/__init__.py
@@ -18,7 +18,7 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 # 02110-1301, USA.
 
-"""HTTP server module.
+"""HTTP module.
 
 """
 
@@ -30,10 +30,8 @@ import OpenSSL
 import os
 import select
 import socket
-import sys
 import time
 import signal
-import logging
 import errno
 import threading
 
@@ -103,6 +101,14 @@ class SocketClosed(socket.error):
   pass
 
 
+class HttpError(Exception):
+  """Internal exception for HTTP errors.
+
+  This should only be used for internal error reporting.
+
+  """
+
+
 class _HttpClientError(Exception):
   """Internal exception for HTTP client errors.
 
@@ -330,6 +336,34 @@ def SocketOperation(poller, sock, op, arg1, timeout):
       raise
 
 
+def ShutdownConnection(poller, sock, close_timeout, write_timeout, msgreader,
+                       force):
+  """Closes the connection.
+
+  """
+  poller = select.poll()
+
+  #print msgreader.peer_will_close, force
+  if msgreader and msgreader.peer_will_close and not force:
+    # Wait for peer to close
+    try:
+      # Check whether it's actually closed
+      if not SocketOperation(poller, sock, SOCKOP_RECV, 1, close_timeout):
+        return
+    except (socket.error, HttpError, HttpSocketTimeout):
+      # Ignore errors at this stage
+      pass
+
+  # Close the connection from our side
+  try:
+    SocketOperation(poller, sock, SOCKOP_SHUTDOWN, socket.SHUT_RDWR,
+                    write_timeout)
+  except HttpSocketTimeout:
+    raise HttpError("Timeout while shutting down connection")
+  except socket.error, err:
+    raise HttpError("Error while shutting down connection: %s" % err)
+
+
 class HttpSslParams(object):
   """Data class for SSL key and certificate.
 
@@ -1475,3 +1509,362 @@ class _SSLFileObject(object):
       except OpenSSL.SSL.Error, err:
         self._ConnectionLost()
         raise socket.error(err.args)
+
+
+class HttpMessage(object):
+  """Data structure for HTTP message.
+
+  """
+  def __init__(self):
+    self.start_line = None
+    self.headers = None
+    self.body = None
+    self.decoded_body = None
+
+
+class HttpClientToServerStartLine(object):
+  """Data structure for HTTP request start line.
+
+  """
+  def __init__(self, method, path, version):
+    self.method = method
+    self.path = path
+    self.version = version
+
+  def __str__(self):
+    return "%s %s %s" % (self.method, self.path, self.version)
+
+
+class HttpServerToClientStartLine(object):
+  """Data structure for HTTP response start line.
+
+  """
+  def __init__(self, version, code, reason):
+    self.version = version
+    self.code = code
+    self.reason = reason
+
+  def __str__(self):
+    return "%s %s %s" % (self.version, self.code, self.reason)
+
+
+class HttpMessageWriter(object):
+  """Writes an HTTP message to a socket.
+
+  """
+  def __init__(self, sock, msg, write_timeout):
+    self._msg = msg
+
+    self._PrepareMessage()
+
+    buf = self._FormatMessage()
+
+    poller = select.poll()
+    while buf:
+      # Send only 4 KB at a time
+      data = buf[:4096]
+
+      sent = SocketOperation(poller, sock, SOCKOP_SEND, data,
+                             write_timeout)
+
+      # Remove sent bytes
+      buf = buf[sent:]
+
+    assert not buf, "Message wasn't sent completely"
+
+  def _PrepareMessage(self):
+    """Prepares the HTTP message by setting mandatory headers.
+
+    """
+    # RFC2616, section 4.3: "The presence of a message-body in a request is
+    # signaled by the inclusion of a Content-Length or Transfer-Encoding header
+    # field in the request's message-headers."
+    if self._msg.body:
+      self._msg.headers[HTTP_CONTENT_LENGTH] = len(self._msg.body)
+
+  def _FormatMessage(self):
+    """Serializes the HTTP message into a string.
+
+    """
+    buf = StringIO()
+
+    # Add start line
+    buf.write(str(self._msg.start_line))
+    buf.write("\r\n")
+
+    # Add headers
+    if self._msg.start_line.version != HTTP_0_9:
+      for name, value in self._msg.headers.iteritems():
+        buf.write("%s: %s\r\n" % (name, value))
+
+    buf.write("\r\n")
+
+    # Add message body if needed
+    if self.HasMessageBody():
+      buf.write(self._msg.body)
+
+    elif self._msg.body:
+      logging.warning("Ignoring message body")
+
+    return buf.getvalue()
+
+  def HasMessageBody(self):
+    """Checks whether the HTTP message contains a body.
+
+    Can be overriden by subclasses.
+
+    """
+    return bool(self._msg.body)
+
+
+class HttpMessageReader(object):
+  """Reads HTTP message from socket.
+
+  """
+  # Length limits
+  START_LINE_LENGTH_MAX = None
+  HEADER_LENGTH_MAX = None
+
+  # Parser state machine
+  PS_START_LINE = "start-line"
+  PS_HEADERS = "headers"
+  PS_BODY = "entity-body"
+  PS_COMPLETE = "complete"
+
+  def __init__(self, sock, msg, read_timeout):
+    self.sock = sock
+    self.msg = msg
+
+    self.poller = select.poll()
+    self.start_line_buffer = None
+    self.header_buffer = StringIO()
+    self.body_buffer = StringIO()
+    self.parser_status = self.PS_START_LINE
+    self.content_length = None
+    self.peer_will_close = None
+
+    buf = ""
+    eof = False
+    while self.parser_status != self.PS_COMPLETE:
+      data = SocketOperation(self.poller, sock, SOCKOP_RECV, 4096,
+                             read_timeout)
+
+      if data:
+        buf += data
+      else:
+        eof = True
+
+      # Do some parsing and error checking while more data arrives
+      buf = self._ContinueParsing(buf, eof)
+
+      # Must be done only after the buffer has been evaluated
+      # TODO: Connection-length < len(data read) and connection closed
+      if (eof and
+          self.parser_status in (self.PS_START_LINE,
+                                 self.PS_HEADERS)):
+        raise HttpError("Connection closed prematurely")
+
+    # Parse rest
+    buf = self._ContinueParsing(buf, True)
+
+    assert self.parser_status == self.PS_COMPLETE
+    assert not buf, "Parser didn't read full response"
+
+    msg.body = self.body_buffer.getvalue()
+
+    # TODO: Content-type, error handling
+    if msg.body:
+      msg.decoded_body = HttpJsonConverter().Decode(msg.body)
+    else:
+      msg.decoded_body = None
+
+    if msg.decoded_body:
+      logging.debug("Message body: %s", msg.decoded_body)
+
+  def _ContinueParsing(self, buf, eof):
+    """Main function for HTTP message state machine.
+
+    @type buf: string
+    @param buf: Receive buffer
+    @type eof: bool
+    @param eof: Whether we've reached EOF on the socket
+    @rtype: string
+    @return: Updated receive buffer
+
+    """
+    if self.parser_status == self.PS_START_LINE:
+      # Expect start line
+      while True:
+        idx = buf.find("\r\n")
+
+        # RFC2616, section 4.1: "In the interest of robustness, servers SHOULD
+        # ignore any empty line(s) received where a Request-Line is expected.
+        # In other words, if the server is reading the protocol stream at the
+        # beginning of a message and receives a CRLF first, it should ignore
+        # the CRLF."
+        if idx == 0:
+          # TODO: Limit number of CRLFs for safety?
+          buf = buf[:2]
+          continue
+
+        if idx > 0:
+          self.start_line_buffer = buf[:idx]
+
+          self._CheckStartLineLength(len(self.start_line_buffer))
+
+          # Remove status line, including CRLF
+          buf = buf[idx + 2:]
+
+          self.msg.start_line = self.ParseStartLine(self.start_line_buffer)
+
+          self.parser_status = self.PS_HEADERS
+        else:
+          # Check whether incoming data is getting too large, otherwise we just
+          # fill our read buffer.
+          self._CheckStartLineLength(len(buf))
+
+        break
+
+    # TODO: Handle messages without headers
+    if self.parser_status == self.PS_HEADERS:
+      # Wait for header end
+      idx = buf.find("\r\n\r\n")
+      if idx >= 0:
+        self.header_buffer.write(buf[:idx + 2])
+
+        self._CheckHeaderLength(self.header_buffer.tell())
+
+        # Remove headers, including CRLF
+        buf = buf[idx + 4:]
+
+        self._ParseHeaders()
+
+        self.parser_status = self.PS_BODY
+      else:
+        # Check whether incoming data is getting too large, otherwise we just
+        # fill our read buffer.
+        self._CheckHeaderLength(len(buf))
+
+    if self.parser_status == self.PS_BODY:
+      # TODO: Implement max size for body_buffer
+      self.body_buffer.write(buf)
+      buf = ""
+
+      # Check whether we've read everything
+      #
+      # RFC2616, section 4.4: "When a message-body is included with a message,
+      # the transfer-length of that body is determined by one of the following
+      # [...] 5. By the server closing the connection. (Closing the connection
+      # cannot be used to indicate the end of a request body, since that would
+      # leave no possibility for the server to send back a response.)"
+      if (eof or
+          self.content_length is None or
+          (self.content_length is not None and
+           self.body_buffer.tell() >= self.content_length)):
+        self.parser_status = self.PS_COMPLETE
+
+    return buf
+
+  def _CheckStartLineLength(self, length):
+    """Limits the start line buffer size.
+
+    @type length: int
+    @param length: Buffer size
+
+    """
+    if (self.START_LINE_LENGTH_MAX is not None and
+        length > self.START_LINE_LENGTH_MAX):
+      raise HttpError("Start line longer than %d chars" %
+                       self.START_LINE_LENGTH_MAX)
+
+  def _CheckHeaderLength(self, length):
+    """Limits the header buffer size.
+
+    @type length: int
+    @param length: Buffer size
+
+    """
+    if (self.HEADER_LENGTH_MAX is not None and
+        length > self.HEADER_LENGTH_MAX):
+      raise HttpError("Headers longer than %d chars" % self.HEADER_LENGTH_MAX)
+
+  def ParseStartLine(self, start_line):
+    """Parses the start line of a message.
+
+    Must be overriden by subclass.
+
+    @type start_line: string
+    @param start_line: Start line string
+
+    """
+    raise NotImplementedError()
+
+  def _WillPeerCloseConnection(self):
+    """Evaluate whether peer will close the connection.
+
+    @rtype: bool
+    @return: Whether peer will close the connection
+
+    """
+    # RFC2616, section 14.10: "HTTP/1.1 defines the "close" connection option
+    # for the sender to signal that the connection will be closed after
+    # completion of the response. For example,
+    #
+    #        Connection: close
+    #
+    # in either the request or the response header fields indicates that the
+    # connection SHOULD NOT be considered `persistent' (section 8.1) after the
+    # current request/response is complete."
+
+    hdr_connection = self.msg.headers.get(HTTP_CONNECTION, None)
+    if hdr_connection:
+      hdr_connection = hdr_connection.lower()
+
+    # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
+    if self.msg.start_line.version == HTTP_1_1:
+      return (hdr_connection and "close" in hdr_connection)
+
+    # Some HTTP/1.0 implementations have support for persistent connections,
+    # using rules different than HTTP/1.1.
+
+    # For older HTTP, Keep-Alive indicates persistent connection.
+    if self.msg.headers.get(HTTP_KEEP_ALIVE):
+      return False
+
+    # At least Akamai returns a "Connection: Keep-Alive" header, which was
+    # supposed to be sent by the client.
+    if hdr_connection and "keep-alive" in hdr_connection:
+      return False
+
+    return True
+
+  def _ParseHeaders(self):
+    """Parses the headers.
+
+    This function also adjusts internal variables based on header values.
+
+    RFC2616, section 4.3: "The presence of a message-body in a request is
+    signaled by the inclusion of a Content-Length or Transfer-Encoding header
+    field in the request's message-headers."
+
+    """
+    # Parse headers
+    self.header_buffer.seek(0, 0)
+    self.msg.headers = mimetools.Message(self.header_buffer, 0)
+
+    self.peer_will_close = self._WillPeerCloseConnection()
+
+    # Do we have a Content-Length header?
+    hdr_content_length = self.msg.headers.get(HTTP_CONTENT_LENGTH, None)
+    if hdr_content_length:
+      try:
+        self.content_length = int(hdr_content_length)
+      except ValueError:
+        self.content_length = None
+      if self.content_length is not None and self.content_length < 0:
+        self.content_length = None
+
+    # if the connection remains open and a content-length was not provided,
+    # then assume that the connection WILL close.
+    if self.content_length is None:
+      self.peer_will_close = True
diff --git a/lib/http/client.py b/lib/http/client.py
new file mode 100644
index 000000000..50c91722e
--- /dev/null
+++ b/lib/http/client.py
@@ -0,0 +1,388 @@
+#
+#
+
+# Copyright (C) 2007, 2008 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+"""HTTP client module.
+
+"""
+
+import BaseHTTPServer
+import cgi
+import logging
+import OpenSSL
+import os
+import select
+import socket
+import sys
+import time
+import signal
+import errno
+import threading
+
+from ganeti import constants
+from ganeti import serializer
+from ganeti import workerpool
+from ganeti import utils
+from ganeti import http
+
+
+HTTP_CLIENT_THREADS = 10
+
+
+class HttpClientRequest(object):
+  def __init__(self, host, port, method, path, headers=None, post_data=None,
+               ssl_params=None, ssl_verify_peer=False):
+    """Describes an HTTP request.
+
+    @type host: string
+    @param host: Hostname
+    @type port: int
+    @param port: Port
+    @type method: string
+    @param method: Method name
+    @type path: string
+    @param path: Request path
+    @type headers: dict or None
+    @param headers: Additional headers to send
+    @type post_data: string or None
+    @param post_data: Additional data to send
+    @type ssl_params: HttpSslParams
+    @param ssl_params: SSL key and certificate
+    @type ssl_verify_peer: bool
+    @param ssl_verify_peer: Whether to compare our certificate with server's
+                            certificate
+
+    """
+    if post_data is not None:
+      assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \
+        "Only POST and GET requests support sending data"
+
+    assert path.startswith("/"), "Path must start with slash (/)"
+
+    # Request attributes
+    self.host = host
+    self.port = port
+    self.ssl_params = ssl_params
+    self.ssl_verify_peer = ssl_verify_peer
+    self.method = method
+    self.path = path
+    self.headers = headers
+    self.post_data = post_data
+
+    self.success = None
+    self.error = None
+
+    # Raw response
+    self.response = None
+
+    # Response attributes
+    self.resp_version = None
+    self.resp_status_code = None
+    self.resp_reason = None
+    self.resp_headers = None
+    self.resp_body = None
+
+
+class _HttpClientToServerMessageWriter(http.HttpMessageWriter):
+  pass
+
+
+class _HttpServerToClientMessageReader(http.HttpMessageReader):
+  # Length limits
+  START_LINE_LENGTH_MAX = 512
+  HEADER_LENGTH_MAX = 4096
+
+  def ParseStartLine(self, start_line):
+    """Parses the status line sent by the server.
+
+    """
+    # Empty lines are skipped when reading
+    assert start_line
+
+    try:
+      [version, status, reason] = start_line.split(None, 2)
+    except ValueError:
+      try:
+        [version, status] = start_line.split(None, 1)
+        reason = ""
+      except ValueError:
+        version = http.HTTP_0_9
+
+    if version:
+      version = version.upper()
+
+    # The status code is a three-digit number
+    try:
+      status = int(status)
+      if status < 100 or status > 999:
+        status = -1
+    except ValueError:
+      status = -1
+
+    if status == -1:
+      raise http.HttpError("Invalid status code (%r)" % start_line)
+
+    return http.HttpServerToClientStartLine(version, status, reason)
+
+
+class HttpClientRequestExecutor(http.HttpSocketBase):
+  # Default headers
+  DEFAULT_HEADERS = {
+    http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION,
+    # TODO: For keep-alive, don't send "Connection: close"
+    http.HTTP_CONNECTION: "close",
+    }
+
+  # Timeouts in seconds for socket layer
+  # TODO: Soft timeout instead of only socket timeout?
+  # TODO: Make read timeout configurable per OpCode?
+  CONNECT_TIMEOUT = 5
+  WRITE_TIMEOUT = 10
+  READ_TIMEOUT = None
+  CLOSE_TIMEOUT = 1
+
+  def __init__(self, req):
+    """Initializes the HttpClientRequestExecutor class.
+
+    @type req: HttpClientRequest
+    @param req: Request object
+
+    """
+    http.HttpSocketBase.__init__(self)
+    self.request = req
+
+    self.poller = select.poll()
+
+    try:
+      # TODO: Implement connection caching/keep-alive
+      self.sock = self._CreateSocket(req.ssl_params,
+                                     req.ssl_verify_peer)
+
+      # Disable Python's timeout
+      self.sock.settimeout(None)
+
+      # Operate in non-blocking mode
+      self.sock.setblocking(0)
+
+      response_msg_reader = None
+      response_msg = None
+      force_close = True
+
+      self._Connect()
+      try:
+        self._SendRequest()
+        (response_msg_reader, response_msg) = self._ReadResponse()
+
+        # Only wait for server to close if we didn't have any exception.
+        force_close = False
+      finally:
+        # TODO: Keep-alive is not supported, always close connection
+        force_close = True
+        http.ShutdownConnection(self.poller, self.sock,
+                                self.CLOSE_TIMEOUT, self.WRITE_TIMEOUT,
+                                response_msg_reader, force_close)
+
+      self.sock.close()
+      self.sock = None
+
+      req.response = response_msg
+
+      req.resp_version = req.response.start_line.version
+      req.resp_status_code = req.response.start_line.code
+      req.resp_reason = req.response.start_line.reason
+      req.resp_headers = req.response.headers
+      req.resp_body = req.response.body
+
+      req.success = True
+      req.error = None
+
+    except http.HttpError, err:
+      req.success = False
+      req.error = str(err)
+
+  def _Connect(self):
+    """Non-blocking connect to host with timeout.
+
+    """
+    connected = False
+    while True:
+      try:
+        connect_error = self.sock.connect_ex((self.request.host,
+                                              self.request.port))
+      except socket.gaierror, err:
+        raise http.HttpError("Connection failed: %s" % str(err))
+
+      if connect_error == errno.EINTR:
+        # Mask signals
+        pass
+
+      elif connect_error == 0:
+        # Connection established
+        connected = True
+        break
+
+      elif connect_error == errno.EINPROGRESS:
+        # Connection started
+        break
+
+      raise http.HttpError("Connection failed (%s: %s)" %
+                             (connect_error, os.strerror(connect_error)))
+
+    if not connected:
+      # Wait for connection
+      event = http.WaitForSocketCondition(self.poller, self.sock,
+                                          select.POLLOUT, self.CONNECT_TIMEOUT)
+      if event is None:
+        raise http.HttpError("Timeout while connecting to server")
+
+      # Get error code
+      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
+      if connect_error != 0:
+        raise http.HttpError("Connection failed (%s: %s)" %
+                               (connect_error, os.strerror(connect_error)))
+
+    # Enable TCP keep-alive
+    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+
+    # If needed, Linux specific options are available to change the TCP
+    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
+    # TCP_KEEPINTVL.
+
+  def _SendRequest(self):
+    """Sends request to server.
+
+    """
+    # Headers
+    send_headers = self.DEFAULT_HEADERS.copy()
+
+    if self.request.headers:
+      send_headers.update(self.request.headers)
+
+    send_headers[http.HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port)
+
+    # Response message
+    msg = http.HttpMessage()
+
+    # Combine request line. We only support HTTP/1.0 (no chunked transfers and
+    # no keep-alive).
+    # TODO: For keep-alive, change to HTTP/1.1
+    msg.start_line = \
+      http.HttpClientToServerStartLine(method=self.request.method.upper(),
+                                       path=self.request.path, version=http.HTTP_1_0)
+    msg.headers = send_headers
+    msg.body = self.request.post_data
+
+    try:
+      _HttpClientToServerMessageWriter(self.sock, msg, self.WRITE_TIMEOUT)
+    except http.HttpSocketTimeout:
+      raise http.HttpError("Timeout while sending request")
+    except socket.error, err:
+      raise http.HttpError("Error sending request: %s" % err)
+
+  def _ReadResponse(self):
+    """Read response from server.
+
+    """
+    response_msg = http.HttpMessage()
+
+    try:
+      response_msg_reader = \
+        _HttpServerToClientMessageReader(self.sock, response_msg,
+                                         self.READ_TIMEOUT)
+    except http.HttpSocketTimeout:
+      raise http.HttpError("Timeout while reading response")
+    except socket.error, err:
+      raise http.HttpError("Error reading response: %s" % err)
+
+    return (response_msg_reader, response_msg)
+
+
+class _HttpClientPendingRequest(object):
+  """Data class for pending requests.
+
+  """
+  def __init__(self, request):
+    self.request = request
+
+    # Thread synchronization
+    self.done = threading.Event()
+
+
+class HttpClientWorker(workerpool.BaseWorker):
+  """HTTP client worker class.
+
+  """
+  def RunTask(self, pend_req):
+    try:
+      HttpClientRequestExecutor(pend_req.request)
+    finally:
+      pend_req.done.set()
+
+
+class HttpClientWorkerPool(workerpool.WorkerPool):
+  def __init__(self, manager):
+    workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
+                                   HttpClientWorker)
+    self.manager = manager
+
+
+class HttpClientManager(object):
+  """Manages HTTP requests.
+
+  """
+  def __init__(self):
+    self._wpool = HttpClientWorkerPool(self)
+
+  def __del__(self):
+    self.Shutdown()
+
+  def ExecRequests(self, requests):
+    """Execute HTTP requests.
+
+    This function can be called from multiple threads at the same time.
+
+    @type requests: List of HttpClientRequest instances
+    @param requests: The requests to execute
+    @rtype: List of HttpClientRequest instances
+    @returns: The list of requests passed in
+
+    """
+    # _HttpClientPendingRequest is used for internal thread synchronization
+    pending = [_HttpClientPendingRequest(req) for req in requests]
+
+    try:
+      # Add requests to queue
+      for pend_req in pending:
+        self._wpool.AddTask(pend_req)
+
+    finally:
+      # In case of an exception we should still wait for the rest, otherwise
+      # another thread from the worker pool could modify the request object
+      # after we returned.
+
+      # And wait for them to finish
+      for pend_req in pending:
+        pend_req.done.wait()
+
+    # Return original list
+    return requests
+
+  def Shutdown(self):
+    self._wpool.Quiesce()
+    self._wpool.TerminateWorkers()
diff --git a/lib/http/server.py b/lib/http/server.py
new file mode 100644
index 000000000..d6f2b8877
--- /dev/null
+++ b/lib/http/server.py
@@ -0,0 +1,484 @@
+#
+#
+
+# Copyright (C) 2007, 2008 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+"""HTTP server module.
+
+"""
+
+import BaseHTTPServer
+import cgi
+import logging
+import os
+import select
+import socket
+import time
+import signal
+
+from ganeti import constants
+from ganeti import serializer
+from ganeti import utils
+from ganeti import http
+
+
+WEEKDAYNAME = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
+MONTHNAME = [None,
+             'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
+             'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
+
+# Default error message
+DEFAULT_ERROR_CONTENT_TYPE = "text/html"
+DEFAULT_ERROR_MESSAGE = """\
+<html>
+<head>
+<title>Error response</title>
+</head>
+<body>
+<h1>Error response</h1>
+<p>Error code %(code)d.
+<p>Message: %(message)s.
+<p>Error code explanation: %(code)s = %(explain)s.
+</body>
+</html>
+"""
+
+
+def _DateTimeHeader():
+  """Return the current date and time formatted for a message header.
+
+  """
+  (year, month, day, hh, mm, ss, wd, _, _) = time.gmtime()
+  return ("%s, %02d %3s %4d %02d:%02d:%02d GMT" %
+          (WEEKDAYNAME[wd], day, MONTHNAME[month], year, hh, mm, ss))
+
+
+class _HttpServerRequest(object):
+  """Data structure for HTTP request on server side.
+
+  """
+  def __init__(self, request_msg):
+    # Request attributes
+    self.request_method = request_msg.start_line.method
+    self.request_path = request_msg.start_line.path
+    self.request_headers = request_msg.headers
+    self.request_body = request_msg.decoded_body
+
+    # Response attributes
+    self.resp_headers = {}
+
+
+class _HttpServerToClientMessageWriter(http.HttpMessageWriter):
+  """Writes an HTTP response to client.
+
+  """
+  def __init__(self, sock, request_msg, response_msg, write_timeout):
+    """TODO
+
+    """
+    self._request_msg = request_msg
+    self._response_msg = response_msg
+    http.HttpMessageWriter.__init__(self, sock, response_msg, write_timeout)
+
+  def HasMessageBody(self):
+    """Logic to detect whether response should contain a message body.
+
+    """
+    if self._request_msg.start_line:
+      request_method = self._request_msg.start_line.method
+    else:
+      request_method = None
+
+    response_code = self._response_msg.start_line.code
+
+    # RFC2616, section 4.3: "A message-body MUST NOT be included in a request
+    # if the specification of the request method (section 5.1.1) does not allow
+    # sending an entity-body in requests"
+    #
+    # RFC2616, section 9.4: "The HEAD method is identical to GET except that
+    # the server MUST NOT return a message-body in the response."
+    #
+    # RFC2616, section 10.2.5: "The 204 response MUST NOT include a
+    # message-body [...]"
+    #
+    # RFC2616, section 10.3.5: "The 304 response MUST NOT contain a
+    # message-body, [...]"
+
+    return (http.HttpMessageWriter.HasMessageBody(self) and
+            (request_method is not None and request_method != http.HTTP_HEAD) and
+            response_code >= http.HTTP_OK and
+            response_code not in (http.HTTP_NO_CONTENT, http.HTTP_NOT_MODIFIED))
+
+
+class _HttpClientToServerMessageReader(http.HttpMessageReader):
+  """Reads an HTTP request sent by client.
+
+  """
+  # Length limits
+  START_LINE_LENGTH_MAX = 4096
+  HEADER_LENGTH_MAX = 4096
+
+  def ParseStartLine(self, start_line):
+    """Parses the start line sent by client.
+
+    Example: "GET /index.html HTTP/1.1"
+
+    @type start_line: string
+    @param start_line: Start line
+
+    """
+    # Empty lines are skipped when reading
+    assert start_line
+
+    logging.debug("HTTP request: %s", start_line)
+
+    words = start_line.split()
+
+    if len(words) == 3:
+      [method, path, version] = words
+      if version[:5] != 'HTTP/':
+        raise http.HttpBadRequest("Bad request version (%r)" % version)
+
+      try:
+        base_version_number = version.split("/", 1)[1]
+        version_number = base_version_number.split(".")
+
+        # RFC 2145 section 3.1 says there can be only one "." and
+        #   - major and minor numbers MUST be treated as
+        #      separate integers;
+        #   - HTTP/2.4 is a lower version than HTTP/2.13, which in
+        #      turn is lower than HTTP/12.3;
+        #   - Leading zeros MUST be ignored by recipients.
+        if len(version_number) != 2:
+          raise http.HttpBadRequest("Bad request version (%r)" % version)
+
+        version_number = (int(version_number[0]), int(version_number[1]))
+      except (ValueError, IndexError):
+        raise http.HttpBadRequest("Bad request version (%r)" % version)
+
+      if version_number >= (2, 0):
+        raise http.HttpVersionNotSupported("Invalid HTTP Version (%s)" %
+                                      base_version_number)
+
+    elif len(words) == 2:
+      version = http.HTTP_0_9
+      [method, path] = words
+      if method != http.HTTP_GET:
+        raise http.HttpBadRequest("Bad HTTP/0.9 request type (%r)" % method)
+
+    else:
+      raise http.HttpBadRequest("Bad request syntax (%r)" % start_line)
+
+    return http.HttpClientToServerStartLine(method, path, version)
+
+
+class _HttpServerRequestExecutor(object):
+  """Implements server side of HTTP.
+
+  This class implements the server side of HTTP. It's based on code of Python's
+  BaseHTTPServer, from both version 2.4 and 3k. It does not support non-ASCII
+  character encodings. Keep-alive connections are not supported.
+
+  """
+  # The default request version.  This only affects responses up until
+  # the point where the request line is parsed, so it mainly decides what
+  # the client gets back when sending a malformed request line.
+  # Most web servers default to HTTP 0.9, i.e. don't send a status line.
+  default_request_version = http.HTTP_0_9
+
+  # Error message settings
+  error_message_format = DEFAULT_ERROR_MESSAGE
+  error_content_type = DEFAULT_ERROR_CONTENT_TYPE
+
+  responses = BaseHTTPServer.BaseHTTPRequestHandler.responses
+
+  # Timeouts in seconds for socket layer
+  WRITE_TIMEOUT = 10
+  READ_TIMEOUT = 10
+  CLOSE_TIMEOUT = 1
+
+  def __init__(self, server, sock, client_addr):
+    """Initializes this class.
+
+    """
+    self.server = server
+    self.sock = sock
+    self.client_addr = client_addr
+
+    self.poller = select.poll()
+
+    self.request_msg = http.HttpMessage()
+    self.response_msg = http.HttpMessage()
+
+    self.response_msg.start_line = \
+      http.HttpServerToClientStartLine(version=self.default_request_version,
+                                       code=None, reason=None)
+
+    # Disable Python's timeout
+    self.sock.settimeout(None)
+
+    # Operate in non-blocking mode
+    self.sock.setblocking(0)
+
+    logging.info("Connection from %s:%s", client_addr[0], client_addr[1])
+    try:
+      request_msg_reader = None
+      force_close = True
+      try:
+        try:
+          try:
+            request_msg_reader = self._ReadRequest()
+            self._HandleRequest()
+
+            # Only wait for client to close if we didn't have any exception.
+            force_close = False
+          except http.HttpException, err:
+            self._SetErrorStatus(err)
+        finally:
+          # Try to send a response
+          self._SendResponse()
+      finally:
+        http.ShutdownConnection(self.poller, sock,
+                                self.CLOSE_TIMEOUT, self.WRITE_TIMEOUT,
+                                request_msg_reader, force_close)
+
+      self.sock.close()
+      self.sock = None
+    finally:
+      logging.info("Disconnected %s:%s", client_addr[0], client_addr[1])
+
+  def _ReadRequest(self):
+    """Reads a request sent by client.
+
+    """
+    try:
+      request_msg_reader = \
+        _HttpClientToServerMessageReader(self.sock, self.request_msg,
+                                         self.READ_TIMEOUT)
+    except http.HttpSocketTimeout:
+      raise http.HttpError("Timeout while reading request")
+    except socket.error, err:
+      raise http.HttpError("Error reading request: %s" % err)
+
+    self.response_msg.start_line.version = self.request_msg.start_line.version
+
+    return request_msg_reader
+
+  def _HandleRequest(self):
+    """Calls the handler function for the current request.
+
+    """
+    handler_context = _HttpServerRequest(self.request_msg)
+
+    try:
+      result = self.server.HandleRequest(handler_context)
+    except (http.HttpException, KeyboardInterrupt, SystemExit):
+      raise
+    except Exception, err:
+      logging.exception("Caught exception")
+      raise http.HttpInternalError(message=str(err))
+    except:
+      logging.exception("Unknown exception")
+      raise http.HttpInternalError(message="Unknown error")
+
+    # TODO: Content-type
+    encoder = http.HttpJsonConverter()
+    self.response_msg.start_line.code = http.HTTP_OK
+    self.response_msg.body = encoder.Encode(result)
+    self.response_msg.headers = handler_context.resp_headers
+    self.response_msg.headers[http.HTTP_CONTENT_TYPE] = encoder.CONTENT_TYPE
+
+  def _SendResponse(self):
+    """Sends the response to the client.
+
+    """
+    if self.response_msg.start_line.code is None:
+      return
+
+    if not self.response_msg.headers:
+      self.response_msg.headers = {}
+
+    self.response_msg.headers.update({
+      # TODO: Keep-alive is not supported
+      http.HTTP_CONNECTION: "close",
+      http.HTTP_DATE: _DateTimeHeader(),
+      http.HTTP_SERVER: http.HTTP_GANETI_VERSION,
+      })
+
+    # Get response reason based on code
+    response_code = self.response_msg.start_line.code
+    if response_code in self.responses:
+      response_reason = self.responses[response_code][0]
+    else:
+      response_reason = ""
+    self.response_msg.start_line.reason = response_reason
+
+    logging.info("%s:%s %s %s", self.client_addr[0], self.client_addr[1],
+                 self.request_msg.start_line, response_code)
+
+    try:
+      _HttpServerToClientMessageWriter(self.sock, self.request_msg,
+                                       self.response_msg, self.WRITE_TIMEOUT)
+    except http.HttpSocketTimeout:
+      raise http.HttpError("Timeout while sending response")
+    except socket.error, err:
+      raise http.HttpError("Error sending response: %s" % err)
+
+  def _SetErrorStatus(self, err):
+    """Sets the response code and body from a HttpException.
+
+    @type err: HttpException
+    @param err: Exception instance
+
+    """
+    try:
+      (shortmsg, longmsg) = self.responses[err.code]
+    except KeyError:
+      shortmsg = longmsg = "Unknown"
+
+    if err.message:
+      message = err.message
+    else:
+      message = shortmsg
+
+    values = {
+      "code": err.code,
+      "message": cgi.escape(message),
+      "explain": longmsg,
+      }
+
+    self.response_msg.start_line.code = err.code
+    self.response_msg.headers = {
+      http.HTTP_CONTENT_TYPE: self.error_content_type,
+      }
+    self.response_msg.body = self.error_message_format % values
+
+
+class HttpServer(http.HttpSocketBase):
+  """Generic HTTP server class
+
+  Users of this class must subclass it and override the HandleRequest function.
+
+  """
+  MAX_CHILDREN = 20
+
+  def __init__(self, mainloop, local_address, port,
+               ssl_params=None, ssl_verify_peer=False):
+    """Initializes the HTTP server
+
+    @type mainloop: ganeti.daemon.Mainloop
+    @param mainloop: Mainloop used to poll for I/O events
+    @type local_addess: string
+    @param local_address: Local IP address to bind to
+    @type port: int
+    @param port: TCP port to listen on
+    @type ssl_params: HttpSslParams
+    @param ssl_params: SSL key and certificate
+    @type ssl_verify_peer: bool
+    @param ssl_verify_peer: Whether to require client certificate and compare
+                            it with our certificate
+
+    """
+    http.HttpSocketBase.__init__(self)
+
+    self.mainloop = mainloop
+    self.local_address = local_address
+    self.port = port
+
+    self.socket = self._CreateSocket(ssl_params, ssl_verify_peer)
+
+    # Allow port to be reused
+    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+    self._children = []
+
+    mainloop.RegisterIO(self, self.socket.fileno(), select.POLLIN)
+    mainloop.RegisterSignal(self)
+
+  def Start(self):
+    self.socket.bind((self.local_address, self.port))
+    self.socket.listen(5)
+
+  def Stop(self):
+    self.socket.close()
+
+  def OnIO(self, fd, condition):
+    if condition & select.POLLIN:
+      self._IncomingConnection()
+
+  def OnSignal(self, signum):
+    if signum == signal.SIGCHLD:
+      self._CollectChildren(True)
+
+  def _CollectChildren(self, quick):
+    """Checks whether any child processes are done
+
+    @type quick: bool
+    @param quick: Whether to only use non-blocking functions
+
+    """
+    if not quick:
+      # Don't wait for other processes if it should be a quick check
+      while len(self._children) > self.MAX_CHILDREN:
+        try:
+          # Waiting without a timeout brings us into a potential DoS situation.
+          # As soon as too many children run, we'll not respond to new
+          # requests. The real solution would be to add a timeout for children
+          # and killing them after some time.
+          pid, status = os.waitpid(0, 0)
+        except os.error:
+          pid = None
+        if pid and pid in self._children:
+          self._children.remove(pid)
+
+    for child in self._children:
+      try:
+        pid, status = os.waitpid(child, os.WNOHANG)
+      except os.error:
+        pid = None
+      if pid and pid in self._children:
+        self._children.remove(pid)
+
+  def _IncomingConnection(self):
+    """Called for each incoming connection
+
+    """
+    (connection, client_addr) = self.socket.accept()
+
+    self._CollectChildren(False)
+
+    pid = os.fork()
+    if pid == 0:
+      # Child process
+      try:
+        _HttpServerRequestExecutor(self, connection, client_addr)
+      except Exception:
+        logging.exception("Error while handling request from %s:%s",
+                          client_addr[0], client_addr[1])
+        os._exit(1)
+      os._exit(0)
+    else:
+      self._children.append(pid)
+
+  def HandleRequest(self, req):
+    """Handles a request.
+
+    Must be overriden by subclass.
+
+    """
+    raise NotImplementedError()
-- 
GitLab