Commit 73a59d9e authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

ganeti.http: Move _SocketOperation to module-level function

This is a preparation step to move the HTTP server class to the
same model as the HTTP client (polling, non-blocking I/O, better
OpenSSL error handling).

Reviewed-by: ultrotter
parent f22c1cea
......@@ -90,6 +90,11 @@ HTTP_KEEP_ALIVE = "Keep-Alive"
_SSL_UNEXPECTED_EOF = "Unexpected EOF"
# Socket operations
(SOCKOP_SEND,
SOCKOP_RECV,
SOCKOP_SHUTDOWN) = range(3)
class SocketClosed(socket.error):
pass
......@@ -101,7 +106,14 @@ class _HttpClientError(Exception):
This should only be used for internal error reporting.
"""
pass
class _HttpSocketTimeout(Exception):
"""Internal exception for socket timeouts.
This should only be used for internal error reporting.
"""
class HTTPException(Exception):
......@@ -160,13 +172,13 @@ class HTTPJsonConverter:
return serializer.LoadJson(data)
def WaitForSocketCondition(sock, poller, event, timeout):
def WaitForSocketCondition(poller, sock, event, timeout):
"""Waits for a condition to occur on the socket.
@type sock: socket
@param socket: Wait for events on this socket
@type poller: select.Poller
@param poller: Poller object as created by select.poll()
@type sock: socket
@param socket: Wait for events on this socket
@type event: int
@param event: ORed condition (see select module)
@type timeout: float or None
......@@ -199,6 +211,122 @@ def WaitForSocketCondition(sock, poller, event, timeout):
poller.unregister(sock)
def SocketOperation(poller, sock, op, arg1, timeout):
"""Wrapper around socket functions.
This function abstracts error handling for socket operations, especially
for the complicated interaction with OpenSSL.
@type poller: select.Poller
@param poller: Poller object as created by select.poll()
@type sock: socket
@param socket: Socket for the operation
@type op: int
@param op: Operation to execute (SOCKOP_* constants)
@type arg1: any
@param arg1: Parameter for function (if needed)
@type timeout: None or float
@param timeout: Timeout in seconds or None
"""
# TODO: event_poll/event_check/override
if op == SOCKOP_SEND:
event_poll = select.POLLOUT
event_check = select.POLLOUT
elif op == SOCKOP_RECV:
event_poll = select.POLLIN
event_check = select.POLLIN | select.POLLPRI
elif op == SOCKOP_SHUTDOWN:
event_poll = None
event_check = None
# The timeout is only used when OpenSSL requests polling for a condition.
# It is not advisable to have no timeout for shutdown.
assert timeout
else:
raise AssertionError("Invalid socket operation")
# No override by default
event_override = 0
while True:
# Poll only for certain operations and when asked for by an override
if event_override or op in (SOCKOP_SEND, SOCKOP_RECV):
if event_override:
wait_for_event = event_override
else:
wait_for_event = event_poll
event = WaitForSocketCondition(poller, sock, wait_for_event, timeout)
if event is None:
raise _HttpSocketTimeout()
if (op == SOCKOP_RECV and
event & (select.POLLNVAL | select.POLLHUP | select.POLLERR)):
return ""
if not event & wait_for_event:
continue
# Reset override
event_override = 0
try:
try:
if op == SOCKOP_SEND:
return sock.send(arg1)
elif op == SOCKOP_RECV:
return sock.recv(arg1)
elif op == SOCKOP_SHUTDOWN:
if isinstance(sock, OpenSSL.SSL.ConnectionType):
# PyOpenSSL's shutdown() doesn't take arguments
return sock.shutdown()
else:
return sock.shutdown(arg1)
except OpenSSL.SSL.WantWriteError:
# OpenSSL wants to write, poll for POLLOUT
event_override = select.POLLOUT
continue
except OpenSSL.SSL.WantReadError:
# OpenSSL wants to read, poll for POLLIN
event_override = select.POLLIN | select.POLLPRI
continue
except OpenSSL.SSL.WantX509LookupError:
continue
except OpenSSL.SSL.SysCallError, err:
if op == SOCKOP_SEND:
# arg1 is the data when writing
if err.args and err.args[0] == -1 and arg1 == "":
# errors when writing empty strings are expected
# and can be ignored
return 0
elif op == SOCKOP_RECV:
if err.args == (-1, _SSL_UNEXPECTED_EOF):
return ""
raise socket.error(err.args)
except OpenSSL.SSL.Error, err:
raise socket.error(err.args)
except socket.error, err:
if err.args and err.args[0] == errno.EAGAIN:
# Ignore EAGAIN
continue
raise
class HttpSslParams(object):
"""Data class for SSL key and certificate.
......@@ -773,12 +901,6 @@ class HttpClientRequestExecutor(_HttpSocketBase):
PS_BODY = "body"
PS_COMPLETE = "complete"
# Socket operations
(OP_SEND,
OP_RECV,
OP_CLOSE_CHECK,
OP_SHUTDOWN) = range(4)
def __init__(self, req):
"""Initializes the HttpClientRequestExecutor class.
......@@ -1045,116 +1167,6 @@ class HttpClientRequestExecutor(_HttpSocketBase):
return buf
def _SocketOperation(self, op, arg1, error_msg, timeout_msg):
"""Wrapper around socket functions.
This function abstracts error handling for socket operations, especially
for the complicated interaction with OpenSSL.
"""
if op == self.OP_SEND:
event_poll = select.POLLOUT
event_check = select.POLLOUT
timeout = self.WRITE_TIMEOUT
elif op in (self.OP_RECV, self.OP_CLOSE_CHECK):
event_poll = select.POLLIN
event_check = select.POLLIN | select.POLLPRI
if op == self.OP_CLOSE_CHECK:
timeout = self.CLOSE_TIMEOUT
else:
timeout = self.READ_TIMEOUT
elif op == self.OP_SHUTDOWN:
event_poll = None
event_check = None
# The timeout is only used when OpenSSL requests polling for a condition.
# It is not advisable to have no timeout for shutdown.
timeout = self.WRITE_TIMEOUT
else:
raise AssertionError("Invalid socket operation")
# No override by default
event_override = 0
while True:
# Poll only for certain operations and when asked for by an override
if (event_override or
op in (self.OP_SEND, self.OP_RECV, self.OP_CLOSE_CHECK)):
if event_override:
wait_for_event = event_override
else:
wait_for_event = event_poll
event = WaitForSocketCondition(self.sock, self.poller, wait_for_event,
timeout)
if event is None:
raise _HttpClientTimeout(timeout_msg)
if (op == self.OP_RECV and
event & (select.POLLNVAL | select.POLLHUP | select.POLLERR)):
return ""
if not event & wait_for_event:
continue
# Reset override
event_override = 0
try:
try:
if op == self.OP_SEND:
return self.sock.send(arg1)
elif op in (self.OP_RECV, self.OP_CLOSE_CHECK):
return self.sock.recv(arg1)
elif op == self.OP_SHUTDOWN:
if self._using_ssl:
# PyOpenSSL's shutdown() doesn't take arguments
return self.sock.shutdown()
else:
return self.sock.shutdown(arg1)
except OpenSSL.SSL.WantWriteError:
# OpenSSL wants to write, poll for POLLOUT
event_override = select.POLLOUT
continue
except OpenSSL.SSL.WantReadError:
# OpenSSL wants to read, poll for POLLIN
event_override = select.POLLIN | select.POLLPRI
continue
except OpenSSL.SSL.WantX509LookupError:
continue
except OpenSSL.SSL.SysCallError, err:
if op == self.OP_SEND:
# arg1 is the data when writing
if err.args and err.args[0] == -1 and arg1 == "":
# errors when writing empty strings are expected
# and can be ignored
return 0
elif op == self.OP_RECV:
if err.args == (-1, _SSL_UNEXPECTED_EOF):
return ""
raise socket.error(err.args)
except OpenSSL.SSL.Error, err:
raise socket.error(err.args)
except socket.error, err:
if err.args and err.args[0] == errno.EAGAIN:
# Ignore EAGAIN
continue
raise _HttpClientError("%s: %s" % (error_msg, str(err)))
def _Connect(self):
"""Non-blocking connect to host with timeout.
......@@ -1185,7 +1197,7 @@ class HttpClientRequestExecutor(_HttpSocketBase):
if not connected:
# Wait for connection
event = WaitForSocketCondition(self.sock, self.poller,
event = WaitForSocketCondition(self.poller, self.sock,
select.POLLOUT, self.CONNECT_TIMEOUT)
if event is None:
raise _HttpClientError("Timeout while connecting to server")
......@@ -1213,9 +1225,13 @@ class HttpClientRequestExecutor(_HttpSocketBase):
# Send only 4 KB at a time
data = buf[:4096]
sent = self._SocketOperation(self.OP_SEND, data,
"Error while sending request",
"Timeout while sending request")
try:
sent = SocketOperation(self.poller, self.sock, SOCKOP_SEND, data,
self.WRITE_TIMEOUT)
except _HttpSocketTimeout:
raise _HttpClientError("Timeout while sending request")
except socket.error, err:
raise _HttpClientError("Error sending request: %s" % err)
# Remove sent bytes
buf = buf[sent:]
......@@ -1231,9 +1247,13 @@ class HttpClientRequestExecutor(_HttpSocketBase):
buf = ""
eof = False
while self.parser_status != self.PS_COMPLETE:
data = self._SocketOperation(self.OP_RECV, 4096,
"Error while reading response",
"Timeout while reading response")
try:
data = SocketOperation(self.poller, self.sock, SOCKOP_RECV, 4096,
self.READ_TIMEOUT)
except _HttpSocketTimeout:
raise _HttpClientError("Timeout while reading response")
except socket.error, err:
raise _HttpClientError("Error while reading response: %s" % err)
if data:
buf += data
......@@ -1263,17 +1283,21 @@ class HttpClientRequestExecutor(_HttpSocketBase):
# Wait for server to close
try:
# Check whether it's actually closed
if not self._SocketOperation(self.OP_CLOSE_CHECK, 1,
"Error", "Timeout"):
if not SocketOperation(self.poller, self.sock, SOCKOP_RECV, 1,
self.CLOSE_TIMEOUT):
return
except (socket.error, _HttpClientError):
except (socket.error, _HttpClientError, _HttpSocketTimeout):
# Ignore errors at this stage
pass
# Close the connection from our side
self._SocketOperation(self.OP_SHUTDOWN, socket.SHUT_RDWR,
"Error while shutting down connection",
"Timeout while shutting down connection")
try:
SocketOperation(self.poller, self.sock, SOCKOP_SHUTDOWN,
socket.SHUT_RDWR, self.WRITE_TIMEOUT)
except _HttpSocketTimeout:
raise _HttpClientError("Timeout while shutting down connection")
except socket.error, err:
raise _HttpClientError("Error while shutting down connection: %s" % err)
class _HttpClientPendingRequest(object):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment