Commit 4331f6cd authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Reuse HTTP client pool for RPC

ganeti-masterd: Add initialization and shutdown of RPC pool. It needs
to be shutdown before forking.

ganeti.cli: Add decorator function to initialize and shutdown RPC pool.

ganeti.rpc: Add functions to initialize and shutdown RPC pool. Throw
exception when used without proper initialization.

gnt-cluster, gnt-node: Use decorator function to initialize and shutdown
RPC pool.

Reviewed-by: iustinp
parent 54d1a06e
......@@ -428,13 +428,17 @@ def main():
utils.debug = options.debug
utils.no_fork = True
ssconf.CheckMaster(options.debug)
rpc.Init()
try:
ssconf.CheckMaster(options.debug)
# we believe we are the master, let's ask the other nodes...
if not CheckAgreement():
return
# we believe we are the master, let's ask the other nodes...
if not CheckAgreement():
return
master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
finally:
rpc.Shutdown()
# become a daemon
if options.fork:
......@@ -446,19 +450,23 @@ def main():
utils.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
stderr_logging=not options.fork)
logging.info("ganeti master daemon startup")
rpc.Init()
try:
logging.info("ganeti master daemon startup")
# activate ip
master_node = ssconf.SimpleConfigReader().GetMasterNode()
if not rpc.RpcRunner.call_node_start_master(master_node, False):
logging.error("Can't activate master IP address")
# activate ip
master_node = ssconf.SimpleConfigReader().GetMasterNode()
if not rpc.RpcRunner.call_node_start_master(master_node, False):
logging.error("Can't activate master IP address")
master.setup_queue()
try:
master.serve_forever()
master.setup_queue()
try:
master.serve_forever()
finally:
master.server_cleanup()
utils.RemovePidFile(constants.MASTERD_PID)
finally:
master.server_cleanup()
utils.RemovePidFile(constants.MASTERD_PID)
rpc.Shutdown()
if __name__ == "__main__":
......
......@@ -36,6 +36,7 @@ from ganeti import constants
from ganeti import opcodes
from ganeti import luxi
from ganeti import ssconf
from ganeti import rpc
from optparse import (OptionParser, make_option, TitledHelpFormatter,
Option, OptionValueError)
......@@ -51,6 +52,7 @@ __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
"JobSubmittedException", "FormatTimestamp", "ParseTimespec",
"ValidateBeParams",
"ToStderr", "ToStdout",
"UsesRPC",
]
......@@ -424,6 +426,16 @@ def ValidateBeParams(bep):
raise errors.ParameterError("Invalid number of VCPUs")
def UsesRPC(fn):
def wrapper(*args, **kwargs):
rpc.Init()
try:
return fn(*args, **kwargs)
finally:
rpc.Shutdown()
return wrapper
def AskUser(text, choices=None):
"""Ask the user a question.
......
......@@ -41,6 +41,36 @@ from ganeti import http
from ganeti import serializer
# Module level variable
_http_manager = None
def Init():
"""Initializes the module-global HTTP client manager.
Must be called before using any RPC function.
"""
global _http_manager
assert not _http_manager, "RPC module initialized more than once"
_http_manager = http.HttpClientManager()
def Shutdown():
"""Stops the module-global HTTP client manager.
Must be called before quitting the program.
"""
global _http_manager
if _http_manager:
_http_manager.Shutdown()
_http_manager = None
class Client:
"""RPC Client class.
......@@ -103,12 +133,9 @@ class Client:
@returns: List of RPC results
"""
# TODO: Shared and reused manager
mgr = http.HttpClientManager()
try:
mgr.ExecRequests(self.nc.values())
finally:
mgr.Shutdown()
assert _http_manager, "RPC module not intialized"
_http_manager.ExecRequests(self.nc.values())
results = {}
......
......@@ -36,6 +36,7 @@ from ganeti import bootstrap
from ganeti import ssh
@UsesRPC
def InitCluster(opts, args):
"""Initialize the cluster.
......@@ -125,6 +126,7 @@ def InitCluster(opts, args):
return 0
@UsesRPC
def DestroyCluster(opts, args):
"""Destroy the cluster.
......@@ -396,6 +398,7 @@ def VerifyDisks(opts, args):
return retcode
@UsesRPC
def MasterFailover(opts, args):
"""Failover the master node.
......
......@@ -42,6 +42,7 @@ _LIST_DEF_FIELDS = [
]
@UsesRPC
def AddNode(opts, args):
"""Add a node to the cluster.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment