From 81010134cfee8b3a744185398ab384ed2e5aebe5 Mon Sep 17 00:00:00 2001
From: Iustin Pop <iustin@google.com>
Date: Fri, 22 Feb 2008 12:39:34 +0000
Subject: [PATCH] Break trunk by removing twisted

This patch switches from the twisted usage for inter-node protocol to
simple BaseHTTPServer/httplib. The patch has more deletions because we
use no authentication, no encryption at all.

As such, this is just for trunk, and only for testing. What it brings is
the ability to use the rpc library from within multiple threads in
parallel (or it should so).

Since the changes are very few and non-intrusive, they can be reverted
without impacting the rest of the code.

This passes burnin. QA was not tested.

Reviewed-by: imsnah
---
 daemons/ganeti-noded | 108 ++++++++++++---------------
 lib/rpc.py           | 172 +++++++++----------------------------------
 2 files changed, 83 insertions(+), 197 deletions(-)

diff --git a/daemons/ganeti-noded b/daemons/ganeti-noded
index f344474aa..3a2937407 100755
--- a/daemons/ganeti-noded
+++ b/daemons/ganeti-noded
@@ -28,6 +28,8 @@ import os
 import sys
 import resource
 import traceback
+import BaseHTTPServer
+import simplejson
 
 from optparse import OptionParser
 
@@ -40,55 +42,61 @@ from ganeti import errors
 from ganeti import ssconf
 from ganeti import utils
 
-from twisted.spread import pb
-from twisted.internet import reactor
-from twisted.cred import checkers, portal
-from OpenSSL import SSL
 
-
-class ServerContextFactory:
-  """SSL context factory class that uses a given certificate.
-
-  """
-  @staticmethod
-  def getContext():
-    """Return a customized context.
-
-    The context will be set to use our certificate.
-
-    """
-    ctx = SSL.Context(SSL.TLSv1_METHOD)
-    ctx.use_certificate_file(constants.SSL_CERT_FILE)
-    ctx.use_privatekey_file(constants.SSL_CERT_FILE)
-    return ctx
-
-class ServerObject(pb.Avatar):
+class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler):
   """The server implementation.
 
   This class holds all methods exposed over the RPC interface.
 
   """
-  def __init__(self, name):
-    self.name = name
-
-  def perspectiveMessageReceived(self, broker, message, args, kw):
-    """Custom message dispatching function.
-
-    This function overrides the pb.Avatar function in order to provide
-    a simple form of exception passing (as text only).
+  def do_PUT(self):
+    """Handle a post request.
 
     """
-    args = broker.unserialize(args, self)
-    kw = broker.unserialize(kw, self)
-    method = getattr(self, "perspective_%s" % message)
-    tb = None
-    state = None
+    path = self.path
+    if path.startswith("/"):
+      path = path[1:]
+    mname = "perspective_%s" % path
+    if not hasattr(self, mname):
+      self.send_error(404)
+      return False
+
+    method = getattr(self, mname)
     try:
-      state = method(*args, **kw)
-    except:
-      tb = traceback.format_exc()
+      body_length = int(self.headers.get('Content-Length', '0'))
+    except ValueError:
+      self.send_error(400, 'No Content-Length header or invalid format')
+      return False
 
-    return broker.serialize((tb, state), self, method, args, kw)
+    try:
+      body = self.rfile.read(body_length)
+    except socket.error, err:
+      logger.Error("Socket error while reading: %s" % str(err))
+      return
+    try:
+      params = simplejson.loads(body)
+      result = method(params)
+      payload = simplejson.dumps(result)
+    except Exception, err:
+      self.send_error(500, "Error: %s" % str(err))
+      return False
+    self.send_response(200)
+    self.send_header('Content-Length', str(len(payload)))
+    self.end_headers()
+    self.wfile.write(payload)
+    return True
+
+  def log_message(self, format, *args):
+    """Log a request to the log.
+
+    This is the same as the parent, we just log somewhere else.
+
+    """
+    msg = ("%s - - [%s] %s\n" %
+           (self.address_string(),
+            self.log_date_time_string(),
+            format % args))
+    logger.Debug(msg)
 
   # the new block devices  --------------------------
 
@@ -487,21 +495,6 @@ class ServerObject(pb.Avatar):
     return utils.TestDelay(duration)
 
 
-class MyRealm:
-  """Simple realm that forwards all requests to a ServerObject.
-
-  """
-  __implements__ = portal.IRealm
-
-  def requestAvatar(self, avatarId, mind, *interfaces):
-    """Return an avatar based on our ServerObject class.
-
-    """
-    if pb.IPerspective not in interfaces:
-      raise NotImplementedError
-    return pb.IPerspective, ServerObject(avatarId), lambda:None
-
-
 def ParseOptions():
   """Parse the command line options.
 
@@ -550,11 +543,8 @@ def main():
   logger.SetupLogging(twisted_workaround=True, debug=options.debug,
                       program="ganeti-noded")
 
-  p = portal.Portal(MyRealm())
-  p.registerChecker(
-    checkers.InMemoryUsernamePasswordDatabaseDontUse(master_node=pwdata))
-  reactor.listenSSL(port, pb.PBServerFactory(p), ServerContextFactory())
-  reactor.run()
+  httpd = BaseHTTPServer.HTTPServer(('', port), ServerObject)
+  httpd.serve_forever()
 
 
 def createDaemon():
diff --git a/lib/rpc.py b/lib/rpc.py
index 7a1210745..0e4b5dccc 100644
--- a/lib/rpc.py
+++ b/lib/rpc.py
@@ -26,43 +26,10 @@
 # pylint: disable-msg=C0103
 
 import os
+import socket
+import httplib
 
-from twisted.internet.pollreactor import PollReactor
-
-class ReReactor(PollReactor):
-  """A re-startable Reactor implementation.
-
-  """
-  def run(self, installSignalHandlers=1):
-    """Custom run method.
-
-    This is customized run that, before calling Reactor.run, will
-    reinstall the shutdown events and re-create the threadpool in case
-    these are not present (as will happen on the second run of the
-    reactor).
-
-    """
-    if not 'shutdown' in self._eventTriggers:
-      # the shutdown queue has been killed, we are most probably
-      # at the second run, thus recreate the queue
-      self.addSystemEventTrigger('during', 'shutdown', self.crash)
-      self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
-    if self.threadpool is not None and self.threadpool.joined == 1:
-      # in case the threadpool has been stopped, re-start it
-      # and add a trigger to stop it at reactor shutdown
-      self.threadpool.start()
-      self.addSystemEventTrigger('during', 'shutdown', self.threadpool.stop)
-
-    return PollReactor.run(self, installSignalHandlers)
-
-
-import twisted.internet.main
-twisted.internet.main.installReactor(ReReactor())
-
-from twisted.spread import pb
-from twisted.internet import reactor
-from twisted.cred import credentials
-from OpenSSL import SSL, crypto
+import simplejson
 
 from ganeti import logger
 from ganeti import utils
@@ -82,107 +49,41 @@ class NodeController:
   def __init__(self, parent, node):
     self.parent = parent
     self.node = node
+    self.failed = False
 
-  def _check_end(self):
-    """Stop the reactor if we got all the results.
-
-    """
-    if len(self.parent.results) == len(self.parent.nc):
-      reactor.stop()
-
-  def cb_call(self, obj):
-    """Callback for successful connect.
-
-    If the connect and login sequence succeeded, we proceed with
-    making the actual call.
-
-    """
-    deferred = obj.callRemote(self.parent.procedure, self.parent.args)
-    deferred.addCallbacks(self.cb_done, self.cb_err2)
-
-  def cb_done(self, result):
-    """Callback for successful call.
-
-    When we receive the result from a call, we check if it was an
-    error and if so we raise a generic RemoteError (we can't pass yet
-    the actual exception over). If there was no error, we store the
-    result.
-
-    """
-    tb, self.parent.results[self.node] = result
-    self._check_end()
-    if tb:
-      raise errors.RemoteError("Remote procedure error calling %s on %s:"
-                               "\n%s" % (self.parent.procedure,
-                                         self.node,
-                                         tb))
-
-  def cb_err1(self, reason):
-    """Error callback for unsuccessful connect.
-
-    """
-    logger.Error("caller_connect: could not connect to remote host %s,"
-                 " reason %s" % (self.node, reason))
-    self.parent.results[self.node] = False
-    self._check_end()
-
-  def cb_err2(self, reason):
-    """Error callback for unsuccessful call.
-
-    This is when the call didn't return anything, not even an error,
-    or when it time out, etc.
-
-    """
-    logger.Error("caller_call: could not call %s on node %s,"
-                 " reason %s" % (self.parent.procedure, self.node, reason))
-    self.parent.results[self.node] = False
-    self._check_end()
-
-
-class MirrorContextFactory:
-  """Certificate verifier factory.
-
-  This factory creates contexts that verify if the remote end has a
-  specific certificate (i.e. our own certificate).
-
-  The checks we do are that the PEM dump of the certificate is the
-  same as our own and (somewhat redundantly) that the SHA checksum is
-  the same.
-
-  """
-  isClient = 1
-
-  def __init__(self):
+    self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
     try:
-      fd = open(constants.SSL_CERT_FILE, 'r')
-      try:
-        data = fd.read(16384)
-      finally:
-        fd.close()
-    except EnvironmentError, err:
-      raise errors.ConfigurationError("missing SSL certificate: %s" %
-                                      str(err))
-    self.mycert = crypto.load_certificate(crypto.FILETYPE_PEM, data)
-    self.mypem = crypto.dump_certificate(crypto.FILETYPE_PEM, self.mycert)
-    self.mydigest = self.mycert.digest('SHA')
-
-  def verifier(self, conn, x509, errno, err_depth, retcode):
-    """Certificate verify method.
+      hc.connect()
+      hc.putrequest('PUT', "/%s" % self.parent.procedure,
+                    skip_accept_encoding=True)
+      hc.putheader('Content-Length', str(len(parent.body)))
+      hc.endheaders()
+      hc.send(parent.body)
+    except socket.error, err:
+      logger.Error("Error connecting to %s: %s" % (node, str(err)))
+      self.failed = True
+
+  def get_response(self):
+    """Try to process the response from the node.
 
     """
-    if self.mydigest != x509.digest('SHA'):
+    if self.failed:
+      # we already failed in connect
       return False
-    if crypto.dump_certificate(crypto.FILETYPE_PEM, x509) != self.mypem:
+    resp = self.http_conn.getresponse()
+    if resp.status != 200:
       return False
-    return True
-
-  def getContext(self):
-    """Context generator.
+    try:
+      length = int(resp.getheader('Content-Length', '0'))
+    except ValueError:
+      return False
+    if not length:
+      logger.Error("Zero-length reply from %s" % self.node)
+      return False
+    payload = resp.read(length)
+    unload = simplejson.loads(payload)
+    return unload
 
-    """
-    context = SSL.Context(SSL.TLSv1_METHOD)
-    context.set_verify(SSL.VERIFY_PEER, self.verifier)
-    return context
 
 class Client:
   """RPC Client class.
@@ -208,6 +109,7 @@ class Client:
     self.results = {}
     self.procedure = procedure
     self.args = args
+    self.body = simplejson.dumps(args)
 
   #--- generic connector -------------
 
@@ -222,13 +124,7 @@ class Client:
     """Add a node to the target list.
 
     """
-    factory = pb.PBClientFactory()
     self.nc[connect_node] = nc = NodeController(self, connect_node)
-    reactor.connectSSL(connect_node, self.port, factory,
-                       MirrorContextFactory())
-    #d = factory.getRootObject()
-    d = factory.login(credentials.UsernamePassword("master_node", self.nodepw))
-    d.addCallbacks(nc.cb_call, nc.cb_err1)
 
   def getresult(self):
     """Return the results of the call.
@@ -243,8 +139,8 @@ class Client:
     queued, otherwise it does nothing.
 
     """
-    if self.nc:
-      reactor.run()
+    for node, nc in self.nc.items():
+      self.results[node] = nc.get_response()
 
 
 def call_volume_list(node_list, vg_name):
-- 
GitLab