diff --git a/lib/hypervisor/hv_kvm.py b/lib/hypervisor/hv_kvm.py index f66096646142f0841ec1dc90ae5c24ff34396d8a..6aa2374a42a9d5037f37df549125ab5b77f613f2 100644 --- a/lib/hypervisor/hv_kvm.py +++ b/lib/hypervisor/hv_kvm.py @@ -34,6 +34,8 @@ import pwd import struct import fcntl import shutil +import socket +import StringIO from ganeti import utils from ganeti import constants @@ -127,6 +129,251 @@ def _OpenTap(vnet_hdr=True): return (ifname, tapfd) +class QmpMessage: + """QEMU Messaging Protocol (QMP) message. + + """ + + def __init__(self, data): + """Creates a new QMP message based on the passed data. + + """ + if not isinstance(data, dict): + raise TypeError("QmpMessage must be initialized with a dict") + + self.data = data + + def __getitem__(self, field_name): + """Get the value of the required field if present, or None. + + Overrides the [] operator to provide access to the message data, + returning None if the required item is not in the message + @return: the value of the field_name field, or None if field_name + is not contained in the message + + """ + + if field_name in self.data: + return self.data[field_name] + + return None + + def __setitem__(self, field_name, field_value): + """Set the value of the required field_name to field_value. + + """ + self.data[field_name] = field_value + + @staticmethod + def BuildFromJsonString(json_string): + """Build a QmpMessage from a JSON encoded string. + + @type json_string: str + @param json_string: JSON string representing the message + @rtype: L{QmpMessage} + @return: a L{QmpMessage} built from json_string + + """ + # Parse the string + data = serializer.LoadJson(json_string) + return QmpMessage(data) + + def __str__(self): + # The protocol expects the JSON object to be sent as a single + # line, hence the need for indent=False. + return serializer.DumpJson(self.data, indent=False) + + def __eq__(self, other): + # When comparing two QmpMessages, we are interested in comparing + # their internal representation of the message data + return self.data == other.data + + +class QmpConnection: + """Connection to the QEMU Monitor using the QEMU Monitor Protocol (QMP). + + """ + _FIRST_MESSAGE_KEY = "QMP" + _EVENT_KEY = "event" + _ERROR_KEY = "error" + _ERROR_CLASS_KEY = "class" + _ERROR_DATA_KEY = "data" + _ERROR_DESC_KEY = "desc" + _EXECUTE_KEY = "execute" + _ARGUMENTS_KEY = "arguments" + _CAPABILITIES_COMMAND = "qmp_capabilities" + _MESSAGE_END_TOKEN = "\r\n" + _SOCKET_TIMEOUT = 5 + + def __init__(self, monitor_filename): + """Instantiates the QmpConnection object. + + @type monitor_filename: string + @param monitor_filename: the filename of the UNIX raw socket on which the + QMP monitor is listening + + """ + self.monitor_filename = monitor_filename + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + # We want to fail if the server doesn't send a complete message + # in a reasonable amount of time + self.sock.settimeout(self._SOCKET_TIMEOUT) + self._connected = False + self._buf = "" + + def _check_connection(self): + """Make sure that the connection is established. + + """ + if not self._connected: + raise errors.ProgrammerError("To use a QmpConnection you need to first" + " invoke connect() on it") + + def connect(self): + """Connects to the QMP monitor. + + Connects to the UNIX socket and makes sure that we can actually send and + receive data to the kvm instance via QMP. + + @raise errors.HypervisorError: when there are communication errors + @raise errors.ProgrammerError: when there are data serialization errors + + """ + self.sock.connect(self.monitor_filename) + self._connected = True + + # Check if we receive a correct greeting message from the server + # (As per the QEMU Protocol Specification 0.1 - section 2.2) + greeting = self._Recv() + if not greeting[self._FIRST_MESSAGE_KEY]: + self._connected = False + raise errors.HypervisorError("kvm: qmp communication error (wrong" + " server greeting") + + # Let's put the monitor in command mode using the qmp_capabilities + # command, or else no command will be executable. + # (As per the QEMU Protocol Specification 0.1 - section 4) + self.Execute(self._CAPABILITIES_COMMAND) + + def _ParseMessage(self, buf): + """Extract and parse a QMP message from the given buffer. + + Seeks for a QMP message in the given buf. If found, it parses it and + returns it together with the rest of the characters in the buf. + If no message is found, returns None and the whole buffer. + + @raise errors.ProgrammerError: when there are data serialization errors + + """ + message = None + # Check if we got the message end token (CRLF, as per the QEMU Protocol + # Specification 0.1 - Section 2.1.1) + pos = buf.find(self._MESSAGE_END_TOKEN) + if pos >= 0: + try: + message = QmpMessage.BuildFromJsonString(buf[:pos + 1]) + except Exception, err: + raise errors.ProgrammerError("QMP data serialization error: %s" % err) + buf = buf[pos + 1:] + + return (message, buf) + + def _Recv(self): + """Receives a message from QMP and decodes the received JSON object. + + @rtype: QmpMessage + @return: the received message + @raise errors.HypervisorError: when there are communication errors + @raise errors.ProgrammerError: when there are data serialization errors + + """ + self._check_connection() + + # Check if there is already a message in the buffer + (message, self._buf) = self._ParseMessage(self._buf) + if message: + return message + + recv_buffer = StringIO.StringIO(self._buf) + recv_buffer.seek(len(self._buf)) + try: + while True: + data = self.sock.recv(4096) + if not data: + break + recv_buffer.write(data) + + (message, self._buf) = self._ParseMessage(recv_buffer.getvalue()) + if message: + return message + + except socket.timeout, err: + raise errors.HypervisorError("Timeout while receiving a QMP message: " + "%s" % (err)) + except socket.error, err: + raise errors.HypervisorError("Unable to receive data from KVM using the" + " QMP protocol: %s" % err) + + def _Send(self, message): + """Encodes and sends a message to KVM using QMP. + + @type message: QmpMessage + @param message: message to send to KVM + @raise errors.HypervisorError: when there are communication errors + @raise errors.ProgrammerError: when there are data serialization errors + + """ + self._check_connection() + try: + message_str = str(message) + except Exception, err: + raise errors.ProgrammerError("QMP data deserialization error: %s" % err) + + try: + self.sock.sendall(message_str) + except socket.timeout, err: + raise errors.HypervisorError("Timeout while sending a QMP message: " + "%s (%s)" % (err.string, err.errno)) + except socket.error, err: + raise errors.HypervisorError("Unable to send data from KVM using the" + " QMP protocol: %s" % err) + + def Execute(self, command, arguments=None): + """Executes a QMP command and returns the response of the server. + + @type command: str + @param command: the command to execute + @type arguments: dict + @param arguments: dictionary of arguments to be passed to the command + @rtype: dict + @return: dictionary representing the received JSON object + @raise errors.HypervisorError: when there are communication errors + @raise errors.ProgrammerError: when there are data serialization errors + + """ + self._check_connection() + message = QmpMessage({self._EXECUTE_KEY: command}) + if arguments: + message[self._ARGUMENTS_KEY] = arguments + self._Send(message) + + # Events can occur between the sending of the command and the reception + # of the response, so we need to filter out messages with the event key. + while True: + response = self._Recv() + err = response[self._ERROR_KEY] + if err: + raise errors.HypervisorError("kvm: error executing the %s" + " command: %s (%s, %s):" % + (command, + err[self._ERROR_DESC_KEY], + err[self._ERROR_CLASS_KEY], + err[self._ERROR_DATA_KEY])) + + elif not response[self._EVENT_KEY]: + return response + + class KVMHypervisor(hv_base.BaseHypervisor): """KVM hypervisor interface""" CAN_MIGRATE = True @@ -325,6 +572,13 @@ class KVMHypervisor(hv_base.BaseHypervisor): """ return utils.PathJoin(cls._CTRL_DIR, "%s.serial" % instance_name) + @classmethod + def _InstanceQmpMonitor(cls, instance_name): + """Returns the instance serial QMP socket name + + """ + return utils.PathJoin(cls._CTRL_DIR, "%s.qmp" % instance_name) + @staticmethod def _SocatUnixConsoleParams(): """Returns the correct parameters for socat @@ -396,6 +650,7 @@ class KVMHypervisor(hv_base.BaseHypervisor): utils.RemoveFile(pidfile) utils.RemoveFile(cls._InstanceMonitor(instance_name)) utils.RemoveFile(cls._InstanceSerial(instance_name)) + utils.RemoveFile(cls._InstanceQmpMonitor(instance_name)) utils.RemoveFile(cls._InstanceKVMRuntime(instance_name)) utils.RemoveFile(cls._InstanceKeymapFile(instance_name)) uid_file = cls._InstanceUidFile(instance_name) @@ -939,6 +1194,12 @@ class KVMHypervisor(hv_base.BaseHypervisor): utils.EnsureDirs([(self._InstanceChrootDir(name), constants.SECURE_DIR_MODE)]) + # Automatically enable QMP if version is >= 0.14 + if (v_major, v_min) >= (0, 14): + logging.debug("Enabling QMP") + kvm_cmd.extend(["-qmp", "unix:%s,server,nowait" % + self._InstanceQmpMonitor(instance.name)]) + # Configure the network now for starting instances and bridged interfaces, # during FinalizeMigration for incoming instances' routed interfaces for nic_seq, nic in enumerate(kvm_nics): diff --git a/test/ganeti.hypervisor.hv_kvm_unittest.py b/test/ganeti.hypervisor.hv_kvm_unittest.py index d9e5e7377db1cb7955dedd6fcdbe729749c79bd2..7ff3d4d2938348e863fd3f7c7e01d518b1337bf2 100755 --- a/test/ganeti.hypervisor.hv_kvm_unittest.py +++ b/test/ganeti.hypervisor.hv_kvm_unittest.py @@ -21,8 +21,13 @@ """Script for testing the hypervisor.hv_kvm module""" +import threading +import tempfile import unittest +import socket +import os +from ganeti import serializer from ganeti import constants from ganeti import compat from ganeti import objects @@ -33,6 +38,129 @@ from ganeti.hypervisor import hv_kvm import testutils +class QmpStub(threading.Thread): + """Stub for a QMP endpoint for a KVM instance + + """ + _QMP_BANNER_DATA = {"QMP": {"version": { + "package": "", + "qemu": {"micro": 50, "minor": 13, "major": 0}, + "capabilities": [], + }}} + _EMPTY_RESPONSE = {"return": []} + + def __init__(self, socket_filename, server_responses): + """Creates a QMP stub + + @type socket_filename: string + @param socket_filename: filename of the UNIX socket that will be created + this class and used for the communication + @type server_responses: list + @param server_responses: list of responses that the server sends in response + to whatever it receives + """ + threading.Thread.__init__(self) + self.socket_filename = socket_filename + self.script = server_responses + + self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.socket.bind(self.socket_filename) + self.socket.listen(1) + + def run(self): + # Hypothesis: the messages we receive contain only a complete QMP message + # encoded in JSON. + conn, addr = self.socket.accept() + + # Send the banner as the first thing + conn.send(self.encode_string(self._QMP_BANNER_DATA)) + + # Expect qmp_capabilities and return an empty response + conn.recv(4096) + conn.send(self.encode_string(self._EMPTY_RESPONSE)) + + while True: + # We ignore the expected message, as the purpose of this object is not + # to verify the correctness of the communication but to act as a + # partner for the SUT (System Under Test, that is QmpConnection) + msg = conn.recv(4096) + if not msg: + break + + if not self.script: + break + response = self.script.pop(0) + if isinstance(response, str): + conn.send(response) + elif isinstance(response, list): + for chunk in response: + conn.send(chunk) + else: + raise errors.ProgrammerError("Unknown response type for %s" % response) + + conn.close() + + def encode_string(self, message): + return (serializer.DumpJson(message, indent=False) + + hv_kvm.QmpConnection._MESSAGE_END_TOKEN) + + +class TestQmpMessage(testutils.GanetiTestCase): + def testSerialization(self): + test_data = {"execute": "command", "arguments": ["a", "b", "c"]} + message = hv_kvm.QmpMessage(test_data) + + for k, v in test_data.items(): + self.failUnless(message[k] == v) + + rebuilt_message = hv_kvm.QmpMessage.BuildFromJsonString(str(message)) + self.failUnless(rebuilt_message == message) + + +class TestQmp(testutils.GanetiTestCase): + def testQmp(self): + requests = [ + {"execute": "query-kvm", "arguments": []}, + {"execute": "eject", "arguments": {"device": "ide1-cd0"}}, + {"execute": "query-status", "arguments": []}, + {"execute": "query-name", "arguments": []}, + ] + + server_responses = [ + # One message, one send() + '{"return": {"enabled": true, "present": true}}\r\n', + + # Message sent using multiple send() + ['{"retur', 'n": {}}\r\n'], + + # Multiple messages sent using one send() + '{"return": [{"name": "quit"}, {"name": "eject"}]}\r\n' + '{"return": {"running": true, "singlestep": false}}\r\n', + ] + + expected_responses = [ + {"return": {"enabled": True, "present": True}}, + {"return": {}}, + {"return": [{"name": "quit"}, {"name": "eject"}]}, + {"return": {"running": True, "singlestep": False}}, + ] + + # Set up the stub + socket_file = tempfile.NamedTemporaryFile() + os.remove(socket_file.name) + qmp_stub = QmpStub(socket_file.name, server_responses) + qmp_stub.start() + + # Set up the QMP connection + qmp_connection = hv_kvm.QmpConnection(socket_file.name) + qmp_connection.connect() + + # Format the script + for request, expected_response in zip(requests, expected_responses): + response = qmp_connection.Execute(request) + self.failUnless(response == hv_kvm.QmpMessage(expected_response)) + + class TestConsole(unittest.TestCase): def _Test(self, instance, hvparams): cons = hv_kvm.KVMHypervisor.GetInstanceConsole(instance, hvparams, {})