From 95ab4de904ab6140a62e863c7a8c433c490a7fc1 Mon Sep 17 00:00:00 2001 From: David Knowles <dknowles@google.com> Date: Tue, 16 Mar 2010 13:21:38 -0400 Subject: [PATCH] Adding RAPI client library. Signed-off-by: David Knowles <dknowles@google.com> Reviewed-by: Iustin Pop <iustin@google.com> Signed-off-by: Iustin Pop <iustin@google.com> (modified slightly the unittest to account for missing httplib2 library) --- Makefile.am | 2 + lib/rapi/client.py | 863 ++++++++++++++++++++++++++++ test/ganeti.rapi.client_unittest.py | 446 ++++++++++++++ 3 files changed, 1311 insertions(+) create mode 100644 lib/rapi/client.py create mode 100755 test/ganeti.rapi.client_unittest.py diff --git a/Makefile.am b/Makefile.am index c6e5e86dc..444347da5 100644 --- a/Makefile.am +++ b/Makefile.am @@ -129,6 +129,7 @@ hypervisor_PYTHON = \ rapi_PYTHON = \ lib/rapi/__init__.py \ lib/rapi/baserlib.py \ + lib/rapi/client.py \ lib/rapi/connector.py \ lib/rapi/rlib2.py @@ -333,6 +334,7 @@ python_tests = \ test/ganeti.mcpu_unittest.py \ test/ganeti.objects_unittest.py \ test/ganeti.opcodes_unittest.py \ + test/ganeti.rapi.client_unittest.py \ test/ganeti.rapi.resources_unittest.py \ test/ganeti.serializer_unittest.py \ test/ganeti.ssh_unittest.py \ diff --git a/lib/rapi/client.py b/lib/rapi/client.py new file mode 100644 index 000000000..5a1b555a9 --- /dev/null +++ b/lib/rapi/client.py @@ -0,0 +1,863 @@ +# +# + +# Copyright (C) 2010 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + + +"""Ganeti RAPI client.""" + +import httplib +import httplib2 +import simplejson +import socket +import urllib +from OpenSSL import SSL +from OpenSSL import crypto + + +HTTP_DELETE = "DELETE" +HTTP_GET = "GET" +HTTP_PUT = "PUT" +HTTP_POST = "POST" +REPLACE_DISK_PRI = "replace_on_primary" +REPLACE_DISK_SECONDARY = "replace_on_secondary" +REPLACE_DISK_CHG = "replace_new_secondary" +REPLACE_DISK_AUTO = "replace_auto" +VALID_REPLACEMENT_MODES = frozenset([ + REPLACE_DISK_PRI, REPLACE_DISK_SECONDARY, REPLACE_DISK_CHG, + REPLACE_DISK_AUTO + ]) +VALID_NODE_ROLES = frozenset([ + "drained", "master", "master-candidate", "offline", "regular" + ]) +VALID_STORAGE_TYPES = frozenset(["file", "lvm-pv", "lvm-vg"]) + + +class Error(Exception): + """Base error class for this module. + + """ + pass + + +class CertificateError(Error): + """Raised when a problem is found with the SSL certificate. + + """ + pass + + +class GanetiApiError(Error): + """Generic error raised from Ganeti API. + + """ + pass + + +class InvalidReplacementMode(Error): + """Raised when an invalid disk replacement mode is attempted. + + """ + pass + + +class InvalidStorageType(Error): + """Raised when an invalid storage type is used. + + """ + pass + + +class InvalidNodeRole(Error): + """Raised when an invalid node role is used. + + """ + pass + + +class GanetiRapiClient(object): + """Ganeti RAPI client. + + """ + + USER_AGENT = "Ganeti RAPI Client" + + def __init__(self, master_hostname, port=5080, username=None, password=None, + ssl_cert=None): + """Constructor. + + @type master_hostname: str + @param master_hostname: the ganeti cluster master to interact with + @type port: int + @param port: the port on which the RAPI is running. (default is 5080) + @type username: str + @param username: the username to connect with + @type password: str + @param password: the password to connect with + @type ssl_cert: str or None + @param ssl_cert: the expected SSL certificate. if None, SSL certificate + will not be verified + + """ + self._master_hostname = master_hostname + self._port = port + if ssl_cert: + _VerifyCertificate(self._master_hostname, self._port, ssl_cert) + + self._http = httplib2.Http() + self._headers = { + "Accept": "text/plain", + "Content-type": "application/x-www-form-urlencoded", + "User-Agent": self.USER_AGENT} + self._version = None + if username and password: + self._http.add_credentials(username, password) + + def _MakeUrl(self, path, query=None, prepend_version=True): + """Constructs the URL to pass to the HTTP client. + + @type path: str + @param path: HTTP URL path + @type query: list of two-tuples + @param query: query arguments to pass to urllib.urlencode + @type prepend_version: bool + @param prepend_version: whether to automatically fetch and prepend the + Ganeti version to the URL path + + @rtype: str + @return: URL path + + """ + if prepend_version: + if not self._version: + self._GetVersionInternal() + path = "/%d%s" % (self._version, path) + + return "https://%(host)s:%(port)d%(path)s?%(query)s" % { + "host": self._master_hostname, + "port": self._port, + "path": path, + "query": urllib.urlencode(query or [])} + + def _SendRequest(self, method, path, query=None, content=None, + prepend_version=True): + """Sends an HTTP request. + + This constructs a full URL, encodes and decodes HTTP bodies, and + handles invalid responses in a pythonic way. + + @type method: str + @param method: HTTP method to use + @type path: str + @param path: HTTP URL path + @type query: list of two-tuples + @param query: query arguments to pass to urllib.urlencode + @type content: str or None + @param content: HTTP body content + @type prepend_version: bool + @param prepend_version: whether to automatically fetch and prepend the + Ganeti version to the URL path + + @rtype: str + @return: JSON-Decoded response + + @raises GanetiApiError: If an invalid response is returned + + """ + if content: + simplejson.JSONEncoder(sort_keys=True).encode(content) + + url = self._MakeUrl(path, query, prepend_version) + resp_headers, resp_content = self._http.request( + url, method, body=content, headers=self._headers) + + if resp_content: + resp_content = simplejson.loads(resp_content) + + # TODO: Are there other status codes that are valid? (redirect?) + if resp_headers.status != 200: + if isinstance(resp_content, dict): + msg = ("%s %s: %s" % + (resp_content["code"], resp_content["message"], + resp_content["explain"])) + else: + msg = resp_content + raise GanetiApiError(msg) + + return resp_content + + def _GetVersionInternal(self): + """Gets the Remote API version running on the cluster. + + @rtype: int + @return: Ganeti version + + """ + self._version = self._SendRequest(HTTP_GET, "/version", + prepend_version=False) + return self._version + + def GetVersion(self): + """Gets the ganeti version running on the cluster. + + @rtype: int + @return: Ganeti version + + """ + if not self._version: + self._GetVersionInternal() + return self._version + + def GetOperatingSystems(self): + """Gets the Operating Systems running in the Ganeti cluster. + + @rtype: list of str + @return: operating systems + + """ + return self._SendRequest(HTTP_GET, "/os") + + def GetInfo(self): + """Gets info about the cluster. + + @rtype: dict + @return: information about the cluster + + """ + return self._SendRequest(HTTP_GET, "/info") + + def GetClusterTags(self): + """Gets the cluster tags. + + @rtype: list of str + @return: cluster tags + + """ + return self._SendRequest(HTTP_GET, "/tags") + + def AddClusterTags(self, tags, dry_run=False): + """Adds tags to the cluster. + + @type tags: list of str + @param tags: tags to add to the cluster + @type dry_run: bool + @param dry_run: whether to perform a dry run + + @rtype: int + @return: job id + + """ + query = [("tag", t) for t in tags] + if dry_run: + query.append(("dry-run", 1)) + + self._SendRequest(HTTP_PUT, "/tags", query) + + def DeleteClusterTags(self, tags, dry_run=False): + """Deletes tags from the cluster. + + @type tags: list of str + @param tags: tags to delete + @type dry_run: bool + @param dry_run: whether to perform a dry run + + """ + query = [("tag", t) for t in tags] + if dry_run: + query.append(("dry-run", 1)) + + self._SendRequest(HTTP_DELETE, "/tags", query) + + def GetInstances(self, bulk=False): + """Gets information about instances on the cluster. + + @type bulk: bool + @param bulk: whether to return all information about all instances + + @rtype: list of dict or list of str + @return: if bulk is True, info about the instances, else a list of instances + + """ + query = [] + if bulk: + query.append(("bulk", 1)) + + instances = self._SendRequest(HTTP_GET, "/instances", query) + if bulk: + return instances + else: + return [i["id"] for i in instances] + + + def GetInstanceInfo(self, instance): + """Gets information about an instance. + + @type instance: str + @param instance: instance whose info to return + + @rtype: dict + @return: info about the instance + + """ + return self._SendRequest(HTTP_GET, "/instances/%s" % instance) + + def CreateInstance(self, dry_run=False): + """Creates a new instance. + + @type dry_run: bool + @param dry_run: whether to perform a dry run + + @rtype: int + @return: job id + + """ + # TODO: Pass arguments needed to actually create an instance. + query = [] + if dry_run: + query.append(("dry-run", 1)) + + return self._SendRequest(HTTP_POST, "/instances", query) + + def DeleteInstance(self, instance, dry_run=False): + """Deletes an instance. + + @type instance: str + @param instance: the instance to delete + + """ + query = [] + if dry_run: + query.append(("dry-run", 1)) + + self._SendRequest(HTTP_DELETE, "/instances/%s" % instance, query) + + def GetInstanceTags(self, instance): + """Gets tags for an instance. + + @type instance: str + @param instance: instance whose tags to return + + @rtype: list of str + @return: tags for the instance + + """ + return self._SendRequest(HTTP_GET, "/instances/%s/tags" % instance) + + def AddInstanceTags(self, instance, tags, dry_run=False): + """Adds tags to an instance. + + @type instance: str + @param instance: instance to add tags to + @type tags: list of str + @param tags: tags to add to the instance + @type dry_run: bool + @param dry_run: whether to perform a dry run + + @rtype: int + @return: job id + + """ + query = [("tag", t) for t in tags] + if dry_run: + query.append(("dry-run", 1)) + + self._SendRequest(HTTP_PUT, "/instances/%s/tags" % instance, query) + + def DeleteInstanceTags(self, instance, tags, dry_run=False): + """Deletes tags from an instance. + + @type instance: str + @param instance: instance to delete tags from + @type tags: list of str + @param tags: tags to delete + @type dry_run: bool + @param dry_run: whether to perform a dry run + + """ + query = [("tag", t) for t in tags] + if dry_run: + query.append(("dry-run", 1)) + + self._SendRequest(HTTP_DELETE, "/instances/%s/tags" % instance, query) + + def RebootInstance(self, instance, reboot_type=None, ignore_secondaries=None, + dry_run=False): + """Reboots an instance. + + @type instance: str + @param instance: instance to rebot + @type reboot_type: str + @param reboot_type: one of: hard, soft, full + @type ignore_secondaries: bool + @param ignore_secondaries: if True, ignores errors for the secondary node + while re-assembling disks (in hard-reboot mode only) + @type dry_run: bool + @param dry_run: whether to perform a dry run + + """ + query = [] + if reboot_type: + query.append(("type", reboot_type)) + if ignore_secondaries is not None: + query.append(("ignore_secondaries", ignore_secondaries)) + if dry_run: + query.append(("dry-run", 1)) + + self._SendRequest(HTTP_POST, "/instances/%s/reboot" % instance, query) + + def ShutdownInstance(self, instance, dry_run=False): + """Shuts down an instance. + + @type instance: str + @param instance: the instance to shut down + @type dry_run: bool + @param dry_run: whether to perform a dry run + + """ + query = [] + if dry_run: + query.append(("dry-run", 1)) + + self._SendRequest(HTTP_PUT, "/instances/%s/shutdown" % instance, query) + + def StartupInstance(self, instance, dry_run=False): + """Starts up an instance. + + @type instance: str + @param instance: the instance to start up + @type dry_run: bool + @param dry_run: whether to perform a dry run + + """ + query = [] + if dry_run: + query.append(("dry-run", 1)) + + self._SendRequest(HTTP_PUT, "/instances/%s/startup" % instance, query) + + def ReinstallInstance(self, instance, os, no_startup=False): + """Reinstalls an instance. + + @type instance: str + @param instance: the instance to reinstall + @type os: str + @param os: the os to reinstall + @type no_startup: bool + @param no_startup: whether to start the instance automatically + + """ + query = [("os", os)] + if no_startup: + query.append(("nostartup", 1)) + self._SendRequest(HTTP_POST, "/instances/%s/reinstall" % instance, query) + + def ReplaceInstanceDisks(self, instance, disks, mode="replace_auto", + remote_node=None, iallocator="hail", dry_run=False): + """Replaces disks on an instance. + + @type instance: str + @param instance: instance whose disks to replace + @type disks: list of str + @param disks: disks to replace + @type mode: str + @param mode: replacement mode to use. defaults to replace_auto + @type remote_node: str or None + @param remote_node: new secondary node to use (for use with + replace_new_secondary mdoe) + @type iallocator: str or None + @param iallocator: instance allocator plugin to use (for use with + replace_auto mdoe). default is hail + @type dry_run: bool + @param dry_run: whether to perform a dry run + + @rtype: int + @return: job id + + @raises InvalidReplacementMode: If an invalid disk replacement mode is given + @raises GanetiApiError: If no secondary node is given with a non-auto + replacement mode is requested. + + """ + if mode not in VALID_REPLACEMENT_MODES: + raise InvalidReplacementMode("%s is not a valid disk replacement mode.", + mode) + + query = [("mode", mode), ("disks", ",".join(disks))] + + if mode is REPLACE_DISK_AUTO: + query.append(("iallocator", iallocator)) + elif mode is REPLACE_DISK_SECONDARY: + if remote_node is None: + raise GanetiApiError("You must supply a new secondary node.") + query.append(("remote_node", remote_node)) + + if dry_run: + query.append(("dry-run", 1)) + + return self._SendRequest(HTTP_POST, + "/instances/%s/replace-disks" % instance, query) + + def GetJobs(self): + """Gets all jobs for the cluster. + + @rtype: list of int + @return: job ids for the cluster + + """ + return [int(j["id"]) for j in self._SendRequest(HTTP_GET, "/jobs")] + + def GetJobStatus(self, job_id): + """Gets the status of a job. + + @type job_id: int + @param job_id: job id whose status to query + + @rtype: dict + @return: job status + + """ + return self._SendRequest(HTTP_GET, "/jobs/%d" % job_id) + + def DeleteJob(self, job_id, dry_run=False): + """Deletes a job. + + @type job_id: int + @param job_id: id of the job to delete + @type dry_run: bool + @param dry_run: whether to perform a dry run + + """ + query = [] + if dry_run: + query.append(("dry-run", 1)) + + self._SendRequest(HTTP_DELETE, "/jobs/%d" % job_id, query) + + def GetNodes(self, bulk=False): + """Gets all nodes in the cluster. + + @type bulk: bool + @param bulk: whether to return all information about all instances + + @rtype: list of dict or str + @return: if bulk is true, info about nodes in the cluster, + else list of nodes in the cluster + + """ + query = [] + if bulk: + query.append(("bulk", 1)) + + nodes = self._SendRequest(HTTP_GET, "/nodes", query) + if bulk: + return nodes + else: + return [n["id"] for n in nodes] + + def GetNodeInfo(self, node): + """Gets information about a node. + + @type node: str + @param node: node whose info to return + + @rtype: dict + @return: info about the node + + """ + return self._SendRequest(HTTP_GET, "/nodes/%s" % node) + + def EvacuateNode(self, node, iallocator=None, remote_node=None, + dry_run=False): + """Evacuates instances from a Ganeti node. + + @type node: str + @param node: node to evacuate + @type iallocator: str or None + @param iallocator: instance allocator to use + @type remote_node: str + @param remote_node: node to evaucate to + @type dry_run: bool + @param dry_run: whether to perform a dry run + + @rtype: int + @return: job id + + @raises GanetiApiError: if an iallocator and remote_node are both specified + + """ + query = [] + if iallocator and remote_node: + raise GanetiApiError("Only one of iallocator or remote_node can be used.") + + if iallocator: + query.append(("iallocator", iallocator)) + if remote_node: + query.append(("remote_node", remote_node)) + if dry_run: + query.append(("dry-run", 1)) + + return self._SendRequest(HTTP_POST, "/nodes/%s/evacuate" % node, query) + + def MigrateNode(self, node, live=True, dry_run=False): + """Migrates all primary instances from a node. + + @type node: str + @param node: node to migrate + @type live: bool + @param live: whether to use live migration + @type dry_run: bool + @param dry_run: whether to perform a dry run + + @rtype: int + @return: job id + + """ + query = [] + if live: + query.append(("live", 1)) + if dry_run: + query.append(("dry-run", 1)) + + return self._SendRequest(HTTP_POST, "/nodes/%s/migrate" % node, query) + + def GetNodeRole(self, node): + """Gets the current role for a node. + + @type node: str + @param node: node whose role to return + + @rtype: str + @return: the current role for a node + + """ + return self._SendRequest(HTTP_GET, "/nodes/%s/role" % node) + + def SetNodeRole(self, node, role, force=False): + """Sets the role for a node. + + @type node: str + @param node: the node whose role to set + @type role: str + @param role: the role to set for the node + @type force: bool + @param force: whether to force the role change + + @rtype: int + @return: job id + + @raise InvalidNodeRole: If an invalid node role is specified + + """ + if role not in VALID_NODE_ROLES: + raise InvalidNodeRole("%s is not a valid node role.", role) + + query = [("force", force)] + return self._SendRequest(HTTP_PUT, "/nodes/%s/role" % node, query, + content=role) + + def GetNodeStorageUnits(self, node, storage_type, output_fields): + """Gets the storage units for a node. + + @type node: str + @param node: the node whose storage units to return + @type storage_type: str + @param storage_type: storage type whose units to return + @type output_fields: str + @param output_fields: storage type fields to return + + @rtype: int + @return: job id where results can be retrieved + + @raise InvalidStorageType: If an invalid storage type is specified + + """ + # TODO: Add default for storage_type & output_fields + if storage_type not in VALID_STORAGE_TYPES: + raise InvalidStorageType("%s is an invalid storage type.", storage_type) + + query = [("storage_type", storage_type), ("output_fields", output_fields)] + return self._SendRequest(HTTP_GET, "/nodes/%s/storage" % node, query) + + def ModifyNodeStorageUnits(self, node, storage_type, name, allocatable=True): + """Modifies parameters of storage units on the node. + + @type node: str + @param node: node whose storage units to modify + @type storage_type: str + @param storage_type: storage type whose units to modify + @type name: str + @param name: name of the storage unit + @type allocatable: bool + @param allocatable: TODO: Document me + + @rtype: int + @return: job id + + @raise InvalidStorageType: If an invalid storage type is specified + + """ + if storage_type not in VALID_STORAGE_TYPES: + raise InvalidStorageType("%s is an invalid storage type.", storage_type) + + query = [ + ("storage_type", storage_type), ("name", name), + ("allocatable", allocatable) + ] + return self._SendRequest(HTTP_PUT, "/nodes/%s/storage/modify" % node, query) + + def RepairNodeStorageUnits(self, node, storage_type, name): + """Repairs a storage unit on the node. + + @type node: str + @param node: node whose storage units to repair + @type storage_type: str + @param storage_type: storage type to repair + @type name: str + @param name: name of the storage unit to repair + + @rtype: int + @return: job id + + @raise InvalidStorageType: If an invalid storage type is specified + + """ + if storage_type not in VALID_STORAGE_TYPES: + raise InvalidStorageType("%s is an invalid storage type.", storage_type) + + query = [("storage_type", storage_type), ("name", name)] + return self._SendRequest(HTTP_PUT, "/nodes/%s/storage/repair" % node, query) + + def GetNodeTags(self, node): + """Gets the tags for a node. + + @type node: str + @param node: node whose tags to return + + @rtype: list of str + @return: tags for the node + + """ + return self._SendRequest(HTTP_GET, "/nodes/%s/tags" % node) + + def AddNodeTags(self, node, tags, dry_run=False): + """Adds tags to a node. + + @type node: str + @param node: node to add tags to + @type tags: list of str + @param tags: tags to add to the node + @type dry_run: bool + @param dry_run: whether to perform a dry run + + @rtype: int + @return: job id + + """ + query = [("tag", t) for t in tags] + if dry_run: + query.append(("dry-run", 1)) + + return self._SendRequest(HTTP_PUT, "/nodes/%s/tags" % node, query, + content=tags) + + def DeleteNodeTags(self, node, tags, dry_run=False): + """Delete tags from a node. + + @type node: str + @param node: node to remove tags from + @type tags: list of str + @param tags: tags to remove from the node + @type dry_run: bool + @param dry_run: whether to perform a dry run + + @rtype: int + @return: job id + + """ + query = [("tag", t) for t in tags] + if dry_run: + query.append(("dry-run", 1)) + + return self._SendRequest(HTTP_DELETE, "/nodes/%s/tags" % node, query) + + +class HTTPSConnectionOpenSSL(httplib.HTTPSConnection): + """HTTPS Connection handler that verifies the SSL certificate. + + """ + + # pylint: disable-msg=W0142 + def __init__(self, *args, **kwargs): + """Constructor. + + """ + httplib.HTTPSConnection.__init__(self, *args, **kwargs) + + self._ssl_cert = None + if self.cert_file: + f = open(self.cert_file, "r") + self._ssl_cert = crypto.load_certificate(crypto.FILETYPE_PEM, f.read()) + f.close() + + # pylint: disable-msg=W0613 + def _VerifySSLCertCallback(self, conn, cert, errnum, errdepth, ok): + """Verifies the SSL certificate provided by the peer. + + """ + return (self._ssl_cert.digest("sha1") == cert.digest("sha1") and + self._ssl_cert.digest("md5") == cert.digest("md5")) + + def connect(self): + """Connect to the server specified when the object was created. + + This ensures that SSL certificates are verified. + + """ + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + ctx = SSL.Context(SSL.SSLv23_METHOD) + ctx.set_options(SSL.OP_NO_SSLv2) + ctx.use_certificate(self._ssl_cert) + ctx.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, + self._VerifySSLCertCallback) + + ssl = SSL.Connection(ctx, sock) + ssl.connect((self.host, self.port)) + self.sock = httplib.FakeSocket(sock, ssl) + + +def _VerifyCertificate(hostname, port, cert_file): + """Verifies the SSL certificate for the given host/port. + + @type hostname: str + @param hostname: the ganeti cluster master whose certificate to verify + @type port: int + @param port: the port on which the RAPI is running + @type cert_file: str + @param cert_file: filename of the expected SSL certificate + + @raises CertificateError: If an invalid SSL certificate is found + + """ + https = HTTPSConnectionOpenSSL(hostname, port, cert_file=cert_file) + try: + try: + https.request(HTTP_GET, "/version") + except (crypto.Error, SSL.Error): + raise CertificateError("Invalid SSL certificate.") + finally: + https.close() diff --git a/test/ganeti.rapi.client_unittest.py b/test/ganeti.rapi.client_unittest.py new file mode 100755 index 000000000..ed0d7779c --- /dev/null +++ b/test/ganeti.rapi.client_unittest.py @@ -0,0 +1,446 @@ +#!/usr/bin/python +# + +# Copyright (C) 2010 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + + +"""Script for unittesting the RAPI client module""" + + +try: + import httplib2 + BaseHttp = httplib2.Http + from ganeti.rapi import client +except ImportError: + httplib2 = None + BaseHttp = object + +import re +import unittest +import warnings + +from ganeti import http + +from ganeti.rapi import connector +from ganeti.rapi import rlib2 + +import testutils + + +_URI_RE = re.compile(r"https://(?P<host>.*):(?P<port>\d+)(?P<path>/.*)") + + +def _GetPathFromUri(uri): + """Gets the path and query from a URI. + + """ + match = _URI_RE.match(uri) + if match: + return match.groupdict()["path"] + else: + return None + + +class HttpResponseMock(dict): + """Dumb mock of httplib2.Response. + + """ + + def __init__(self, status): + self.status = status + self['status'] = status + + +class HttpMock(BaseHttp): + """Mock for httplib.Http. + + """ + + def __init__(self, rapi): + self._rapi = rapi + self._last_request = None + + last_request_url = property(lambda self: self._last_request[0]) + last_request_method = property(lambda self: self._last_request[1]) + last_request_body = property(lambda self: self._last_request[2]) + + def request(self, url, method, body, headers): + self._last_request = (url, method, body) + code, resp_body = self._rapi.FetchResponse(_GetPathFromUri(url), method) + return HttpResponseMock(code), resp_body + + +class RapiMock(object): + + def __init__(self): + self._mapper = connector.Mapper() + self._responses = [] + self._last_handler = None + + def AddResponse(self, response): + self._responses.insert(0, response) + + def PopResponse(self): + if len(self._responses) > 0: + return self._responses.pop() + else: + return None + + def GetLastHandler(self): + return self._last_handler + + def FetchResponse(self, path, method): + code = 200 + response = None + + try: + HandlerClass, items, args = self._mapper.getController(path) + self._last_handler = HandlerClass(items, args, None) + if not hasattr(self._last_handler, method.upper()): + code = 400 + response = "Bad request" + except http.HttpException, ex: + code = ex.code + response = ex.message + + if not response: + response = self.PopResponse() + + return code, response + + +class RapiMockTest(unittest.TestCase): + + def test(self): + rapi = RapiMock() + path = "/version" + self.assertEqual((404, None), rapi.FetchResponse("/foo", "GET")) + self.assertEqual((400, "Bad request"), + rapi.FetchResponse("/version", "POST")) + rapi.AddResponse("2") + code, response = rapi.FetchResponse("/version", "GET") + self.assertEqual(200, code) + self.assertEqual("2", response) + self.failUnless(isinstance(rapi.GetLastHandler(), rlib2.R_version)) + + +class GanetiRapiClientTests(unittest.TestCase): + """Tests for remote API client. + + """ + + def setUp(self): + # Monkey-patch a fake VerifyCertificate function + self._verify_certificate = client._VerifyCertificate + client._VerifyCertificate = lambda x, y, z: True + + self.rapi = RapiMock() + self.http = HttpMock(self.rapi) + self.client = client.GanetiRapiClient('master.foo.com') + self.client._http = self.http + # Hard-code the version for easier testing. + self.client._version = 2 + + def tearDown(self): + # Un-do the monkey-patch + client._VerifyCertificate = self._verify_certificate + + def assertHandler(self, handler_cls): + self.failUnless(isinstance(self.rapi.GetLastHandler(), handler_cls)) + + def assertQuery(self, key, value): + self.assertEqual(value, self.rapi.GetLastHandler().queryargs.get(key, None)) + + def assertItems(self, items): + self.assertEqual(items, self.rapi.GetLastHandler().items) + + def assertBulk(self): + self.assertTrue(self.rapi.GetLastHandler().useBulk()) + + def assertDryRun(self): + self.assertTrue(self.rapi.GetLastHandler().dryRun()) + + def testGetVersion(self): + self.client._version = None + self.rapi.AddResponse("2") + self.assertEqual(2, self.client.GetVersion()) + self.assertHandler(rlib2.R_version) + + def testGetOperatingSystems(self): + self.rapi.AddResponse("[\"beos\"]") + self.assertEqual(["beos"], self.client.GetOperatingSystems()) + self.assertHandler(rlib2.R_2_os) + + def testGetClusterTags(self): + self.rapi.AddResponse("[\"tag\"]") + self.assertEqual(["tag"], self.client.GetClusterTags()) + self.assertHandler(rlib2.R_2_tags) + + def testAddClusterTags(self): + self.client.AddClusterTags(["awesome"], dry_run=True) + self.assertHandler(rlib2.R_2_tags) + self.assertDryRun() + self.assertQuery("tag", ["awesome"]) + + def testDeleteClusterTags(self): + self.client.DeleteClusterTags(["awesome"], dry_run=True) + self.assertHandler(rlib2.R_2_tags) + self.assertDryRun() + self.assertQuery("tag", ["awesome"]) + + def testGetInfo(self): + self.rapi.AddResponse("{}") + self.assertEqual({}, self.client.GetInfo()) + self.assertHandler(rlib2.R_2_info) + + def testGetInstances(self): + self.rapi.AddResponse("[]") + self.assertEqual([], self.client.GetInstances(bulk=True)) + self.assertHandler(rlib2.R_2_instances) + self.assertBulk() + + def testGetInstanceInfo(self): + self.rapi.AddResponse("[]") + self.assertEqual([], self.client.GetInstanceInfo("instance")) + self.assertHandler(rlib2.R_2_instances_name) + self.assertItems(["instance"]) + + def testCreateInstance(self): + self.rapi.AddResponse("1234") + self.assertEqual(1234, self.client.CreateInstance(dry_run=True)) + self.assertHandler(rlib2.R_2_instances) + self.assertDryRun() + + def testDeleteInstance(self): + self.client.DeleteInstance("instance", dry_run=True) + self.assertHandler(rlib2.R_2_instances_name) + self.assertItems(["instance"]) + self.assertDryRun() + + def testGetInstanceTags(self): + self.rapi.AddResponse("[]") + self.assertEqual([], self.client.GetInstanceTags("fooinstance")) + self.assertHandler(rlib2.R_2_instances_name_tags) + self.assertItems(["fooinstance"]) + + def testAddInstanceTags(self): + self.client.AddInstanceTags("fooinstance", ["awesome"], dry_run=True) + self.assertHandler(rlib2.R_2_instances_name_tags) + self.assertItems(["fooinstance"]) + self.assertDryRun() + self.assertQuery("tag", ["awesome"]) + + def testDeleteInstanceTags(self): + self.client.DeleteInstanceTags("foo", ["awesome"], dry_run=True) + self.assertHandler(rlib2.R_2_instances_name_tags) + self.assertItems(["foo"]) + self.assertDryRun() + self.assertQuery("tag", ["awesome"]) + + def testRebootInstance(self): + self.client.RebootInstance("i-bar", reboot_type="hard", + ignore_secondaries=True, dry_run=True) + self.assertHandler(rlib2.R_2_instances_name_reboot) + self.assertItems(["i-bar"]) + self.assertDryRun() + self.assertQuery("type", ["hard"]) + self.assertQuery("ignore_secondaries", ["True"]) + + def testShutdownInstance(self): + self.client.ShutdownInstance("foo-instance", dry_run=True) + self.assertHandler(rlib2.R_2_instances_name_shutdown) + self.assertItems(["foo-instance"]) + self.assertDryRun() + + def testStartupInstance(self): + self.client.StartupInstance("bar-instance", dry_run=True) + self.assertHandler(rlib2.R_2_instances_name_startup) + self.assertItems(["bar-instance"]) + self.assertDryRun() + + def testReinstallInstance(self): + self.client.ReinstallInstance("baz-instance", "DOS", no_startup=True) + self.assertHandler(rlib2.R_2_instances_name_reinstall) + self.assertItems(["baz-instance"]) + self.assertQuery("os", ["DOS"]) + self.assertQuery("nostartup", ["1"]) + + def testReplaceInstanceDisks(self): + self.rapi.AddResponse("999") + job_id = self.client.ReplaceInstanceDisks("instance-name", + ["hda", "hdc"], dry_run=True) + self.assertEqual(999, job_id) + self.assertHandler(rlib2.R_2_instances_name_replace_disks) + self.assertItems(["instance-name"]) + self.assertQuery("disks", ["hda,hdc"]) + self.assertQuery("mode", ["replace_auto"]) + self.assertQuery("iallocator", ["hail"]) + self.assertDryRun() + + self.assertRaises(client.InvalidReplacementMode, + self.client.ReplaceInstanceDisks, + "instance_a", ["hda"], mode="invalid_mode") + self.assertRaises(client.GanetiApiError, + self.client.ReplaceInstanceDisks, + "instance-foo", ["hda"], mode="replace_on_secondary") + + self.rapi.AddResponse("1000") + job_id = self.client.ReplaceInstanceDisks("instance-bar", + ["hda"], mode="replace_on_secondary", remote_node="foo-node", + dry_run=True) + self.assertEqual(1000, job_id) + self.assertItems(["instance-bar"]) + self.assertQuery("disks", ["hda"]) + self.assertQuery("remote_node", ["foo-node"]) + self.assertDryRun() + + def testGetJobs(self): + self.rapi.AddResponse("[ { \"id\": \"123\", \"uri\": \"\/2\/jobs\/123\" }," + " { \"id\": \"124\", \"uri\": \"\2\/jobs\/124\" } ]") + self.assertEqual([123, 124], self.client.GetJobs()) + self.assertHandler(rlib2.R_2_jobs) + + def testGetJobStatus(self): + self.rapi.AddResponse("{\"foo\": \"bar\"}") + self.assertEqual({"foo": "bar"}, self.client.GetJobStatus(1234)) + self.assertHandler(rlib2.R_2_jobs_id) + self.assertItems(["1234"]) + + def testDeleteJob(self): + self.client.DeleteJob(999, dry_run=True) + self.assertHandler(rlib2.R_2_jobs_id) + self.assertItems(["999"]) + self.assertDryRun() + + def testGetNodes(self): + self.rapi.AddResponse("[ { \"id\": \"node1\", \"uri\": \"uri1\" }," + " { \"id\": \"node2\", \"uri\": \"uri2\" } ]") + self.assertEqual(["node1", "node2"], self.client.GetNodes()) + self.assertHandler(rlib2.R_2_nodes) + + self.rapi.AddResponse("[ { \"id\": \"node1\", \"uri\": \"uri1\" }," + " { \"id\": \"node2\", \"uri\": \"uri2\" } ]") + self.assertEqual([{"id": "node1", "uri": "uri1"}, + {"id": "node2", "uri": "uri2"}], + self.client.GetNodes(bulk=True)) + self.assertHandler(rlib2.R_2_nodes) + self.assertBulk() + + def testGetNodeInfo(self): + self.rapi.AddResponse("{}") + self.assertEqual({}, self.client.GetNodeInfo("node-foo")) + self.assertHandler(rlib2.R_2_nodes_name) + self.assertItems(["node-foo"]) + + def testEvacuateNode(self): + self.rapi.AddResponse("9876") + job_id = self.client.EvacuateNode("node-1", remote_node="node-2") + self.assertEqual(9876, job_id) + self.assertHandler(rlib2.R_2_nodes_name_evacuate) + self.assertItems(["node-1"]) + self.assertQuery("remote_node", ["node-2"]) + + self.rapi.AddResponse("8888") + job_id = self.client.EvacuateNode("node-3", iallocator="hail", dry_run=True) + self.assertEqual(8888, job_id) + self.assertItems(["node-3"]) + self.assertQuery("iallocator", ["hail"]) + self.assertDryRun() + + self.assertRaises(client.GanetiApiError, + self.client.EvacuateNode, + "node-4", iallocator="hail", remote_node="node-5") + + def testMigrateNode(self): + self.rapi.AddResponse("1111") + self.assertEqual(1111, self.client.MigrateNode("node-a", dry_run=True)) + self.assertHandler(rlib2.R_2_nodes_name_migrate) + self.assertItems(["node-a"]) + self.assertQuery("live", ["1"]) + self.assertDryRun() + + def testGetNodeRole(self): + self.rapi.AddResponse("\"master\"") + self.assertEqual("master", self.client.GetNodeRole("node-a")) + self.assertHandler(rlib2.R_2_nodes_name_role) + self.assertItems(["node-a"]) + + def testSetNodeRole(self): + self.rapi.AddResponse("789") + self.assertEqual(789, + self.client.SetNodeRole("node-foo", "master-candidate", force=True)) + self.assertHandler(rlib2.R_2_nodes_name_role) + self.assertItems(["node-foo"]) + self.assertQuery("force", ["True"]) + self.assertEqual("master-candidate", self.http.last_request_body) + + self.assertRaises(client.InvalidNodeRole, + self.client.SetNodeRole, "node-bar", "fake-role") + + def testGetNodeStorageUnits(self): + self.rapi.AddResponse("42") + self.assertEqual(42, + self.client.GetNodeStorageUnits("node-x", "lvm-pv", "fields")) + self.assertHandler(rlib2.R_2_nodes_name_storage) + self.assertItems(["node-x"]) + self.assertQuery("storage_type", ["lvm-pv"]) + self.assertQuery("output_fields", ["fields"]) + + self.assertRaises(client.InvalidStorageType, + self.client.GetNodeStorageUnits, + "node-y", "floppy-disk", "fields") + + def testModifyNodeStorageUnits(self): + self.rapi.AddResponse("14") + self.assertEqual(14, + self.client.ModifyNodeStorageUnits("node-z", "lvm-pv", "hda")) + self.assertHandler(rlib2.R_2_nodes_name_storage_modify) + self.assertItems(["node-z"]) + self.assertQuery("storage_type", ["lvm-pv"]) + self.assertQuery("name", ["hda"]) + + self.assertRaises(client.InvalidStorageType, + self.client.ModifyNodeStorageUnits, + "node-n", "floppy-disk", "hdc") + + def testGetNodeTags(self): + self.rapi.AddResponse("[\"fry\", \"bender\"]") + self.assertEqual(["fry", "bender"], self.client.GetNodeTags("node-k")) + self.assertHandler(rlib2.R_2_nodes_name_tags) + self.assertItems(["node-k"]) + + def testAddNodeTags(self): + self.client.AddNodeTags("node-v", ["awesome"], dry_run=True) + self.assertHandler(rlib2.R_2_nodes_name_tags) + self.assertItems(["node-v"]) + self.assertDryRun() + self.assertQuery("tag", ["awesome"]) + + def testDeleteNodeTags(self): + self.client.DeleteNodeTags("node-w", ["awesome"], dry_run=True) + self.assertHandler(rlib2.R_2_nodes_name_tags) + self.assertItems(["node-w"]) + self.assertDryRun() + self.assertQuery("tag", ["awesome"]) + + +if __name__ == '__main__': + if httplib2 is None: + warnings.warn("These tests require the httplib2 library") + else: + testutils.GanetiTestProgram() -- GitLab