Commit 7e5a6e86 authored by Guido Trotter's avatar Guido Trotter
Browse files

masterd: use AsyncTerminatedMessageStream for luxi



Each luxi connection now creates an asyncore MasterClientHandler (which
is an AsyncTerminatedMessageStream subclass, sending each message to a
client worker). This makes it harder to DOS the master daemon by just
creating luxi connections, as each of them will use memory and file
descriptors, but not a dedicated thread.

Each connection will only handle one message at a time.
Signed-off-by: default avatarGuido Trotter <ultrotter@google.com>
Reviewed-by: default avatarMichael Hanselmann <hansmi@google.com>
parent 23e0ef8c
......@@ -34,10 +34,8 @@ import os
import pwd
import sys
import socket
import SocketServer
import time
import tempfile
import collections
import logging
from optparse import OptionParser
......@@ -66,14 +64,56 @@ EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
class ClientRequestWorker(workerpool.BaseWorker):
# pylint: disable-msg=W0221
def RunTask(self, server, request, client_address):
def RunTask(self, server, message, client):
"""Process the request.
"""
client_ops = ClientOps(server)
try:
server.request_handler_class(request, client_address, server)
finally:
request.close()
(method, args) = luxi.ParseRequest(message)
except luxi.ProtocolError, err:
logging.error("Protocol Error: %s", err)
client.close_log()
return
success = False
try:
result = client_ops.handle_request(method, args)
success = True
except errors.GenericError, err:
logging.exception("Unexpected exception")
success = False
result = errors.EncodeException(err)
except:
logging.exception("Unexpected exception")
err = sys.exc_info()
result = "Caught exception: %s" % str(err[1])
try:
reply = luxi.FormatResponse(success, result)
client.send_message(reply)
# awake the main thread so that it can write out the data.
server.awaker.signal()
except:
logging.exception("Send error")
client.close_log()
class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
"""Handler for master peers.
"""
_MAX_UNHANDLED = 1
def __init__(self, server, connected_socket, client_address, family):
daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
client_address,
constants.LUXI_EOM,
family, self._MAX_UNHANDLED)
self.server = server
def handle_message(self, message, _):
self.server.request_workers.AddTask(self.server, message, self)
class MasterServer(daemon.AsyncStreamServer):
......@@ -83,32 +123,35 @@ class MasterServer(daemon.AsyncStreamServer):
master socket.
"""
def __init__(self, mainloop, address, handler_class, uid, gid):
family = socket.AF_UNIX
def __init__(self, mainloop, address, uid, gid):
"""MasterServer constructor
@type mainloop: ganeti.daemon.Mainloop
@param mainloop: Mainloop used to poll for I/O events
@param address: the unix socket address to bind the MasterServer to
@param handler_class: handler class for the connections
@param uid: The uid of the owner of the socket
@param gid: The gid of the owner of the socket
"""
temp_name = tempfile.mktemp(dir=os.path.dirname(address))
daemon.AsyncStreamServer.__init__(self, socket.AF_UNIX, temp_name)
daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
os.chmod(temp_name, 0770)
os.chown(temp_name, uid, gid)
os.rename(temp_name, address)
self.request_handler_class = handler_class
self.mainloop = mainloop
self.awaker = daemon.AsyncAwaker()
# We'll only start threads once we've forked.
self.context = None
self.request_workers = None
def handle_connection(self, connected_socket, client_address):
self.request_workers.AddTask(self, connected_socket, client_address)
# TODO: add connection count and limit the number of open connections to a
# maximum number to avoid breaking for lack of file descriptors or memory.
MasterClientHandler(self, connected_socket, client_address, self.family)
def setup_queue(self):
self.context = GanetiContext()
......@@ -132,54 +175,6 @@ class MasterServer(daemon.AsyncStreamServer):
self.context.jobqueue.Shutdown()
class ClientRqHandler(SocketServer.BaseRequestHandler):
"""Client handler"""
READ_SIZE = 4096
def setup(self):
# pylint: disable-msg=W0201
# setup() is the api for initialising for this class
self._buffer = ""
self._msgs = collections.deque()
self._ops = ClientOps(self.server)
def handle(self):
while True:
msg = self.read_message()
if msg is None:
logging.debug("client closed connection")
break
(method, args) = luxi.ParseRequest(msg)
success = False
try:
result = self._ops.handle_request(method, args)
success = True
except errors.GenericError, err:
logging.exception("Unexpected exception")
result = errors.EncodeException(err)
except:
logging.exception("Unexpected exception")
result = "Caught exception: %s" % str(sys.exc_info()[1])
self.send_message(luxi.FormatResponse(success, result))
def read_message(self):
while not self._msgs:
data = self.request.recv(self.READ_SIZE)
if not data:
return None
new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
self._buffer = new_msgs.pop()
self._msgs.extend(new_msgs)
return self._msgs.popleft()
def send_message(self, msg):
# TODO: sendall is not guaranteed to send everything
self.request.sendall(msg + constants.LUXI_EOM)
class ClientOps:
"""Class holding high-level client operations."""
def __init__(self, server):
......@@ -526,7 +521,7 @@ def ExecMasterd (options, args): # pylint: disable-msg=W0613
utils.RemoveFile(constants.MASTER_SOCKET)
mainloop = daemon.Mainloop()
master = MasterServer(mainloop, constants.MASTER_SOCKET, ClientRqHandler,
master = MasterServer(mainloop, constants.MASTER_SOCKET,
options.uid, options.gid)
try:
rpc.Init()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment