Commit 31ff0247 authored by Michael Hanselmann
Browse files

Merge branch 'devel-2.1'



* devel-2.1:
  RAPI: /2/{nodes,instances}/$name should return 404 for unknown items
  ganeti-masterd: Improve error logging for client requests
  Return disk_template from LUQueryInstanceData
  RAPI client: Rename Get{Node,Instance}Info, add new GetInstanceInfo
  RAPI client: Log request to be made
  Add missing documentation for RAPI instance creation mode
  Add checks for master IP in cluster verify
  Remove unused import from daemon.py
  utils.IgnoreSignals
  AsyncUDPSocket.handle_error
  Add a forgotten comment about overriding a method
  ganeti-noded: add the --no-mlock option
  Describe more ganeti-noded options in the manpage

Conflicts:
	daemons/ganeti-masterd: Trivial
	test/ganeti.backend_unittest.py: Trivial
	test/ganeti.utils_unittest.py: Trivial
Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
parents c7406bbe e8ebbd2b
......@@ -170,9 +170,10 @@ class ClientRqHandler(SocketServer.BaseRequestHandler):
result = self._ops.handle_request(method, args)
success = True
except errors.GenericError, err:
logging.exception("Unexpected exception")
result = errors.EncodeException(err)
except:
logging.error("Unexpected exception", exc_info=True)
logging.exception("Unexpected exception")
result = "Caught exception: %s" % str(sys.exc_info()[1])
self.send_message(luxi.FormatResponse(success, result))
......
......@@ -906,7 +906,11 @@ def ExecNoded(options, _):
"""Main node daemon function, executed with the PID file held.
"""
utils.Mlockall()
if options.mlock:
utils.Mlockall()
request_executor_class = MlockallRequestExecutor
else:
request_executor_class = http.server.HttpServerRequestExecutor
# Read SSL certificate
if options.ssl:
......@@ -925,7 +929,7 @@ def ExecNoded(options, _):
mainloop = daemon.Mainloop()
server = NodeHttpServer(mainloop, options.bind_address, options.port,
ssl_params=ssl_params, ssl_verify_peer=True,
request_executor_class=MlockallRequestExecutor)
request_executor_class=request_executor_class)
server.Start()
try:
mainloop.Run()
......@@ -941,6 +945,10 @@ def main():
usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
version="%%prog (ganeti) %s" %
constants.RELEASE_VERSION)
parser.add_option("--no-mlock", dest="mlock",
help="Do not mlock the node memory in ram",
default=True, action="store_false")
dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
dirs.append((constants.LOG_OS_DIR, 0750))
dirs.append((constants.LOCK_DIR, 1777))
......
......@@ -375,6 +375,8 @@ Body parameters:
Must be ``1`` (older Ganeti versions used a different format for
instance creation requests, version ``0``, but that format is not
documented).
``mode`` (string, required)
Instance creation mode
``name`` (string, required)
Instance name
``disk_template`` (string, required)
......
......@@ -486,6 +486,8 @@ def VerifyNode(what, cluster_name):
"""
result = {}
my_name = utils.HostInfo().name
port = utils.GetDaemonPort(constants.NODED)
if constants.NV_HYPERVISOR in what:
result[constants.NV_HYPERVISOR] = tmp = {}
......@@ -510,7 +512,6 @@ def VerifyNode(what, cluster_name):
if constants.NV_NODENETTEST in what:
result[constants.NV_NODENETTEST] = tmp = {}
my_name = utils.HostInfo().name
my_pip = my_sip = None
for name, pip, sip in what[constants.NV_NODENETTEST]:
if name == my_name:
......@@ -521,7 +522,6 @@ def VerifyNode(what, cluster_name):
tmp[my_name] = ("Can't find my own primary/secondary IP"
" in the node list")
else:
port = utils.GetDaemonPort(constants.NODED)
for name, pip, sip in what[constants.NV_NODENETTEST]:
fail = []
if not utils.TcpPing(pip, port, source=my_pip):
......@@ -533,6 +533,17 @@ def VerifyNode(what, cluster_name):
tmp[name] = ("failure using the %s interface(s)" %
" and ".join(fail))
if constants.NV_MASTERIP in what:
# FIXME: add checks on incoming data structures (here and in the
# rest of the function)
master_name, master_ip = what[constants.NV_MASTERIP]
if master_name == my_name:
source = constants.LOCALHOST_IP_ADDRESS
else:
source = None
result[constants.NV_MASTERIP] = utils.TcpPing(master_ip, port,
source=source)
if constants.NV_LVLIST in what:
try:
val = GetVolumeList(what[constants.NV_LVLIST])
......
......@@ -1339,6 +1339,18 @@ class LUVerifyCluster(LogicalUnit):
"tcp communication with node '%s': %s",
anode, nresult[constants.NV_NODENETTEST][anode])
test = constants.NV_MASTERIP not in nresult
_ErrorIf(test, self.ENODENET, node,
"node hasn't returned node master IP reachability data")
if not test:
if not nresult[constants.NV_MASTERIP]:
if node == self.master_node:
msg = "the master node cannot reach the master IP (not configured?)"
else:
msg = "cannot reach the master IP"
_ErrorIf(True, self.ENODENET, node, msg)
def _VerifyInstance(self, instance, instanceconfig, node_image):
"""Verify an instance.
......@@ -1673,6 +1685,8 @@ class LUVerifyCluster(LogicalUnit):
# FIXME: verify OS list
# do local checksums
master_files = [constants.CLUSTER_CONF_FILE]
master_node = self.master_node = self.cfg.GetMasterNode()
master_ip = self.cfg.GetMasterIP()
file_names = ssconf.SimpleStore().GetFileList()
file_names.extend(constants.ALL_CERT_FILES)
......@@ -1696,6 +1710,7 @@ class LUVerifyCluster(LogicalUnit):
constants.NV_HVINFO: self.cfg.GetHypervisorType(),
constants.NV_NODESETUP: None,
constants.NV_TIME: None,
constants.NV_MASTERIP: (master_node, master_ip),
}
if vg_name is not None:
......@@ -1742,7 +1757,6 @@ class LUVerifyCluster(LogicalUnit):
self.cfg.GetClusterName())
nvinfo_endtime = time.time()
master_node = self.cfg.GetMasterNode()
all_drbd_map = self.cfg.ComputeDRBDMap()
feedback_fn("* Verifying node status")
......@@ -8055,6 +8069,7 @@ class LUQueryInstanceData(NoHooksLU):
"os": instance.os,
# this happens to be the same format used for hooks
"nics": _NICListToTuple(self, instance.nics),
"disk_template": instance.disk_template,
"disks": disks,
"hypervisor": instance.hypervisor,
"network_port": instance.network_port,
......
......@@ -623,6 +623,7 @@ NV_PVLIST = "pvlist"
NV_DRBDLIST = "drbd-list"
NV_NODESETUP = "nodesetup"
NV_TIME = "time"
NV_MASTERIP = "master-ip"
# SSL certificate check constants (in days)
SSL_CERT_EXPIRATION_WARN = 30
......
......@@ -25,7 +25,6 @@
import asyncore
import os
import signal
import errno
import logging
import sched
import time
......@@ -92,27 +91,12 @@ class AsyncUDPSocket(asyncore.dispatcher):
# differ and treat all messages equally.
pass
def do_read(self):
try:
payload, address = self.recvfrom(constants.MAX_UDP_DATA_SIZE)
except socket.error, err:
if err.errno == errno.EINTR:
# we got a signal while trying to read. no need to do anything,
# handle_read will be called again if there is data on the socket.
return
else:
raise
ip, port = address
self.handle_datagram(payload, ip, port)
# this method is overriding an asyncore.dispatcher method
def handle_read(self):
  """Read a single datagram from the socket and dispatch it.

  The datagram is passed, together with the sender's IP address and port,
  to L{handle_datagram}. Exceptions are not caught here.

  """
  recv_result = utils.IgnoreSignals(self.recvfrom,
                                    constants.MAX_UDP_DATA_SIZE)
  if recv_result is None:
    # utils.IgnoreSignals returns None when the call was interrupted by a
    # signal (EINTR). Nothing was read in that case, so there is nothing to
    # unpack or dispatch; unpacking None would raise a TypeError.
    return
  payload, address = recv_result
  ip, port = address
  self.handle_datagram(payload, ip, port)
def handle_datagram(self, payload, ip, port):
"""Handle an already read udp datagram
......@@ -126,27 +110,21 @@ class AsyncUDPSocket(asyncore.dispatcher):
# something scheduled to be written
return bool(self._out_queue)
# this method is overriding an asyncore.dispatcher method
def handle_write(self):
  """Send the oldest queued datagram, if any.

  The datagram is removed from the output queue only once it has actually
  been handed to the socket. Exceptions other than EINTR are not caught
  here.

  """
  if not self._out_queue:
    logging.error("handle_write called with empty output queue")
    return
  (ip, port, payload) = self._out_queue[0]
  sent = utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
  if sent is None:
    # utils.IgnoreSignals returns None when the call was interrupted by a
    # signal (EINTR). The datagram was not sent, so it must stay at the head
    # of the queue to be retried instead of being silently dropped.
    return
  self._out_queue.pop(0)
# this method is overriding an asyncore.dispatcher method
def handle_error(self):
  """Report a failure that happened while handling a request.

  The currently handled exception is written to the log, including its
  traceback; nothing else is done here.

  """
  logging.exception("Error while handling asyncore request")
def enqueue_send(self, ip, port, payload):
"""Enqueue a datagram to be sent when possible
......@@ -168,7 +146,7 @@ class AsyncUDPSocket(asyncore.dispatcher):
"""
result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
if result is not None and result & select.POLLIN:
self.do_read()
self.handle_read()
return True
else:
return False
......
......@@ -195,6 +195,20 @@ def SubmitJob(op, cl=None):
raise http.HttpGatewayTimeout("Timeout while talking to the master"
" daemon. Error: %s" % str(err))
def HandleItemQueryErrors(fn, *args, **kwargs):
"""Converts errors when querying a single item.
"""
try:
return fn(*args, **kwargs)
except errors.OpPrereqError, err:
if len(err.args) == 2 and err.args[1] == errors.ECODE_NOENT:
raise http.HttpNotFound()
raise
def GetClient():
"""Generic wrapper for luxi.Client(), for better http compatibility.
......
......@@ -400,12 +400,19 @@ class GanetiRapiClient(object):
encoded_content = None
# Build URL
url = [self._base_url, path]
urlparts = [self._base_url, path]
if query:
url.append("?")
url.append(urllib.urlencode(self._EncodeQuery(query)))
urlparts.append("?")
urlparts.append(urllib.urlencode(self._EncodeQuery(query)))
req = _RapiRequest(method, "".join(url), self._headers, encoded_content)
url = "".join(urlparts)
self._logger.debug("Sending request %s %s to %s:%s"
" (headers=%r, content=%r)",
method, url, self._host, self._port, self._headers,
encoded_content)
req = _RapiRequest(method, url, self._headers, encoded_content)
try:
resp = self._http.open(req)
......@@ -545,7 +552,7 @@ class GanetiRapiClient(object):
else:
return [i["id"] for i in instances]
def GetInstanceInfo(self, instance):
def GetInstance(self, instance):
"""Gets information about an instance.
@type instance: str
......@@ -559,6 +566,24 @@ class GanetiRapiClient(object):
("/%s/instances/%s" %
(GANETI_RAPI_VERSION, instance)), None, None)
def GetInstanceInfo(self, instance, static=None):
  """Requests the "info" resource of an instance.

  @type instance: string
  @param instance: Instance name
  @param static: if not None, passed to the server as the C{static} query
      parameter; if None, no query is sent at all
  @rtype: string
  @return: Job ID

  """
  query = None
  if static is not None:
    query = [("static", static)]

  path = "/%s/instances/%s/info" % (GANETI_RAPI_VERSION, instance)
  return self._SendRequest(HTTP_GET, path, query, None)
def CreateInstance(self, mode, name, disk_template, disks, nics,
**kwargs):
"""Creates a new instance.
......@@ -895,7 +920,7 @@ class GanetiRapiClient(object):
else:
return [n["id"] for n in nodes]
def GetNodeInfo(self, node):
def GetNode(self, node):
"""Gets information about a node.
@type node: str
......
......@@ -312,8 +312,10 @@ class R_2_nodes_name(baserlib.R_Generic):
"""
node_name = self.items[0]
client = baserlib.GetClient()
result = client.QueryNodes(names=[node_name], fields=N_FIELDS,
use_locking=self.useLocking())
result = baserlib.HandleItemQueryErrors(client.QueryNodes,
names=[node_name], fields=N_FIELDS,
use_locking=self.useLocking())
return baserlib.MapFields(N_FIELDS, result[0])
......@@ -699,8 +701,11 @@ class R_2_instances_name(baserlib.R_Generic):
"""
client = baserlib.GetClient()
instance_name = self.items[0]
result = client.QueryInstances(names=[instance_name], fields=I_FIELDS,
use_locking=self.useLocking())
result = baserlib.HandleItemQueryErrors(client.QueryInstances,
names=[instance_name],
fields=I_FIELDS,
use_locking=self.useLocking())
return baserlib.MapFields(I_FIELDS, result[0])
......
......@@ -2984,6 +2984,20 @@ def RunInSeparateProcess(fn, *args):
return bool(exitcode)
def IgnoreSignals(fn, *args, **kwargs):
  """Tries to call a function ignoring failures due to EINTR.

  @param fn: callable to invoke with the remaining positional and keyword
      arguments
  @return: the return value of C{fn}, or None if the call failed because it
      was interrupted by a signal (EINTR); callers that need the result must
      therefore check for None
  @raise EnvironmentError: (or C{socket.error}/C{select.error}) re-raised
      unchanged if the failure was not caused by EINTR

  """
  # Local import, as only this function needs it here
  import sys

  try:
    return fn(*args, **kwargs)
  except (EnvironmentError, socket.error, select.error):
    # sys.exc_info() is used instead of binding the exception in the except
    # clause, as that works with every version of the except syntax
    err = sys.exc_info()[1]
    # Not every one of these exception types provides an "errno" attribute
    # on all Python versions (socket.error and select.error can be plain
    # exceptions carrying the error code as their first argument), hence
    # fall back to the arguments instead of failing with AttributeError
    err_code = getattr(err, "errno", None)
    if err_code is None and err.args:
      err_code = err.args[0]
    if err_code != errno.EINTR:
      raise
  return None
def LockedMethod(fn):
"""Synchronized object access decorator.
......
......@@ -52,6 +52,13 @@
responsible for the node functions in the ganeti system.
</para>
<para>
By default, in order to be able to support features such as node
powercycling even on systems with a very damaged root disk, ganeti-noded
mlocks itself in RAM. You can disable this feature by passing in the
<option>--no-mlock</option> option.
</para>
<para>
For testing purposes, you can give the <option>-f</option>
option and the program won't detach from the running terminal.
......@@ -61,6 +68,27 @@
Debug-level messages can be activated by giving the
<option>-d</option> option.
</para>
<para>
Logging to syslog, rather than its own log file, can be enabled by
passing in the <option>--syslog</option> option.
</para>
<para>
The ganeti-noded daemon listens to port 1811 TCP, on all interfaces, by
default. This can be overridden by an entry in /etc/services or by
passing the <option>-p</option> option. The <option>-b</option> option
can be used to specify the address to bind to (defaults to 0.0.0.0).
</para>
<para>
Ganeti noded communication is protected via SSL, with a key generated at
cluster init time. This can be disabled with the
<option>--no-ssl</option> option, or a different ssl key and certificate
can be specified using the <option>-K</option> and <option>-C</option>
options.
</para>
<refsect2>
<title>ROLE</title>
<para>
......
......@@ -28,6 +28,7 @@ import tempfile
import unittest
from ganeti import utils
from ganeti import constants
from ganeti import backend
import testutils
......@@ -69,5 +70,27 @@ class TestX509Certificates(unittest.TestCase):
self.assertEqual(utils.ListVisibleFiles(self.tmpdir), [name])
class TestNodeVerify(testutils.GanetiTestCase):
  """Tests for the master IP check done by backend.VerifyNode"""

  def testMasterIPLocalhost(self):
    # this is a real functional test, but requires localhost to be reachable
    local_data = (utils.HostInfo().name, constants.LOCALHOST_IP_ADDRESS)
    result = backend.VerifyNode({constants.NV_MASTERIP: local_data}, None)
    self.failUnless(constants.NV_MASTERIP in result,
                    "Master IP data not returned")
    self.failUnless(result[constants.NV_MASTERIP], "Cannot reach localhost")

  def testMasterIPUnreachable(self):
    # Network 192.0.2.0/24 is reserved for test/documentation as per
    # RFC 5735
    bad_data = ("master.example.com", "192.0.2.1")
    # we just test that whatever TcpPing returns, VerifyNode returns too
    orig_tcp_ping = utils.TcpPing
    utils.TcpPing = lambda a, b, source=None: False
    try:
      result = backend.VerifyNode({constants.NV_MASTERIP: bad_data}, None)
    finally:
      # Always undo the monkeypatch, otherwise every test running afterwards
      # in the same process would use the fake TcpPing
      utils.TcpPing = orig_tcp_ping
    self.failUnless(constants.NV_MASTERIP in result,
                    "Master IP data not returned")
    self.failIf(result[constants.NV_MASTERIP],
                "Result from utils.TcpPing corrupted")
if __name__ == "__main__":
testutils.GanetiTestProgram()
......@@ -233,12 +233,31 @@ class GanetiRapiClientTests(testutils.GanetiTestCase):
self.assertHandler(rlib2.R_2_instances)
self.assertBulk()
def testGetInstanceInfo(self):
def testGetInstance(self):
self.rapi.AddResponse("[]")
self.assertEqual([], self.client.GetInstanceInfo("instance"))
self.assertEqual([], self.client.GetInstance("instance"))
self.assertHandler(rlib2.R_2_instances_name)
self.assertItems(["instance"])
def testGetInstanceInfo(self):
self.rapi.AddResponse("21291")
self.assertEqual(21291, self.client.GetInstanceInfo("inst3"))
self.assertHandler(rlib2.R_2_instances_name_info)
self.assertItems(["inst3"])
self.assertQuery("static", None)
self.rapi.AddResponse("3428")
self.assertEqual(3428, self.client.GetInstanceInfo("inst31", static=False))
self.assertHandler(rlib2.R_2_instances_name_info)
self.assertItems(["inst31"])
self.assertQuery("static", ["0"])
self.rapi.AddResponse("15665")
self.assertEqual(15665, self.client.GetInstanceInfo("inst32", static=True))
self.assertHandler(rlib2.R_2_instances_name_info)
self.assertItems(["inst32"])
self.assertQuery("static", ["1"])
def testCreateInstanceOldVersion(self):
self.rapi.AddResponse(serializer.DumpJson([]))
self.assertRaises(NotImplementedError, self.client.CreateInstance,
......@@ -424,9 +443,9 @@ class GanetiRapiClientTests(testutils.GanetiTestCase):
self.assertHandler(rlib2.R_2_nodes)
self.assertBulk()
def testGetNodeInfo(self):
def testGetNode(self):
self.rapi.AddResponse("{}")
self.assertEqual({}, self.client.GetNodeInfo("node-foo"))
self.assertEqual({}, self.client.GetNode("node-foo"))
self.assertHandler(rlib2.R_2_nodes_name)
self.assertItems(["node-foo"])
......
......@@ -40,6 +40,7 @@ import warnings
import distutils.version
import glob
import md5
import errno
import ganeti
import testutils
......@@ -1152,8 +1153,8 @@ class TestOwnIpAddress(unittest.TestCase):
def testNowOwnAddress(self):
"""check that I don't own an address"""
# network 192.0.2.0/24 is reserved for test/documentation as per
# rfc 3330, so we *should* not have an address of this range... if
# Network 192.0.2.0/24 is reserved for test/documentation as per
# RFC 5735, so we *should* not have an address of this range... if
# this fails, we should extend the test to multiple addresses
DST_IP = "192.0.2.1"
self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
......@@ -2210,5 +2211,41 @@ class TestHmacFunctions(unittest.TestCase):
salt="xyz0"))
class TestIgnoreSignals(unittest.TestCase):
  """Test the utils.IgnoreSignals call wrapper"""

  @staticmethod
  def _Raise(exception):
    raise exception

  @staticmethod
  def _Return(rval):
    return rval

  def testIgnoreSignals(self):
    def _SocketError(code):
      # The errno attribute is set by hand on socket errors
      err = socket.error(code, "Message")
      err.errno = code
      return err

    sock_intr = _SocketError(errno.EINTR)
    sock_inval = _SocketError(errno.EINVAL)
    env_intr = EnvironmentError(errno.EINTR, "Message")
    env_inval = EnvironmentError(errno.EINVAL, "Message")

    # Sanity check: the helper really raises what it is given
    for (exc_class, exc) in [(socket.error, sock_intr),
                             (socket.error, sock_inval),
                             (EnvironmentError, env_intr),
                             (EnvironmentError, env_inval)]:
      self.assertRaises(exc_class, self._Raise, exc)

    # EINTR is swallowed and None is returned
    for exc in [sock_intr, env_intr]:
      self.assertEquals(utils.IgnoreSignals(self._Raise, exc), None)

    # Any other error code is passed on to the caller
    self.assertRaises(socket.error, utils.IgnoreSignals, self._Raise,
                      sock_inval)
    self.assertRaises(EnvironmentError, utils.IgnoreSignals, self._Raise,
                      env_inval)

    # Return values are passed through untouched
    for value in [True, 33]:
      self.assertEquals(utils.IgnoreSignals(self._Return, value), value)
if __name__ == '__main__':
testutils.GanetiTestProgram()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment