diff --git a/lib/http.py b/lib/http.py
index e3c52fd706c87af482159ae2c53428d330c161e5..d6756ffc5ea0e03398cbe4b7c3c8ffc620049582 100644
--- a/lib/http.py
+++ b/lib/http.py
@@ -31,12 +31,18 @@
 import sys
 import time
 import signal
 import logging
+import errno
+
+from cStringIO import StringIO
 
 from ganeti import constants
 from ganeti import serializer
+from ganeti import workerpool
 from ganeti import utils
 
+HTTP_CLIENT_THREADS = 10
+
 HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION
 
 WEEKDAYNAME = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
@@ -86,6 +92,10 @@ class SocketClosed(socket.error):
   pass
 
 
+class ResponseError(Exception):
+  pass
+
+
 class HTTPException(Exception):
   code = None
   message = None
@@ -640,6 +650,549 @@ class HttpServer(object):
     raise NotImplementedError()
 
 
+class HttpClientRequest(object):
+  def __init__(self, host, port, method, path, headers=None, post_data=None):
+    """Describes an HTTP request.
+
+    @type host: string
+    @param host: Hostname
+    @type port: int
+    @param port: Port
+    @type method: string
+    @param method: Method name
+    @type path: string
+    @param path: Request path
+    @type headers: dict or None
+    @param headers: Additional headers to send
+    @type post_data: string or None
+    @param post_data: Additional data to send
+
+    """
+    if post_data is not None:
+      assert method.upper() in (HTTP_POST, HTTP_PUT), \
+        "Only POST and PUT requests support sending data"
+
+    assert path.startswith("/"), "Path must start with slash (/)"
+
+    self.host = host
+    self.port = port
+    self.method = method
+    self.path = path
+    self.headers = headers
+    self.post_data = post_data
+
+    self.success = None
+    self.error = None
+
+    self.resp_status_line = None
+    self.resp_version = None
+    self.resp_status = None
+    self.resp_reason = None
+    self.resp_headers = None
+    self.resp_body = None
+
+
+class HttpClientRequestExecutor(object):
+  # Default headers
+  DEFAULT_HEADERS = {
+    HTTP_USER_AGENT: HTTP_GANETI_VERSION,
+    # TODO: For keep-alive, don't send "Connection: close"
+    HTTP_CONNECTION: "close",
+    }
+
+  # Length limits
+  STATUS_LINE_LENGTH_MAX = 512
+  HEADER_LENGTH_MAX = 4 * 1024
+
+  # Timeouts in seconds
+  # TODO: Make read timeout configurable per OpCode
+  CONNECT_TIMEOUT = 5.0
+  WRITE_TIMEOUT = 10
+  READ_TIMEOUT = None
+  CLOSE_TIMEOUT = 1
+
+  # Parser state machine
+  PS_STATUS_LINE = "status-line"
+  PS_HEADERS = "headers"
+  PS_BODY = "body"
+  PS_COMPLETE = "complete"
+
+  def __init__(self, req):
+    """Initializes the HttpClientRequestExecutor class.
+
+    @type req: HttpClientRequest
+    @param req: Request object
+
+    """
+    self.request = req
+
+    self.parser_status = self.PS_STATUS_LINE
+    self.header_buffer = StringIO()
+    self.body_buffer = StringIO()
+    self.content_length = None
+    self.server_will_close = None
+
+    self.poller = select.poll()
+
+    # TODO: SSL
+
+    try:
+      # TODO: Implement connection caching/keep-alive
+      self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+
+      # Disable Python's timeout
+      self.sock.settimeout(None)
+
+      # Operate in non-blocking mode
+      self.sock.setblocking(0)
+
+      force_close = True
+      self._Connect()
+      try:
+        self._SendRequest()
+        self._ReadResponse()
+
+        # Only wait for server to close if we didn't have any exception.
+        force_close = False
+      finally:
+        self._CloseConnection(force_close)
+
+      self.sock.close()
+      self.sock = None
+
+      req.resp_body = self.body_buffer.getvalue()
+
+      req.success = True
+      req.error = None
+
+    except ResponseError, err:
+      req.success = False
+      req.error = str(err)
+
+  def _BuildRequest(self):
+    """Build HTTP request.
+
+    @rtype: string
+    @return: Complete request
+
+    """
+    # Headers
+    send_headers = self.DEFAULT_HEADERS.copy()
+
+    if self.request.headers:
+      send_headers.update(self.request.headers)
+
+    send_headers[HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port)
+
+    if self.request.post_data:
+      send_headers[HTTP_CONTENT_LENGTH] = len(self.request.post_data)
+
+    buf = StringIO()
+
+    # Add request line. We only support HTTP/1.0 (no chunked transfers and no
+    # keep-alive).
+    # TODO: For keep-alive, change to HTTP/1.1
+    buf.write("%s %s %s\r\n" % (self.request.method.upper(),
+                                self.request.path, HTTP_1_0))
+
+    # Add headers
+    for name, value in send_headers.iteritems():
+      buf.write("%s: %s\r\n" % (name, value))
+
+    buf.write("\r\n")
+
+    if self.request.post_data:
+      buf.write(self.request.post_data)
+
+    return buf.getvalue()
+
+  def _ParseStatusLine(self):
+    """Parses the status line sent by the server.
+
+    """
+    line = self.request.resp_status_line
+
+    if not line:
+      raise ResponseError("Empty status line")
+
+    try:
+      [version, status, reason] = line.split(None, 2)
+    except ValueError:
+      try:
+        [version, status] = line.split(None, 1)
+        reason = ""
+      except ValueError:
+        version = HTTP_9_0
+
+    if version:
+      version = version.upper()
+
+    if version not in (HTTP_1_0, HTTP_1_1):
+      # We do not support HTTP/0.9, despite the specification requiring it
+      # (RFC2616, section 19.6)
+      raise ResponseError("Only HTTP/1.0 and HTTP/1.1 are supported (%r)" %
+                          line)
+
+    # The status code is a three-digit number
+    try:
+      status = int(status)
+      if status < 100 or status > 999:
+        status = -1
+    except ValueError:
+      status = -1
+
+    if status == -1:
+      raise ResponseError("Invalid status code (%r)" % line)
+
+    self.request.resp_version = version
+    self.request.resp_status = status
+    self.request.resp_reason = reason
+
+  def _WillServerCloseConnection(self):
+    """Evaluate whether server will close the connection.
+
+    @rtype: bool
+    @return: Whether server will close the connection
+
+    """
+    hdr_connection = self.request.resp_headers.get(HTTP_CONNECTION, None)
+    if hdr_connection:
+      hdr_connection = hdr_connection.lower()
+
+    # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
+    if self.request.resp_version == HTTP_1_1:
+      return (hdr_connection and "close" in hdr_connection)
+
+    # Some HTTP/1.0 implementations have support for persistent connections,
+    # using rules different than HTTP/1.1.
+
+    # For older HTTP, Keep-Alive indicates persistent connection.
+    if self.request.resp_headers.get(HTTP_KEEP_ALIVE):
+      return False
+
+    # At least Akamai returns a "Connection: Keep-Alive" header, which was
+    # supposed to be sent by the client.
+    if hdr_connection and "keep-alive" in hdr_connection:
+      return False
+
+    return True
+
+  def _ParseHeaders(self):
+    """Parses the headers sent by the server.
+
+    This function also adjusts internal variables based on the header values.
+
+    """
+    req = self.request
+
+    # Parse headers
+    self.header_buffer.seek(0, 0)
+    req.resp_headers = mimetools.Message(self.header_buffer, 0)
+
+    self.server_will_close = self._WillServerCloseConnection()
+
+    # Do we have a Content-Length header?
+    hdr_content_length = req.resp_headers.get(HTTP_CONTENT_LENGTH, None)
+    if hdr_content_length:
+      try:
+        self.content_length = int(hdr_content_length)
+      except ValueError:
+        pass
+      if self.content_length is not None and self.content_length < 0:
+        self.content_length = None
+
+    # does the body have a fixed length? (of zero)
+    if (req.resp_status in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED) or
+        100 <= req.resp_status < 200 or req.method == HTTP_HEAD):
+      self.content_length = 0
+
+    # if the connection remains open and a content-length was not provided,
+    # then assume that the connection WILL close.
+    if self.content_length is None:
+      self.server_will_close = True
+
+  def _CheckStatusLineLength(self, length):
+    if length > self.STATUS_LINE_LENGTH_MAX:
+      raise ResponseError("Status line longer than %d chars" %
+                          self.STATUS_LINE_LENGTH_MAX)
+
+  def _CheckHeaderLength(self, length):
+    if length > self.HEADER_LENGTH_MAX:
+      raise ResponseError("Headers longer than %d chars" %
+                          self.HEADER_LENGTH_MAX)
+
+  def _ParseBuffer(self, buf, eof):
+    """Main function for HTTP response state machine.
+
+    @type buf: string
+    @param buf: Receive buffer
+    @type eof: bool
+    @param eof: Whether we've reached EOF on the socket
+    @rtype: string
+    @return: Updated receive buffer
+
+    """
+    if self.parser_status == self.PS_STATUS_LINE:
+      # Expect status line
+      idx = buf.find("\r\n")
+      if idx >= 0:
+        self.request.resp_status_line = buf[:idx]
+
+        self._CheckStatusLineLength(len(self.request.resp_status_line))
+
+        # Remove status line, including CRLF
+        buf = buf[idx + 2:]
+
+        self._ParseStatusLine()
+
+        self.parser_status = self.PS_HEADERS
+      else:
+        # Check whether incoming data is getting too large, otherwise we just
+        # fill our read buffer.
+        self._CheckStatusLineLength(len(buf))
+
+    if self.parser_status == self.PS_HEADERS:
+      # Wait for header end
+      idx = buf.find("\r\n\r\n")
+      if idx >= 0:
+        self.header_buffer.write(buf[:idx + 2])
+
+        self._CheckHeaderLength(self.header_buffer.tell())
+
+        # Remove headers, including CRLF
+        buf = buf[idx + 4:]
+
+        self._ParseHeaders()
+
+        self.parser_status = self.PS_BODY
+      else:
+        # Check whether incoming data is getting too large, otherwise we just
+        # fill our read buffer.
+        self._CheckHeaderLength(len(buf))
+
+    if self.parser_status == self.PS_BODY:
+      self.body_buffer.write(buf)
+      buf = ""
+
+      # Check whether we've read everything
+      if (eof or
+          (self.content_length is not None and
+           self.body_buffer.tell() >= self.content_length)):
+        self.parser_status = self.PS_COMPLETE
+
+    return buf
+
+  def _WaitForCondition(self, event, timeout):
+    """Waits for a condition to occur on the socket.
+
+    @type event: int
+    @param event: ORed condition (see select module)
+    @type timeout: float or None
+    @param timeout: Timeout in seconds
+    @rtype: int or None
+    @return: None for timeout, otherwise occurred conditions
+
+    """
+    check = (event | select.POLLPRI |
+             select.POLLNVAL | select.POLLHUP | select.POLLERR)
+
+    if timeout is not None:
+      # Poller object expects milliseconds
+      timeout *= 1000
+
+    self.poller.register(self.sock, event)
+    try:
+      while True:
+        # TODO: If the main thread receives a signal and we have no timeout,
+        # we could wait forever. This should check a global "quit" flag or
+        # something every so often.
+        io_events = self.poller.poll(timeout)
+        if io_events:
+          for (evfd, evcond) in io_events:
+            if evcond & check:
+              return evcond
+        else:
+          # Timeout
+          return None
+    finally:
+      self.poller.unregister(self.sock)
+
+  def _Connect(self):
+    """Non-blocking connect to host with timeout.
+
+    """
+    connected = False
+    while True:
+      connect_error = self.sock.connect_ex((self.request.host,
+                                            self.request.port))
+      if connect_error == errno.EINTR:
+        # Mask signals and retry
+        pass
+
+      elif connect_error == 0:
+        # Connection established
+        connected = True
+        break
+
+      elif connect_error == errno.EINPROGRESS:
+        # Connection started
+        break
+
+      else:
+        raise ResponseError("Connection failed (%s: %s)" %
+                            (connect_error, os.strerror(connect_error)))
+
+    if not connected:
+      # Wait for connection
+      event = self._WaitForCondition(select.POLLOUT, self.CONNECT_TIMEOUT)
+      if event is None:
+        raise ResponseError("Timeout while connecting to server")
+
+      # Get error code
+      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
+      if connect_error != 0:
+        raise ResponseError("Connection failed (%s: %s)" %
+                            (connect_error, os.strerror(connect_error)))
+
+    # Enable TCP keep-alive
+    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+
+    # If needed, Linux specific options are available to change the TCP
+    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
+    # TCP_KEEPINTVL.
+
+  def _SendRequest(self):
+    """Sends request to server.
+
+    """
+    buf = self._BuildRequest()
+
+    while buf:
+      event = self._WaitForCondition(select.POLLOUT, self.WRITE_TIMEOUT)
+      if event is None:
+        raise ResponseError("Timeout while sending request")
+
+      try:
+        # Only send 4 KB at a time
+        data = buf[:4096]
+
+        sent = self.sock.send(data)
+      except socket.error, err:
+        if err.args and err.args[0] == errno.EAGAIN:
+          # Ignore EAGAIN
+          continue
+
+        raise ResponseError("Sending request failed: %s" % str(err))
+
+      # Remove sent bytes
+      buf = buf[sent:]
+
+    assert not buf, "Request wasn't sent completely"
+
+  def _ReadResponse(self):
+    """Read response from server.
+
+    Calls the parser function after reading a chunk of data.
+
+    """
+    buf = ""
+    eof = False
+    while self.parser_status != self.PS_COMPLETE:
+      event = self._WaitForCondition(select.POLLIN, self.READ_TIMEOUT)
+      if event is None:
+        raise ResponseError("Timeout while reading response")
+
+      if event & (select.POLLIN | select.POLLPRI):
+        try:
+          data = self.sock.recv(4096)
+        except socket.error, err:
+          if err.args and err.args[0] == errno.EAGAIN:
+            # Ignore EAGAIN
+            continue
+
+          raise ResponseError("Reading response failed: %s" % str(err))
+
+        if data:
+          buf += data
+        else:
+          eof = True
+
+      if event & (select.POLLNVAL | select.POLLHUP | select.POLLERR):
+        eof = True
+
+      # Do some parsing and error checking while more data arrives
+      buf = self._ParseBuffer(buf, eof)
+
+      # Must be done only after the buffer has been evaluated
+      if (eof and
+          self.parser_status in (self.PS_STATUS_LINE,
+                                 self.PS_HEADERS)):
+        raise ResponseError("Connection closed prematurely")
+
+    # Parse rest
+    buf = self._ParseBuffer(buf, True)
+
+    assert self.parser_status == self.PS_COMPLETE
+    assert not buf, "Parser didn't read full response"
+
+  def _CloseConnection(self, force):
+    """Closes the connection.
+
+    """
+    if self.server_will_close and not force:
+      # Wait for server to close
+      event = self._WaitForCondition(select.POLLIN, self.CLOSE_TIMEOUT)
+      if event is None:
+        # Server didn't close connection within CLOSE_TIMEOUT
+        pass
+      else:
+        try:
+          # Check whether it's actually closed
+          if not self.sock.recv(1):
+            return
+        except socket.error, err:
+          # Ignore errors at this stage
+          pass
+
+    # Close the connection from our side
+    self.sock.shutdown(socket.SHUT_RDWR)
+
+
+class HttpClientWorker(workerpool.BaseWorker):
+  """HTTP client worker class.
+
+  """
+  def RunTask(self, req):
+    HttpClientRequestExecutor(req)
+
+
+class HttpClientWorkerPool(workerpool.WorkerPool):
+  def __init__(self, manager):
+    workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
+                                   HttpClientWorker)
+    self.manager = manager
+
+
+class HttpClientManager(object):
+  def __init__(self):
+    self._wpool = HttpClientWorkerPool(self)
+
+  def __del__(self):
+    self.Shutdown()
+
+  def ExecRequests(self, requests):
+    # Add requests to queue
+    for req in requests:
+      self._wpool.AddTask(req)
+
+    # And wait for them to finish
+    self._wpool.Quiesce()
+
+    return requests
+
+  def Shutdown(self):
+    self._wpool.TerminateWorkers()
+
+
 class _SSLFileObject(object):
   """Wrapper around socket._fileobject
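For reference, and not part of the patch: a minimal usage sketch of the client API added above, written in the module's Python 2 style. The hostname, port and request path are placeholders, and the method is passed as a plain string rather than assuming any additional module constants.

  # Illustrative only; "node1.example.com", 5900 and "/version" are
  # placeholders for a real endpoint.
  from ganeti import http

  manager = http.HttpClientManager()
  try:
    # One request object per target; ExecRequests() hands them to the
    # worker pool, blocks until all have been processed and returns the
    # same objects with their success/error and resp_* fields filled in.
    requests = [http.HttpClientRequest("node1.example.com", 5900,
                                       "GET", "/version")]
    for req in manager.ExecRequests(requests):
      if req.success:
        print req.resp_status, req.resp_body
      else:
        print "Request to %s failed: %s" % (req.host, req.error)
  finally:
    manager.Shutdown()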