diff --git a/daemons/ganeti-noded b/daemons/ganeti-noded index f344474aa49bdb6ffce604491f6bc4bfd2cad31c..3a2937407c77ff79a2b89ad0a39b4eb003e3d452 100755 --- a/daemons/ganeti-noded +++ b/daemons/ganeti-noded @@ -28,6 +28,8 @@ import os import sys import resource import traceback +import BaseHTTPServer +import simplejson from optparse import OptionParser @@ -40,55 +42,61 @@ from ganeti import errors from ganeti import ssconf from ganeti import utils -from twisted.spread import pb -from twisted.internet import reactor -from twisted.cred import checkers, portal -from OpenSSL import SSL - -class ServerContextFactory: - """SSL context factory class that uses a given certificate. - - """ - @staticmethod - def getContext(): - """Return a customized context. - - The context will be set to use our certificate. - - """ - ctx = SSL.Context(SSL.TLSv1_METHOD) - ctx.use_certificate_file(constants.SSL_CERT_FILE) - ctx.use_privatekey_file(constants.SSL_CERT_FILE) - return ctx - -class ServerObject(pb.Avatar): +class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): """The server implementation. This class holds all methods exposed over the RPC interface. """ - def __init__(self, name): - self.name = name - - def perspectiveMessageReceived(self, broker, message, args, kw): - """Custom message dispatching function. - - This function overrides the pb.Avatar function in order to provide - a simple form of exception passing (as text only). + def do_PUT(self): + """Handle a post request. """ - args = broker.unserialize(args, self) - kw = broker.unserialize(kw, self) - method = getattr(self, "perspective_%s" % message) - tb = None - state = None + path = self.path + if path.startswith("/"): + path = path[1:] + mname = "perspective_%s" % path + if not hasattr(self, mname): + self.send_error(404) + return False + + method = getattr(self, mname) try: - state = method(*args, **kw) - except: - tb = traceback.format_exc() + body_length = int(self.headers.get('Content-Length', '0')) + except ValueError: + self.send_error(400, 'No Content-Length header or invalid format') + return False - return broker.serialize((tb, state), self, method, args, kw) + try: + body = self.rfile.read(body_length) + except socket.error, err: + logger.Error("Socket error while reading: %s" % str(err)) + return + try: + params = simplejson.loads(body) + result = method(params) + payload = simplejson.dumps(result) + except Exception, err: + self.send_error(500, "Error: %s" % str(err)) + return False + self.send_response(200) + self.send_header('Content-Length', str(len(payload))) + self.end_headers() + self.wfile.write(payload) + return True + + def log_message(self, format, *args): + """Log a request to the log. + + This is the same as the parent, we just log somewhere else. + + """ + msg = ("%s - - [%s] %s\n" % + (self.address_string(), + self.log_date_time_string(), + format % args)) + logger.Debug(msg) # the new block devices -------------------------- @@ -487,21 +495,6 @@ class ServerObject(pb.Avatar): return utils.TestDelay(duration) -class MyRealm: - """Simple realm that forwards all requests to a ServerObject. - - """ - __implements__ = portal.IRealm - - def requestAvatar(self, avatarId, mind, *interfaces): - """Return an avatar based on our ServerObject class. - - """ - if pb.IPerspective not in interfaces: - raise NotImplementedError - return pb.IPerspective, ServerObject(avatarId), lambda:None - - def ParseOptions(): """Parse the command line options. @@ -550,11 +543,8 @@ def main(): logger.SetupLogging(twisted_workaround=True, debug=options.debug, program="ganeti-noded") - p = portal.Portal(MyRealm()) - p.registerChecker( - checkers.InMemoryUsernamePasswordDatabaseDontUse(master_node=pwdata)) - reactor.listenSSL(port, pb.PBServerFactory(p), ServerContextFactory()) - reactor.run() + httpd = BaseHTTPServer.HTTPServer(('', port), ServerObject) + httpd.serve_forever() def createDaemon(): diff --git a/lib/rpc.py b/lib/rpc.py index 7a1210745f604fbe2dccfae495bc35b35ee6359c..0e4b5dccc6e75c7578a80380152e39c749b9802e 100644 --- a/lib/rpc.py +++ b/lib/rpc.py @@ -26,43 +26,10 @@ # pylint: disable-msg=C0103 import os +import socket +import httplib -from twisted.internet.pollreactor import PollReactor - -class ReReactor(PollReactor): - """A re-startable Reactor implementation. - - """ - def run(self, installSignalHandlers=1): - """Custom run method. - - This is customized run that, before calling Reactor.run, will - reinstall the shutdown events and re-create the threadpool in case - these are not present (as will happen on the second run of the - reactor). - - """ - if not 'shutdown' in self._eventTriggers: - # the shutdown queue has been killed, we are most probably - # at the second run, thus recreate the queue - self.addSystemEventTrigger('during', 'shutdown', self.crash) - self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll) - if self.threadpool is not None and self.threadpool.joined == 1: - # in case the threadpool has been stopped, re-start it - # and add a trigger to stop it at reactor shutdown - self.threadpool.start() - self.addSystemEventTrigger('during', 'shutdown', self.threadpool.stop) - - return PollReactor.run(self, installSignalHandlers) - - -import twisted.internet.main -twisted.internet.main.installReactor(ReReactor()) - -from twisted.spread import pb -from twisted.internet import reactor -from twisted.cred import credentials -from OpenSSL import SSL, crypto +import simplejson from ganeti import logger from ganeti import utils @@ -82,107 +49,41 @@ class NodeController: def __init__(self, parent, node): self.parent = parent self.node = node + self.failed = False - def _check_end(self): - """Stop the reactor if we got all the results. - - """ - if len(self.parent.results) == len(self.parent.nc): - reactor.stop() - - def cb_call(self, obj): - """Callback for successful connect. - - If the connect and login sequence succeeded, we proceed with - making the actual call. - - """ - deferred = obj.callRemote(self.parent.procedure, self.parent.args) - deferred.addCallbacks(self.cb_done, self.cb_err2) - - def cb_done(self, result): - """Callback for successful call. - - When we receive the result from a call, we check if it was an - error and if so we raise a generic RemoteError (we can't pass yet - the actual exception over). If there was no error, we store the - result. - - """ - tb, self.parent.results[self.node] = result - self._check_end() - if tb: - raise errors.RemoteError("Remote procedure error calling %s on %s:" - "\n%s" % (self.parent.procedure, - self.node, - tb)) - - def cb_err1(self, reason): - """Error callback for unsuccessful connect. - - """ - logger.Error("caller_connect: could not connect to remote host %s," - " reason %s" % (self.node, reason)) - self.parent.results[self.node] = False - self._check_end() - - def cb_err2(self, reason): - """Error callback for unsuccessful call. - - This is when the call didn't return anything, not even an error, - or when it time out, etc. - - """ - logger.Error("caller_call: could not call %s on node %s," - " reason %s" % (self.parent.procedure, self.node, reason)) - self.parent.results[self.node] = False - self._check_end() - - -class MirrorContextFactory: - """Certificate verifier factory. - - This factory creates contexts that verify if the remote end has a - specific certificate (i.e. our own certificate). - - The checks we do are that the PEM dump of the certificate is the - same as our own and (somewhat redundantly) that the SHA checksum is - the same. - - """ - isClient = 1 - - def __init__(self): + self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port) try: - fd = open(constants.SSL_CERT_FILE, 'r') - try: - data = fd.read(16384) - finally: - fd.close() - except EnvironmentError, err: - raise errors.ConfigurationError("missing SSL certificate: %s" % - str(err)) - self.mycert = crypto.load_certificate(crypto.FILETYPE_PEM, data) - self.mypem = crypto.dump_certificate(crypto.FILETYPE_PEM, self.mycert) - self.mydigest = self.mycert.digest('SHA') - - def verifier(self, conn, x509, errno, err_depth, retcode): - """Certificate verify method. + hc.connect() + hc.putrequest('PUT', "/%s" % self.parent.procedure, + skip_accept_encoding=True) + hc.putheader('Content-Length', str(len(parent.body))) + hc.endheaders() + hc.send(parent.body) + except socket.error, err: + logger.Error("Error connecting to %s: %s" % (node, str(err))) + self.failed = True + + def get_response(self): + """Try to process the response from the node. """ - if self.mydigest != x509.digest('SHA'): + if self.failed: + # we already failed in connect return False - if crypto.dump_certificate(crypto.FILETYPE_PEM, x509) != self.mypem: + resp = self.http_conn.getresponse() + if resp.status != 200: return False - return True - - def getContext(self): - """Context generator. + try: + length = int(resp.getheader('Content-Length', '0')) + except ValueError: + return False + if not length: + logger.Error("Zero-length reply from %s" % self.node) + return False + payload = resp.read(length) + unload = simplejson.loads(payload) + return unload - """ - context = SSL.Context(SSL.TLSv1_METHOD) - context.set_verify(SSL.VERIFY_PEER, self.verifier) - return context class Client: """RPC Client class. @@ -208,6 +109,7 @@ class Client: self.results = {} self.procedure = procedure self.args = args + self.body = simplejson.dumps(args) #--- generic connector ------------- @@ -222,13 +124,7 @@ class Client: """Add a node to the target list. """ - factory = pb.PBClientFactory() self.nc[connect_node] = nc = NodeController(self, connect_node) - reactor.connectSSL(connect_node, self.port, factory, - MirrorContextFactory()) - #d = factory.getRootObject() - d = factory.login(credentials.UsernamePassword("master_node", self.nodepw)) - d.addCallbacks(nc.cb_call, nc.cb_err1) def getresult(self): """Return the results of the call. @@ -243,8 +139,8 @@ class Client: queued, otherwise it does nothing. """ - if self.nc: - reactor.run() + for node, nc in self.nc.items(): + self.results[node] = nc.get_response() def call_volume_list(node_list, vg_name):