Commit 81010134 authored by Iustin Pop's avatar Iustin Pop
Browse files

Break trunk by removing twisted

This patch switches from the twisted usage for inter-node protocol to
simple BaseHTTPServer/httplib. The patch has more deletions because we
use no authentication, no encryption at all.

As such, this is just for trunk, and only for testing. What it brings is
the ability to use the rpc library from within multiple threads in
parallel (or it should so).

Since the changes are very few and non-intrusive, they can be reverted
without impacting the rest of the code.

This passes burnin. QA was not tested.

Reviewed-by: imsnah
parent 84152b96
......@@ -28,6 +28,8 @@ import os
import sys
import resource
import traceback
import BaseHTTPServer
import simplejson
from optparse import OptionParser
......@@ -40,55 +42,61 @@ from ganeti import errors
from ganeti import ssconf
from ganeti import utils
from twisted.spread import pb
from twisted.internet import reactor
from twisted.cred import checkers, portal
from OpenSSL import SSL
class ServerContextFactory:
"""SSL context factory class that uses a given certificate.
"""
@staticmethod
def getContext():
"""Return a customized context.
The context will be set to use our certificate.
"""
ctx = SSL.Context(SSL.TLSv1_METHOD)
ctx.use_certificate_file(constants.SSL_CERT_FILE)
ctx.use_privatekey_file(constants.SSL_CERT_FILE)
return ctx
class ServerObject(pb.Avatar):
class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler):
"""The server implementation.
This class holds all methods exposed over the RPC interface.
"""
def __init__(self, name):
self.name = name
def perspectiveMessageReceived(self, broker, message, args, kw):
"""Custom message dispatching function.
This function overrides the pb.Avatar function in order to provide
a simple form of exception passing (as text only).
def do_PUT(self):
"""Handle a post request.
"""
args = broker.unserialize(args, self)
kw = broker.unserialize(kw, self)
method = getattr(self, "perspective_%s" % message)
tb = None
state = None
path = self.path
if path.startswith("/"):
path = path[1:]
mname = "perspective_%s" % path
if not hasattr(self, mname):
self.send_error(404)
return False
method = getattr(self, mname)
try:
state = method(*args, **kw)
except:
tb = traceback.format_exc()
body_length = int(self.headers.get('Content-Length', '0'))
except ValueError:
self.send_error(400, 'No Content-Length header or invalid format')
return False
return broker.serialize((tb, state), self, method, args, kw)
try:
body = self.rfile.read(body_length)
except socket.error, err:
logger.Error("Socket error while reading: %s" % str(err))
return
try:
params = simplejson.loads(body)
result = method(params)
payload = simplejson.dumps(result)
except Exception, err:
self.send_error(500, "Error: %s" % str(err))
return False
self.send_response(200)
self.send_header('Content-Length', str(len(payload)))
self.end_headers()
self.wfile.write(payload)
return True
def log_message(self, format, *args):
"""Log a request to the log.
This is the same as the parent, we just log somewhere else.
"""
msg = ("%s - - [%s] %s\n" %
(self.address_string(),
self.log_date_time_string(),
format % args))
logger.Debug(msg)
# the new block devices --------------------------
......@@ -487,21 +495,6 @@ class ServerObject(pb.Avatar):
return utils.TestDelay(duration)
class MyRealm:
"""Simple realm that forwards all requests to a ServerObject.
"""
__implements__ = portal.IRealm
def requestAvatar(self, avatarId, mind, *interfaces):
"""Return an avatar based on our ServerObject class.
"""
if pb.IPerspective not in interfaces:
raise NotImplementedError
return pb.IPerspective, ServerObject(avatarId), lambda:None
def ParseOptions():
"""Parse the command line options.
......@@ -550,11 +543,8 @@ def main():
logger.SetupLogging(twisted_workaround=True, debug=options.debug,
program="ganeti-noded")
p = portal.Portal(MyRealm())
p.registerChecker(
checkers.InMemoryUsernamePasswordDatabaseDontUse(master_node=pwdata))
reactor.listenSSL(port, pb.PBServerFactory(p), ServerContextFactory())
reactor.run()
httpd = BaseHTTPServer.HTTPServer(('', port), ServerObject)
httpd.serve_forever()
def createDaemon():
......
......@@ -26,43 +26,10 @@
# pylint: disable-msg=C0103
import os
import socket
import httplib
from twisted.internet.pollreactor import PollReactor
class ReReactor(PollReactor):
"""A re-startable Reactor implementation.
"""
def run(self, installSignalHandlers=1):
"""Custom run method.
This is customized run that, before calling Reactor.run, will
reinstall the shutdown events and re-create the threadpool in case
these are not present (as will happen on the second run of the
reactor).
"""
if not 'shutdown' in self._eventTriggers:
# the shutdown queue has been killed, we are most probably
# at the second run, thus recreate the queue
self.addSystemEventTrigger('during', 'shutdown', self.crash)
self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
if self.threadpool is not None and self.threadpool.joined == 1:
# in case the threadpool has been stopped, re-start it
# and add a trigger to stop it at reactor shutdown
self.threadpool.start()
self.addSystemEventTrigger('during', 'shutdown', self.threadpool.stop)
return PollReactor.run(self, installSignalHandlers)
import twisted.internet.main
twisted.internet.main.installReactor(ReReactor())
from twisted.spread import pb
from twisted.internet import reactor
from twisted.cred import credentials
from OpenSSL import SSL, crypto
import simplejson
from ganeti import logger
from ganeti import utils
......@@ -82,107 +49,41 @@ class NodeController:
def __init__(self, parent, node):
self.parent = parent
self.node = node
self.failed = False
def _check_end(self):
"""Stop the reactor if we got all the results.
"""
if len(self.parent.results) == len(self.parent.nc):
reactor.stop()
def cb_call(self, obj):
"""Callback for successful connect.
If the connect and login sequence succeeded, we proceed with
making the actual call.
"""
deferred = obj.callRemote(self.parent.procedure, self.parent.args)
deferred.addCallbacks(self.cb_done, self.cb_err2)
def cb_done(self, result):
"""Callback for successful call.
When we receive the result from a call, we check if it was an
error and if so we raise a generic RemoteError (we can't pass yet
the actual exception over). If there was no error, we store the
result.
"""
tb, self.parent.results[self.node] = result
self._check_end()
if tb:
raise errors.RemoteError("Remote procedure error calling %s on %s:"
"\n%s" % (self.parent.procedure,
self.node,
tb))
def cb_err1(self, reason):
"""Error callback for unsuccessful connect.
"""
logger.Error("caller_connect: could not connect to remote host %s,"
" reason %s" % (self.node, reason))
self.parent.results[self.node] = False
self._check_end()
def cb_err2(self, reason):
"""Error callback for unsuccessful call.
This is when the call didn't return anything, not even an error,
or when it time out, etc.
"""
logger.Error("caller_call: could not call %s on node %s,"
" reason %s" % (self.parent.procedure, self.node, reason))
self.parent.results[self.node] = False
self._check_end()
class MirrorContextFactory:
"""Certificate verifier factory.
This factory creates contexts that verify if the remote end has a
specific certificate (i.e. our own certificate).
The checks we do are that the PEM dump of the certificate is the
same as our own and (somewhat redundantly) that the SHA checksum is
the same.
"""
isClient = 1
def __init__(self):
self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
try:
fd = open(constants.SSL_CERT_FILE, 'r')
try:
data = fd.read(16384)
finally:
fd.close()
except EnvironmentError, err:
raise errors.ConfigurationError("missing SSL certificate: %s" %
str(err))
self.mycert = crypto.load_certificate(crypto.FILETYPE_PEM, data)
self.mypem = crypto.dump_certificate(crypto.FILETYPE_PEM, self.mycert)
self.mydigest = self.mycert.digest('SHA')
def verifier(self, conn, x509, errno, err_depth, retcode):
"""Certificate verify method.
hc.connect()
hc.putrequest('PUT', "/%s" % self.parent.procedure,
skip_accept_encoding=True)
hc.putheader('Content-Length', str(len(parent.body)))
hc.endheaders()
hc.send(parent.body)
except socket.error, err:
logger.Error("Error connecting to %s: %s" % (node, str(err)))
self.failed = True
def get_response(self):
"""Try to process the response from the node.
"""
if self.mydigest != x509.digest('SHA'):
if self.failed:
# we already failed in connect
return False
if crypto.dump_certificate(crypto.FILETYPE_PEM, x509) != self.mypem:
resp = self.http_conn.getresponse()
if resp.status != 200:
return False
return True
def getContext(self):
"""Context generator.
try:
length = int(resp.getheader('Content-Length', '0'))
except ValueError:
return False
if not length:
logger.Error("Zero-length reply from %s" % self.node)
return False
payload = resp.read(length)
unload = simplejson.loads(payload)
return unload
"""
context = SSL.Context(SSL.TLSv1_METHOD)
context.set_verify(SSL.VERIFY_PEER, self.verifier)
return context
class Client:
"""RPC Client class.
......@@ -208,6 +109,7 @@ class Client:
self.results = {}
self.procedure = procedure
self.args = args
self.body = simplejson.dumps(args)
#--- generic connector -------------
......@@ -222,13 +124,7 @@ class Client:
"""Add a node to the target list.
"""
factory = pb.PBClientFactory()
self.nc[connect_node] = nc = NodeController(self, connect_node)
reactor.connectSSL(connect_node, self.port, factory,
MirrorContextFactory())
#d = factory.getRootObject()
d = factory.login(credentials.UsernamePassword("master_node", self.nodepw))
d.addCallbacks(nc.cb_call, nc.cb_err1)
def getresult(self):
"""Return the results of the call.
......@@ -243,8 +139,8 @@ class Client:
queued, otherwise it does nothing.
"""
if self.nc:
reactor.run()
for node, nc in self.nc.items():
self.results[node] = nc.get_response()
def call_volume_list(node_list, vg_name):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment