Skip to content
Snippets Groups Projects
Commit 91c10532 authored by Andrea Spadaccini's avatar Andrea Spadaccini
Browse files

Draft implementation of QMP connection


Basic implementation of the QMP connection and related tests.

Signed-off-by: default avatarAndrea Spadaccini <spadaccio@google.com>
Reviewed-by: default avatarMichael Hanselmann <hansmi@google.com>
parent 55766d34
No related branches found
No related tags found
No related merge requests found
...@@ -34,6 +34,8 @@ import pwd ...@@ -34,6 +34,8 @@ import pwd
import struct import struct
import fcntl import fcntl
import shutil import shutil
import socket
import StringIO
from ganeti import utils from ganeti import utils
from ganeti import constants from ganeti import constants
...@@ -127,6 +129,251 @@ def _OpenTap(vnet_hdr=True): ...@@ -127,6 +129,251 @@ def _OpenTap(vnet_hdr=True):
return (ifname, tapfd) return (ifname, tapfd)
class QmpMessage:
"""QEMU Messaging Protocol (QMP) message.
"""
def __init__(self, data):
"""Creates a new QMP message based on the passed data.
"""
if not isinstance(data, dict):
raise TypeError("QmpMessage must be initialized with a dict")
self.data = data
def __getitem__(self, field_name):
"""Get the value of the required field if present, or None.
Overrides the [] operator to provide access to the message data,
returning None if the required item is not in the message
@return: the value of the field_name field, or None if field_name
is not contained in the message
"""
if field_name in self.data:
return self.data[field_name]
return None
def __setitem__(self, field_name, field_value):
"""Set the value of the required field_name to field_value.
"""
self.data[field_name] = field_value
@staticmethod
def BuildFromJsonString(json_string):
"""Build a QmpMessage from a JSON encoded string.
@type json_string: str
@param json_string: JSON string representing the message
@rtype: L{QmpMessage}
@return: a L{QmpMessage} built from json_string
"""
# Parse the string
data = serializer.LoadJson(json_string)
return QmpMessage(data)
def __str__(self):
# The protocol expects the JSON object to be sent as a single
# line, hence the need for indent=False.
return serializer.DumpJson(self.data, indent=False)
def __eq__(self, other):
# When comparing two QmpMessages, we are interested in comparing
# their internal representation of the message data
return self.data == other.data
class QmpConnection:
"""Connection to the QEMU Monitor using the QEMU Monitor Protocol (QMP).
"""
_FIRST_MESSAGE_KEY = "QMP"
_EVENT_KEY = "event"
_ERROR_KEY = "error"
_ERROR_CLASS_KEY = "class"
_ERROR_DATA_KEY = "data"
_ERROR_DESC_KEY = "desc"
_EXECUTE_KEY = "execute"
_ARGUMENTS_KEY = "arguments"
_CAPABILITIES_COMMAND = "qmp_capabilities"
_MESSAGE_END_TOKEN = "\r\n"
_SOCKET_TIMEOUT = 5
def __init__(self, monitor_filename):
"""Instantiates the QmpConnection object.
@type monitor_filename: string
@param monitor_filename: the filename of the UNIX raw socket on which the
QMP monitor is listening
"""
self.monitor_filename = monitor_filename
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# We want to fail if the server doesn't send a complete message
# in a reasonable amount of time
self.sock.settimeout(self._SOCKET_TIMEOUT)
self._connected = False
self._buf = ""
def _check_connection(self):
"""Make sure that the connection is established.
"""
if not self._connected:
raise errors.ProgrammerError("To use a QmpConnection you need to first"
" invoke connect() on it")
def connect(self):
"""Connects to the QMP monitor.
Connects to the UNIX socket and makes sure that we can actually send and
receive data to the kvm instance via QMP.
@raise errors.HypervisorError: when there are communication errors
@raise errors.ProgrammerError: when there are data serialization errors
"""
self.sock.connect(self.monitor_filename)
self._connected = True
# Check if we receive a correct greeting message from the server
# (As per the QEMU Protocol Specification 0.1 - section 2.2)
greeting = self._Recv()
if not greeting[self._FIRST_MESSAGE_KEY]:
self._connected = False
raise errors.HypervisorError("kvm: qmp communication error (wrong"
" server greeting")
# Let's put the monitor in command mode using the qmp_capabilities
# command, or else no command will be executable.
# (As per the QEMU Protocol Specification 0.1 - section 4)
self.Execute(self._CAPABILITIES_COMMAND)
def _ParseMessage(self, buf):
"""Extract and parse a QMP message from the given buffer.
Seeks for a QMP message in the given buf. If found, it parses it and
returns it together with the rest of the characters in the buf.
If no message is found, returns None and the whole buffer.
@raise errors.ProgrammerError: when there are data serialization errors
"""
message = None
# Check if we got the message end token (CRLF, as per the QEMU Protocol
# Specification 0.1 - Section 2.1.1)
pos = buf.find(self._MESSAGE_END_TOKEN)
if pos >= 0:
try:
message = QmpMessage.BuildFromJsonString(buf[:pos + 1])
except Exception, err:
raise errors.ProgrammerError("QMP data serialization error: %s" % err)
buf = buf[pos + 1:]
return (message, buf)
def _Recv(self):
"""Receives a message from QMP and decodes the received JSON object.
@rtype: QmpMessage
@return: the received message
@raise errors.HypervisorError: when there are communication errors
@raise errors.ProgrammerError: when there are data serialization errors
"""
self._check_connection()
# Check if there is already a message in the buffer
(message, self._buf) = self._ParseMessage(self._buf)
if message:
return message
recv_buffer = StringIO.StringIO(self._buf)
recv_buffer.seek(len(self._buf))
try:
while True:
data = self.sock.recv(4096)
if not data:
break
recv_buffer.write(data)
(message, self._buf) = self._ParseMessage(recv_buffer.getvalue())
if message:
return message
except socket.timeout, err:
raise errors.HypervisorError("Timeout while receiving a QMP message: "
"%s" % (err))
except socket.error, err:
raise errors.HypervisorError("Unable to receive data from KVM using the"
" QMP protocol: %s" % err)
def _Send(self, message):
"""Encodes and sends a message to KVM using QMP.
@type message: QmpMessage
@param message: message to send to KVM
@raise errors.HypervisorError: when there are communication errors
@raise errors.ProgrammerError: when there are data serialization errors
"""
self._check_connection()
try:
message_str = str(message)
except Exception, err:
raise errors.ProgrammerError("QMP data deserialization error: %s" % err)
try:
self.sock.sendall(message_str)
except socket.timeout, err:
raise errors.HypervisorError("Timeout while sending a QMP message: "
"%s (%s)" % (err.string, err.errno))
except socket.error, err:
raise errors.HypervisorError("Unable to send data from KVM using the"
" QMP protocol: %s" % err)
def Execute(self, command, arguments=None):
"""Executes a QMP command and returns the response of the server.
@type command: str
@param command: the command to execute
@type arguments: dict
@param arguments: dictionary of arguments to be passed to the command
@rtype: dict
@return: dictionary representing the received JSON object
@raise errors.HypervisorError: when there are communication errors
@raise errors.ProgrammerError: when there are data serialization errors
"""
self._check_connection()
message = QmpMessage({self._EXECUTE_KEY: command})
if arguments:
message[self._ARGUMENTS_KEY] = arguments
self._Send(message)
# Events can occur between the sending of the command and the reception
# of the response, so we need to filter out messages with the event key.
while True:
response = self._Recv()
err = response[self._ERROR_KEY]
if err:
raise errors.HypervisorError("kvm: error executing the %s"
" command: %s (%s, %s):" %
(command,
err[self._ERROR_DESC_KEY],
err[self._ERROR_CLASS_KEY],
err[self._ERROR_DATA_KEY]))
elif not response[self._EVENT_KEY]:
return response
class KVMHypervisor(hv_base.BaseHypervisor): class KVMHypervisor(hv_base.BaseHypervisor):
"""KVM hypervisor interface""" """KVM hypervisor interface"""
CAN_MIGRATE = True CAN_MIGRATE = True
...@@ -325,6 +572,13 @@ class KVMHypervisor(hv_base.BaseHypervisor): ...@@ -325,6 +572,13 @@ class KVMHypervisor(hv_base.BaseHypervisor):
""" """
return utils.PathJoin(cls._CTRL_DIR, "%s.serial" % instance_name) return utils.PathJoin(cls._CTRL_DIR, "%s.serial" % instance_name)
@classmethod
def _InstanceQmpMonitor(cls, instance_name):
"""Returns the instance serial QMP socket name
"""
return utils.PathJoin(cls._CTRL_DIR, "%s.qmp" % instance_name)
@staticmethod @staticmethod
def _SocatUnixConsoleParams(): def _SocatUnixConsoleParams():
"""Returns the correct parameters for socat """Returns the correct parameters for socat
...@@ -396,6 +650,7 @@ class KVMHypervisor(hv_base.BaseHypervisor): ...@@ -396,6 +650,7 @@ class KVMHypervisor(hv_base.BaseHypervisor):
utils.RemoveFile(pidfile) utils.RemoveFile(pidfile)
utils.RemoveFile(cls._InstanceMonitor(instance_name)) utils.RemoveFile(cls._InstanceMonitor(instance_name))
utils.RemoveFile(cls._InstanceSerial(instance_name)) utils.RemoveFile(cls._InstanceSerial(instance_name))
utils.RemoveFile(cls._InstanceQmpMonitor(instance_name))
utils.RemoveFile(cls._InstanceKVMRuntime(instance_name)) utils.RemoveFile(cls._InstanceKVMRuntime(instance_name))
utils.RemoveFile(cls._InstanceKeymapFile(instance_name)) utils.RemoveFile(cls._InstanceKeymapFile(instance_name))
uid_file = cls._InstanceUidFile(instance_name) uid_file = cls._InstanceUidFile(instance_name)
...@@ -939,6 +1194,12 @@ class KVMHypervisor(hv_base.BaseHypervisor): ...@@ -939,6 +1194,12 @@ class KVMHypervisor(hv_base.BaseHypervisor):
utils.EnsureDirs([(self._InstanceChrootDir(name), utils.EnsureDirs([(self._InstanceChrootDir(name),
constants.SECURE_DIR_MODE)]) constants.SECURE_DIR_MODE)])
# Automatically enable QMP if version is >= 0.14
if (v_major, v_min) >= (0, 14):
logging.debug("Enabling QMP")
kvm_cmd.extend(["-qmp", "unix:%s,server,nowait" %
self._InstanceQmpMonitor(instance.name)])
# Configure the network now for starting instances and bridged interfaces, # Configure the network now for starting instances and bridged interfaces,
# during FinalizeMigration for incoming instances' routed interfaces # during FinalizeMigration for incoming instances' routed interfaces
for nic_seq, nic in enumerate(kvm_nics): for nic_seq, nic in enumerate(kvm_nics):
......
...@@ -21,8 +21,13 @@ ...@@ -21,8 +21,13 @@
"""Script for testing the hypervisor.hv_kvm module""" """Script for testing the hypervisor.hv_kvm module"""
import threading
import tempfile
import unittest import unittest
import socket
import os
from ganeti import serializer
from ganeti import constants from ganeti import constants
from ganeti import compat from ganeti import compat
from ganeti import objects from ganeti import objects
...@@ -33,6 +38,129 @@ from ganeti.hypervisor import hv_kvm ...@@ -33,6 +38,129 @@ from ganeti.hypervisor import hv_kvm
import testutils import testutils
class QmpStub(threading.Thread):
"""Stub for a QMP endpoint for a KVM instance
"""
_QMP_BANNER_DATA = {"QMP": {"version": {
"package": "",
"qemu": {"micro": 50, "minor": 13, "major": 0},
"capabilities": [],
}}}
_EMPTY_RESPONSE = {"return": []}
def __init__(self, socket_filename, server_responses):
"""Creates a QMP stub
@type socket_filename: string
@param socket_filename: filename of the UNIX socket that will be created
this class and used for the communication
@type server_responses: list
@param server_responses: list of responses that the server sends in response
to whatever it receives
"""
threading.Thread.__init__(self)
self.socket_filename = socket_filename
self.script = server_responses
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.socket.bind(self.socket_filename)
self.socket.listen(1)
def run(self):
# Hypothesis: the messages we receive contain only a complete QMP message
# encoded in JSON.
conn, addr = self.socket.accept()
# Send the banner as the first thing
conn.send(self.encode_string(self._QMP_BANNER_DATA))
# Expect qmp_capabilities and return an empty response
conn.recv(4096)
conn.send(self.encode_string(self._EMPTY_RESPONSE))
while True:
# We ignore the expected message, as the purpose of this object is not
# to verify the correctness of the communication but to act as a
# partner for the SUT (System Under Test, that is QmpConnection)
msg = conn.recv(4096)
if not msg:
break
if not self.script:
break
response = self.script.pop(0)
if isinstance(response, str):
conn.send(response)
elif isinstance(response, list):
for chunk in response:
conn.send(chunk)
else:
raise errors.ProgrammerError("Unknown response type for %s" % response)
conn.close()
def encode_string(self, message):
return (serializer.DumpJson(message, indent=False) +
hv_kvm.QmpConnection._MESSAGE_END_TOKEN)
class TestQmpMessage(testutils.GanetiTestCase):
def testSerialization(self):
test_data = {"execute": "command", "arguments": ["a", "b", "c"]}
message = hv_kvm.QmpMessage(test_data)
for k, v in test_data.items():
self.failUnless(message[k] == v)
rebuilt_message = hv_kvm.QmpMessage.BuildFromJsonString(str(message))
self.failUnless(rebuilt_message == message)
class TestQmp(testutils.GanetiTestCase):
def testQmp(self):
requests = [
{"execute": "query-kvm", "arguments": []},
{"execute": "eject", "arguments": {"device": "ide1-cd0"}},
{"execute": "query-status", "arguments": []},
{"execute": "query-name", "arguments": []},
]
server_responses = [
# One message, one send()
'{"return": {"enabled": true, "present": true}}\r\n',
# Message sent using multiple send()
['{"retur', 'n": {}}\r\n'],
# Multiple messages sent using one send()
'{"return": [{"name": "quit"}, {"name": "eject"}]}\r\n'
'{"return": {"running": true, "singlestep": false}}\r\n',
]
expected_responses = [
{"return": {"enabled": True, "present": True}},
{"return": {}},
{"return": [{"name": "quit"}, {"name": "eject"}]},
{"return": {"running": True, "singlestep": False}},
]
# Set up the stub
socket_file = tempfile.NamedTemporaryFile()
os.remove(socket_file.name)
qmp_stub = QmpStub(socket_file.name, server_responses)
qmp_stub.start()
# Set up the QMP connection
qmp_connection = hv_kvm.QmpConnection(socket_file.name)
qmp_connection.connect()
# Format the script
for request, expected_response in zip(requests, expected_responses):
response = qmp_connection.Execute(request)
self.failUnless(response == hv_kvm.QmpMessage(expected_response))
class TestConsole(unittest.TestCase): class TestConsole(unittest.TestCase):
def _Test(self, instance, hvparams): def _Test(self, instance, hvparams):
cons = hv_kvm.KVMHypervisor.GetInstanceConsole(instance, hvparams, {}) cons = hv_kvm.KVMHypervisor.GetInstanceConsole(instance, hvparams, {})
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment