From 8a0b06d20d5873e35feea3821e71409add03d51e Mon Sep 17 00:00:00 2001 From: Michael Hanselmann <hansmi@google.com> Date: Thu, 6 Nov 2008 11:25:10 +0000 Subject: [PATCH] Add new HTTP client class It is based on the WorkerPool class which is already used for the job queue and master daemon. Each request must be encapsulated in an instance of HttpClientRequest, which will then be passed to HttpClientManager for processing. Upon completion, the request object contains the response or error message. While not yet enabled, the HTTP client implementation has been designed with HTTP/1.1 and keep-alive in mind. For now it only uses HTTP/1.0, though. HttpClientManager will likely need more changes when integrating with the RPC layer. SSL will also be integrated in a second step. Reviewed-by: iustinp, ultrotter --- lib/http.py | 553 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 553 insertions(+) diff --git a/lib/http.py b/lib/http.py index e3c52fd70..d6756ffc5 100644 --- a/lib/http.py +++ b/lib/http.py @@ -31,12 +31,18 @@ import sys import time import signal import logging +import errno + +from cStringIO import StringIO from ganeti import constants from ganeti import serializer +from ganeti import workerpool from ganeti import utils +HTTP_CLIENT_THREADS = 10 + HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION WEEKDAYNAME = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] @@ -86,6 +92,10 @@ class SocketClosed(socket.error): pass +class ResponseError(Exception): + pass + + class HTTPException(Exception): code = None message = None @@ -640,6 +650,549 @@ class HttpServer(object): raise NotImplementedError() +class HttpClientRequest(object): + def __init__(self, host, port, method, path, headers=None, post_data=None): + """Describes an HTTP request. + + @type host: string + @param host: Hostname + @type port: int + @param port: Port + @type method: string + @param method: Method name + @type path: string + @param path: Request path + @type headers: dict or None + @param headers: Additional headers to send + @type post_data: string or None + @param post_data: Additional data to send + + """ + if post_data is not None: + assert method.upper() in (HTTP_POST, HTTP_PUT), \ + "Only POST and GET requests support sending data" + + assert path.startswith("/"), "Path must start with slash (/)" + + self.host = host + self.port = port + self.method = method + self.path = path + self.headers = headers + self.post_data = post_data + + self.success = None + self.error = None + + self.resp_status_line = None + self.resp_version = None + self.resp_status = None + self.resp_reason = None + self.resp_headers = None + self.resp_body = None + + +class HttpClientRequestExecutor(object): + # Default headers + DEFAULT_HEADERS = { + HTTP_USER_AGENT: HTTP_GANETI_VERSION, + # TODO: For keep-alive, don't send "Connection: close" + HTTP_CONNECTION: "close", + } + + # Length limits + STATUS_LINE_LENGTH_MAX = 512 + HEADER_LENGTH_MAX = 4 * 1024 + + # Timeouts in seconds + # TODO: Make read timeout configurable per OpCode + CONNECT_TIMEOUT = 5.0 + WRITE_TIMEOUT = 10 + READ_TIMEOUT = None + CLOSE_TIMEOUT = 1 + + # Parser state machine + PS_STATUS_LINE = "status-line" + PS_HEADERS = "headers" + PS_BODY = "body" + PS_COMPLETE = "complete" + + def __init__(self, req): + """Initializes the HttpClientRequestExecutor class. + + @type req: HttpClientRequest + @param req: Request object + + """ + self.request = req + + self.parser_status = self.PS_STATUS_LINE + self.header_buffer = StringIO() + self.body_buffer = StringIO() + self.content_length = None + self.server_will_close = None + + self.poller = select.poll() + + # TODO: SSL + + try: + # TODO: Implement connection caching/keep-alive + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + # Disable Python's timeout + self.sock.settimeout(None) + + # Operate in non-blocking mode + self.sock.setblocking(0) + + force_close = True + self._Connect() + try: + self._SendRequest() + self._ReadResponse() + + # Only wait for server to close if we didn't have any exception. + force_close = False + finally: + self._CloseConnection(force_close) + + self.sock.close() + self.sock = None + + req.resp_body = self.body_buffer.getvalue() + + req.success = True + req.error = None + + except ResponseError, err: + req.success = False + req.error = str(err) + + def _BuildRequest(self): + """Build HTTP request. + + @rtype: string + @return: Complete request + + """ + # Headers + send_headers = self.DEFAULT_HEADERS.copy() + + if self.request.headers: + send_headers.update(req.headers) + + send_headers[HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port) + + if self.request.post_data: + send_headers[HTTP_CONTENT_LENGTH] = len(self.request.post_data) + + buf = StringIO() + + # Add request line. We only support HTTP/1.0 (no chunked transfers and no + # keep-alive). + # TODO: For keep-alive, change to HTTP/1.1 + buf.write("%s %s %s\r\n" % (self.request.method.upper(), + self.request.path, HTTP_1_0)) + + # Add headers + for name, value in send_headers.iteritems(): + buf.write("%s: %s\r\n" % (name, value)) + + buf.write("\r\n") + + if self.request.post_data: + buf.write(self.request.post_data) + + return buf.getvalue() + + def _ParseStatusLine(self): + """Parses the status line sent by the server. + + """ + line = self.request.resp_status_line + + if not line: + raise ResponseError("Empty status line") + + try: + [version, status, reason] = line.split(None, 2) + except ValueError: + try: + [version, status] = line.split(None, 1) + reason = "" + except ValueError: + version = HTTP_9_0 + + if version: + version = version.upper() + + if version not in (HTTP_1_0, HTTP_1_1): + # We do not support HTTP/0.9, despite the specification requiring it + # (RFC2616, section 19.6) + raise ResponseError("Only HTTP/1.0 and HTTP/1.1 are supported (%r)" % + line) + + # The status code is a three-digit number + try: + status = int(status) + if status < 100 or status > 999: + status = -1 + except ValueError: + status = -1 + + if status == -1: + raise ResponseError("Invalid status code (%r)" % line) + + self.request.resp_version = version + self.request.resp_status = status + self.request.resp_reason = reason + + def _WillServerCloseConnection(self): + """Evaluate whether server will close the connection. + + @rtype: bool + @return: Whether server will close the connection + + """ + hdr_connection = self.request.resp_headers.get(HTTP_CONNECTION, None) + if hdr_connection: + hdr_connection = hdr_connection.lower() + + # An HTTP/1.1 server is assumed to stay open unless explicitly closed. + if self.request.resp_version == HTTP_1_1: + return (hdr_connection and "close" in hdr_connection) + + # Some HTTP/1.0 implementations have support for persistent connections, + # using rules different than HTTP/1.1. + + # For older HTTP, Keep-Alive indicates persistent connection. + if self.request.resp_headers.get(HTTP_KEEP_ALIVE): + return False + + # At least Akamai returns a "Connection: Keep-Alive" header, which was + # supposed to be sent by the client. + if hdr_connection and "keep-alive" in hdr_connection: + return False + + return True + + def _ParseHeaders(self): + """Parses the headers sent by the server. + + This function also adjusts internal variables based on the header values. + + """ + req = self.request + + # Parse headers + self.header_buffer.seek(0, 0) + req.resp_headers = mimetools.Message(self.header_buffer, 0) + + self.server_will_close = self._WillServerCloseConnection() + + # Do we have a Content-Length header? + hdr_content_length = req.resp_headers.get(HTTP_CONTENT_LENGTH, None) + if hdr_content_length: + try: + self.content_length = int(hdr_content_length) + except ValueError: + pass + if self.content_length is not None and self.content_length < 0: + self.content_length = None + + # does the body have a fixed length? (of zero) + if (req.resp_status in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED) or + 100 <= req.resp_status < 200 or req.method == HTTP_HEAD): + self.content_length = 0 + + # if the connection remains open and a content-length was not provided, + # then assume that the connection WILL close. + if self.content_length is None: + self.server_will_close = True + + def _CheckStatusLineLength(self, length): + if length > self.STATUS_LINE_LENGTH_MAX: + raise ResponseError("Status line longer than %d chars" % + self.STATUS_LINE_LENGTH_MAX) + + def _CheckHeaderLength(self, length): + if length > self.HEADER_LENGTH_MAX: + raise ResponseError("Headers longer than %d chars" % + self.HEADER_LENGTH_MAX) + + def _ParseBuffer(self, buf, eof): + """Main function for HTTP response state machine. + + @type buf: string + @param buf: Receive buffer + @type eof: bool + @param eof: Whether we've reached EOF on the socket + @rtype: string + @return: Updated receive buffer + + """ + if self.parser_status == self.PS_STATUS_LINE: + # Expect status line + idx = buf.find("\r\n") + if idx >= 0: + self.request.resp_status_line = buf[:idx] + + self._CheckStatusLineLength(len(self.request.resp_status_line)) + + # Remove status line, including CRLF + buf = buf[idx + 2:] + + self._ParseStatusLine() + + self.parser_status = self.PS_HEADERS + else: + # Check whether incoming data is getting too large, otherwise we just + # fill our read buffer. + self._CheckStatusLineLength(len(buf)) + + if self.parser_status == self.PS_HEADERS: + # Wait for header end + idx = buf.find("\r\n\r\n") + if idx >= 0: + self.header_buffer.write(buf[:idx + 2]) + + self._CheckHeaderLength(self.header_buffer.tell()) + + # Remove headers, including CRLF + buf = buf[idx + 4:] + + self._ParseHeaders() + + self.parser_status = self.PS_BODY + else: + # Check whether incoming data is getting too large, otherwise we just + # fill our read buffer. + self._CheckHeaderLength(len(buf)) + + if self.parser_status == self.PS_BODY: + self.body_buffer.write(buf) + buf = "" + + # Check whether we've read everything + if (eof or + (self.content_length is not None and + self.body_buffer.tell() >= self.content_length)): + self.parser_status = self.PS_COMPLETE + + return buf + + def _WaitForCondition(self, event, timeout): + """Waits for a condition to occur on the socket. + + @type event: int + @param event: ORed condition (see select module) + @type timeout: float or None + @param timeout: Timeout in seconds + @rtype: int or None + @return: None for timeout, otherwise occured conditions + + """ + check = (event | select.POLLPRI | + select.POLLNVAL | select.POLLHUP | select.POLLERR) + + if timeout is not None: + # Poller object expects milliseconds + timeout *= 1000 + + self.poller.register(self.sock, event) + try: + while True: + # TODO: If the main thread receives a signal and we have no timeout, we + # could wait forever. This should check a global "quit" flag or + # something every so often. + io_events = self.poller.poll(timeout) + if io_events: + for (evfd, evcond) in io_events: + if evcond & check: + return evcond + else: + # Timeout + return None + finally: + self.poller.unregister(self.sock) + + def _Connect(self): + """Non-blocking connect to host with timeout. + + """ + connected = False + while True: + connect_error = self.sock.connect_ex((self.request.host, + self.request.port)) + if connect_error == errno.EINTR: + # Mask signals + pass + + elif connect_error == 0: + # Connection established + connected = True + break + + elif connect_error == errno.EINPROGRESS: + # Connection started + break + + raise ResponseError("Connection failed (%s: %s)" % + (connect_error, os.strerror(connect_error))) + + if not connected: + # Wait for connection + event = self._WaitForCondition(select.POLLOUT, self.CONNECT_TIMEOUT) + if event is None: + raise ResponseError("Timeout while connecting to server") + + # Get error code + connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) + if connect_error != 0: + raise ResponseError("Connection failed (%s: %s)" % + (connect_error, os.strerror(connect_error))) + + # Enable TCP keep-alive + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + + # If needed, Linux specific options are available to change the TCP + # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and + # TCP_KEEPINTVL. + + def _SendRequest(self): + """Sends request to server. + + """ + buf = self._BuildRequest() + + while buf: + event = self._WaitForCondition(select.POLLOUT, self.WRITE_TIMEOUT) + if event is None: + raise ResponseError("Timeout while sending request") + + try: + # Only send 4 KB at a time + data = buf[:4096] + + sent = self.sock.send(data) + except socket.error, err: + if err.args and err.args[0] == errno.EAGAIN: + # Ignore EAGAIN + continue + + raise ResponseError("Sending request failed: %s" % str(err)) + + # Remove sent bytes + buf = buf[sent:] + + assert not buf, "Request wasn't sent completely" + + def _ReadResponse(self): + """Read response from server. + + Calls the parser function after reading a chunk of data. + + """ + buf = "" + eof = False + while self.parser_status != self.PS_COMPLETE: + event = self._WaitForCondition(select.POLLIN, self.READ_TIMEOUT) + if event is None: + raise ResponseError("Timeout while reading response") + + if event & (select.POLLIN | select.POLLPRI): + try: + data = self.sock.recv(4096) + except socket.error, err: + if err.args and err.args[0] == errno.EAGAIN: + # Ignore EAGAIN + continue + + raise ResponseError("Reading response failed: %s" % str(err)) + + if data: + buf += data + else: + eof = True + + if event & (select.POLLNVAL | select.POLLHUP | select.POLLERR): + eof = True + + # Do some parsing and error checking while more data arrives + buf = self._ParseBuffer(buf, eof) + + # Must be done only after the buffer has been evaluated + if (eof and + self.parser_status in (self.PS_STATUS_LINE, + self.PS_HEADERS)): + raise ResponseError("Connection closed prematurely") + + # Parse rest + buf = self._ParseBuffer(buf, True) + + assert self.parser_status == self.PS_COMPLETE + assert not buf, "Parser didn't read full response" + + def _CloseConnection(self, force): + """Closes the connection. + + """ + if self.server_will_close and not force: + # Wait for server to close + event = self._WaitForCondition(select.POLLIN, self.CLOSE_TIMEOUT) + if event is None: + # Server didn't close connection within CLOSE_TIMEOUT + pass + else: + try: + # Check whether it's actually closed + if not self.sock.recv(1): + return + except socket.error, err: + # Ignore errors at this stage + pass + + # Close the connection from our side + self.sock.shutdown(socket.SHUT_RDWR) + + +class HttpClientWorker(workerpool.BaseWorker): + """HTTP client worker class. + + """ + def RunTask(self, req): + HttpClientRequestExecutor(req) + + +class HttpClientWorkerPool(workerpool.WorkerPool): + def __init__(self, manager): + workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS, + HttpClientWorker) + self.manager = manager + + +class HttpClientManager(object): + def __init__(self): + self._wpool = HttpClientWorkerPool(self) + + def __del__(self): + self.Shutdown() + + def ExecRequests(self, requests): + # Add requests to queue + for req in requests: + self._wpool.AddTask(req) + + # And wait for them to finish + self._wpool.Quiesce() + + return requests + + def Shutdown(self): + self._wpool.TerminateWorkers() + + class _SSLFileObject(object): """Wrapper around socket._fileobject -- GitLab