Skip to content
Snippets Groups Projects
  • Michael Hanselmann's avatar
    RAPI client: Fix SSL error reporting for real · 0d9bc5d2
    Michael Hanselmann authored
    
    My previous patch, commit 857705e8, helped in one particular
    situation where the exception didn't contain any arguments
    (pyOpenSSL reporting a WantReadError, which shouldn't occur with a
    blocking socket anyway). With this patch, more common and uncommon
    errors should be easy to recognize.
    
    SSL errors without any of these patches:
    “ganeti.rapi.client.CertificateError: SSL issue:
    [('SSL routines', 'SSL3_GET_SERVER_CERTIFICATE',
    'certificate verify failed')]”
    
    SSL errors with both patches:
    “ganeti.rapi.client.CertificateError: SSL issue:
    [('SSL routines', 'SSL3_GET_SERVER_CERTIFICATE',
    'certificate verify failed')] (<OpenSSL.SSL.Error
    instance at 0x…>)”
    
    WantReadError without any of these two patches:
    “ganeti.rapi.client.CertificateError: SSL issue: ”
    
    WantReadError with both patches:
    “ganeti.rapi.client.CertificateError: SSL issue:
    (<OpenSSL.SSL.WantReadError instance at 0x…>)”
    
    Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
    Reviewed-by: default avatarIustin Pop <iustin@google.com>
    0d9bc5d2
client.py 29.52 KiB
#
#

# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Ganeti RAPI client."""

import httplib
import urllib2
import logging
import simplejson
import socket
import urllib
import OpenSSL
import distutils.version


GANETI_RAPI_PORT = 5080
GANETI_RAPI_VERSION = 2

HTTP_DELETE = "DELETE"
HTTP_GET = "GET"
HTTP_PUT = "PUT"
HTTP_POST = "POST"
HTTP_OK = 200
HTTP_APP_JSON = "application/json"

REPLACE_DISK_PRI = "replace_on_primary"
REPLACE_DISK_SECONDARY = "replace_on_secondary"
REPLACE_DISK_CHG = "replace_new_secondary"
REPLACE_DISK_AUTO = "replace_auto"

NODE_ROLE_DRAINED = "drained"
NODE_ROLE_MASTER_CANDIATE = "master-candidate"
NODE_ROLE_MASTER = "master"
NODE_ROLE_OFFLINE = "offline"
NODE_ROLE_REGULAR = "regular"


class Error(Exception):
  """Base error class for this module.

  """
  pass


class CertificateError(Error):
  """Raised when a problem is found with the SSL certificate.

  """
  pass


class GanetiApiError(Error):
  """Generic error raised from Ganeti API.

  """
  def __init__(self, msg, code=None):
    Error.__init__(self, msg)
    self.code = code


def FormatX509Name(x509_name):
  """Formats an X509 name.

  @type x509_name: OpenSSL.crypto.X509Name

  """
  try:
    # Only supported in pyOpenSSL 0.7 and above
    get_components_fn = x509_name.get_components
  except AttributeError:
    return repr(x509_name)
  else:
    return "".join("/%s=%s" % (name, value)
                   for name, value in get_components_fn())


class CertAuthorityVerify:
  """Certificate verificator for SSL context.

  Configures SSL context to verify server's certificate.

  """
  _CAPATH_MINVERSION = "0.9"
  _DEFVFYPATHS_MINVERSION = "0.9"

  _PYOPENSSL_VERSION = OpenSSL.__version__
  _PARSED_PYOPENSSL_VERSION = distutils.version.LooseVersion(_PYOPENSSL_VERSION)

  _SUPPORT_CAPATH = (_PARSED_PYOPENSSL_VERSION >= _CAPATH_MINVERSION)
  _SUPPORT_DEFVFYPATHS = (_PARSED_PYOPENSSL_VERSION >= _DEFVFYPATHS_MINVERSION)

  def __init__(self, cafile=None, capath=None, use_default_verify_paths=False):
    """Initializes this class.

    @type cafile: string
    @param cafile: In which file we can find the certificates
    @type capath: string
    @param capath: In which directory we can find the certificates
    @type use_default_verify_paths: bool
    @param use_default_verify_paths: Whether the platform provided CA
                                     certificates are to be used for
                                     verification purposes

    """
    self._cafile = cafile
    self._capath = capath
    self._use_default_verify_paths = use_default_verify_paths

    if self._capath is not None and not self._SUPPORT_CAPATH:
      raise Error(("PyOpenSSL %s has no support for a CA directory,"
                   " version %s or above is required") %
                  (self._PYOPENSSL_VERSION, self._CAPATH_MINVERSION))

    if self._use_default_verify_paths and not self._SUPPORT_DEFVFYPATHS:
      raise Error(("PyOpenSSL %s has no support for using default verification"
                   " paths, version %s or above is required") %
                  (self._PYOPENSSL_VERSION, self._DEFVFYPATHS_MINVERSION))

  @staticmethod
  def _VerifySslCertCb(logger, _, cert, errnum, errdepth, ok):
    """Callback for SSL certificate verification.

    @param logger: Logging object

    """
    if ok:
      log_fn = logger.debug
    else:
      log_fn = logger.error

    log_fn("Verifying SSL certificate at depth %s, subject '%s', issuer '%s'",
           errdepth, FormatX509Name(cert.get_subject()),
           FormatX509Name(cert.get_issuer()))

    if not ok:
      try:
        # Only supported in pyOpenSSL 0.7 and above
        # pylint: disable-msg=E1101
        fn = OpenSSL.crypto.X509_verify_cert_error_string
      except AttributeError:
        errmsg = ""
      else:
        errmsg = ":%s" % fn(errnum)

      logger.error("verify error:num=%s%s", errnum, errmsg)

    return ok

  def __call__(self, ctx, logger):
    """Configures an SSL context to verify certificates.

    @type ctx: OpenSSL.SSL.Context
    @param ctx: SSL context

    """
    if self._use_default_verify_paths:
      ctx.set_default_verify_paths()

    if self._cafile or self._capath:
      if self._SUPPORT_CAPATH:
        ctx.load_verify_locations(self._cafile, self._capath)
      else:
        ctx.load_verify_locations(self._cafile)

    ctx.set_verify(OpenSSL.SSL.VERIFY_PEER,
                   lambda conn, cert, errnum, errdepth, ok: \
                     self._VerifySslCertCb(logger, conn, cert,
                                           errnum, errdepth, ok))


class _HTTPSConnectionOpenSSL(httplib.HTTPSConnection):
  """HTTPS Connection handler that verifies the SSL certificate.

  """
  def __init__(self, *args, **kwargs):
    """Initializes this class.

    """
    httplib.HTTPSConnection.__init__(self, *args, **kwargs)
    self._logger = None
    self._config_ssl_verification = None

  def Setup(self, logger, config_ssl_verification):
    """Sets the SSL verification config function.

    @param logger: Logging object
    @type config_ssl_verification: callable

    """
    assert self._logger is None
    assert self._config_ssl_verification is None

    self._logger = logger
    self._config_ssl_verification = config_ssl_verification

  def connect(self):
    """Connect to the server specified when the object was created.

    This ensures that SSL certificates are verified.

    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_METHOD)
    ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2)

    if self._config_ssl_verification:
      self._config_ssl_verification(ctx, self._logger)

    ssl = OpenSSL.SSL.Connection(ctx, sock)
    ssl.connect((self.host, self.port))

    self.sock = httplib.FakeSocket(sock, ssl)


class _HTTPSHandler(urllib2.HTTPSHandler):
  def __init__(self, logger, config_ssl_verification):
    """Initializes this class.

    @param logger: Logging object
    @type config_ssl_verification: callable
    @param config_ssl_verification: Function to configure SSL context for
                                    certificate verification

    """
    urllib2.HTTPSHandler.__init__(self)
    self._logger = logger
    self._config_ssl_verification = config_ssl_verification

  def _CreateHttpsConnection(self, *args, **kwargs):
    """Wrapper around L{_HTTPSConnectionOpenSSL} to add SSL verification.

    This wrapper is necessary provide a compatible API to urllib2.

    """
    conn = _HTTPSConnectionOpenSSL(*args, **kwargs)
    conn.Setup(self._logger, self._config_ssl_verification)
    return conn

  def https_open(self, req):
    """Creates HTTPS connection.

    Called by urllib2.

    """
    return self.do_open(self._CreateHttpsConnection, req)


class _RapiRequest(urllib2.Request):
  def __init__(self, method, url, headers, data):
    """Initializes this class.

    """
    urllib2.Request.__init__(self, url, data=data, headers=headers)
    self._method = method

  def get_method(self):
    """Returns the HTTP request method.

    """
    return self._method


class GanetiRapiClient(object):
  """Ganeti RAPI client.

  """
  USER_AGENT = "Ganeti RAPI Client"
  _json_encoder = simplejson.JSONEncoder(sort_keys=True)

  def __init__(self, host, port=GANETI_RAPI_PORT,
               username=None, password=None,
               config_ssl_verification=None, ignore_proxy=False,
               logger=logging):
    """Constructor.

    @type host: string
    @param host: the ganeti cluster master to interact with
    @type port: int
    @param port: the port on which the RAPI is running (default is 5080)
    @type username: string
    @param username: the username to connect with
    @type password: string
    @param password: the password to connect with
    @type config_ssl_verification: callable
    @param config_ssl_verification: Function to configure SSL context for
                                    certificate verification
    @type ignore_proxy: bool
    @param ignore_proxy: Whether to ignore proxy settings
    @param logger: Logging object

    """
    self._host = host
    self._port = port
    self._logger = logger

    self._base_url = "https://%s:%s" % (host, port)

    handlers = [_HTTPSHandler(self._logger, config_ssl_verification)]

    if username is not None:
      pwmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
      pwmgr.add_password(None, self._base_url, username, password)
      handlers.append(urllib2.HTTPBasicAuthHandler(pwmgr))
    elif password:
      raise Error("Specified password without username")

    if ignore_proxy:
      handlers.append(urllib2.ProxyHandler({}))

    self._http = urllib2.build_opener(*handlers) # pylint: disable-msg=W0142

    self._headers = {
      "Accept": HTTP_APP_JSON,
      "Content-type": HTTP_APP_JSON,
      "User-Agent": self.USER_AGENT,
      }

  @staticmethod
  def _EncodeQuery(query):
    """Encode query values for RAPI URL.

    @type query: list of two-tuples
    @param query: Query arguments
    @rtype: list
    @return: Query list with encoded values

    """
    result = []

    for name, value in query:
      if value is None:
        result.append((name, ""))

      elif isinstance(value, bool):
        # Boolean values must be encoded as 0 or 1
        result.append((name, int(value)))

      elif isinstance(value, (list, tuple, dict)):
        raise ValueError("Invalid query data type %r" % type(value).__name__)

      else:
        result.append((name, value))

    return result

  def _SendRequest(self, method, path, query, content):
    """Sends an HTTP request.

    This constructs a full URL, encodes and decodes HTTP bodies, and
    handles invalid responses in a pythonic way.

    @type method: string
    @param method: HTTP method to use
    @type path: string
    @param path: HTTP URL path
    @type query: list of two-tuples
    @param query: query arguments to pass to urllib.urlencode
    @type content: str or None
    @param content: HTTP body content

    @rtype: str
    @return: JSON-Decoded response

    @raises CertificateError: If an invalid SSL certificate is found
    @raises GanetiApiError: If an invalid response is returned

    """
    assert path.startswith("/")

    if content:
      encoded_content = self._json_encoder.encode(content)
    else:
      encoded_content = None

    # Build URL
    url = [self._base_url, path]
    if query:
      url.append("?")
      url.append(urllib.urlencode(self._EncodeQuery(query)))

    req = _RapiRequest(method, "".join(url), self._headers, encoded_content)

    try:
      resp = self._http.open(req)
      encoded_response_content = resp.read()
    except (OpenSSL.SSL.Error, OpenSSL.crypto.Error), err:
      raise CertificateError("SSL issue: %s (%r)" % (err, err))

    if encoded_response_content:
      response_content = simplejson.loads(encoded_response_content)
    else:
      response_content = None

    # TODO: Are there other status codes that are valid? (redirect?)
    if resp.code != HTTP_OK:
      if isinstance(response_content, dict):
        msg = ("%s %s: %s" %
               (response_content["code"],
                response_content["message"],
                response_content["explain"]))
      else:
        msg = str(response_content)

      raise GanetiApiError(msg, code=resp.code)

    return response_content

  def GetVersion(self):
    """Gets the Remote API version running on the cluster.

    @rtype: int
    @return: Ganeti Remote API version

    """
    return self._SendRequest(HTTP_GET, "/version", None, None)

  def GetOperatingSystems(self):
    """Gets the Operating Systems running in the Ganeti cluster.

    @rtype: list of str
    @return: operating systems

    """
    return self._SendRequest(HTTP_GET, "/%s/os" % GANETI_RAPI_VERSION,
                             None, None)

  def GetInfo(self):
    """Gets info about the cluster.

    @rtype: dict
    @return: information about the cluster

    """
    return self._SendRequest(HTTP_GET, "/%s/info" % GANETI_RAPI_VERSION,
                             None, None)

  def GetClusterTags(self):
    """Gets the cluster tags.

    @rtype: list of str
    @return: cluster tags

    """
    return self._SendRequest(HTTP_GET, "/%s/tags" % GANETI_RAPI_VERSION,
                             None, None)

  def AddClusterTags(self, tags, dry_run=False):
    """Adds tags to the cluster.

    @type tags: list of str
    @param tags: tags to add to the cluster
    @type dry_run: bool
    @param dry_run: whether to perform a dry run

    @rtype: int
    @return: job id

    """
    query = [("tag", t) for t in tags]
    if dry_run:
      query.append(("dry-run", 1))

    return self._SendRequest(HTTP_PUT, "/%s/tags" % GANETI_RAPI_VERSION,
                             query, None)

  def DeleteClusterTags(self, tags, dry_run=False):
    """Deletes tags from the cluster.

    @type tags: list of str
    @param tags: tags to delete
    @type dry_run: bool
    @param dry_run: whether to perform a dry run

    """
    query = [("tag", t) for t in tags]
    if dry_run:
      query.append(("dry-run", 1))

    return self._SendRequest(HTTP_DELETE, "/%s/tags" % GANETI_RAPI_VERSION,
                             query, None)

  def GetInstances(self, bulk=False):
    """Gets information about instances on the cluster.

    @type bulk: bool
    @param bulk: whether to return all information about all instances

    @rtype: list of dict or list of str
    @return: if bulk is True, info about the instances, else a list of instances

    """
    query = []
    if bulk:
      query.append(("bulk", 1))

    instances = self._SendRequest(HTTP_GET,
                                  "/%s/instances" % GANETI_RAPI_VERSION,
                                  query, None)
    if bulk:
      return instances
    else:
      return [i["id"] for i in instances]

  def GetInstanceInfo(self, instance):
    """Gets information about an instance.

    @type instance: str
    @param instance: instance whose info to return

    @rtype: dict
    @return: info about the instance

    """
    return self._SendRequest(HTTP_GET,
                             ("/%s/instances/%s" %
                              (GANETI_RAPI_VERSION, instance)), None, None)

  def CreateInstance(self, dry_run=False):
    """Creates a new instance.

    @type dry_run: bool
    @param dry_run: whether to perform a dry run

    @rtype: int
    @return: job id

    """
    # TODO: Pass arguments needed to actually create an instance.
    query = []
    if dry_run:
      query.append(("dry-run", 1))

    return self._SendRequest(HTTP_POST, "/%s/instances" % GANETI_RAPI_VERSION,
                             query, None)

  def DeleteInstance(self, instance, dry_run=False):
    """Deletes an instance.

    @type instance: str
    @param instance: the instance to delete

    @rtype: int
    @return: job id

    """
    query = []
    if dry_run:
      query.append(("dry-run", 1))

    return self._SendRequest(HTTP_DELETE,
                             ("/%s/instances/%s" %
                              (GANETI_RAPI_VERSION, instance)), query, None)

  def GetInstanceTags(self, instance):
    """Gets tags for an instance.

    @type instance: str
    @param instance: instance whose tags to return

    @rtype: list of str
    @return: tags for the instance

    """
    return self._SendRequest(HTTP_GET,
                             ("/%s/instances/%s/tags" %
                              (GANETI_RAPI_VERSION, instance)), None, None)

  def AddInstanceTags(self, instance, tags, dry_run=False):
    """Adds tags to an instance.

    @type instance: str
    @param instance: instance to add tags to
    @type tags: list of str
    @param tags: tags to add to the instance
    @type dry_run: bool
    @param dry_run: whether to perform a dry run

    @rtype: int
    @return: job id

    """
    query = [("tag", t) for t in tags]
    if dry_run:
      query.append(("dry-run", 1))

    return self._SendRequest(HTTP_PUT,
                             ("/%s/instances/%s/tags" %
                              (GANETI_RAPI_VERSION, instance)), query, None)

  def DeleteInstanceTags(self, instance, tags, dry_run=False):
    """Deletes tags from an instance.

    @type instance: str
    @param instance: instance to delete tags from
    @type tags: list of str
    @param tags: tags to delete
    @type dry_run: bool
    @param dry_run: whether to perform a dry run

    """
    query = [("tag", t) for t in tags]
    if dry_run:
      query.append(("dry-run", 1))

    return self._SendRequest(HTTP_DELETE,
                             ("/%s/instances/%s/tags" %
                              (GANETI_RAPI_VERSION, instance)), query, None)

  def RebootInstance(self, instance, reboot_type=None, ignore_secondaries=None,
                     dry_run=False):
    """Reboots an instance.

    @type instance: str
    @param instance: instance to rebot
    @type reboot_type: str
    @param reboot_type: one of: hard, soft, full
    @type ignore_secondaries: bool
    @param ignore_secondaries: if True, ignores errors for the secondary node
        while re-assembling disks (in hard-reboot mode only)
    @type dry_run: bool
    @param dry_run: whether to perform a dry run

    """
    query = []
    if reboot_type:
      query.append(("type", reboot_type))
    if ignore_secondaries is not None:
      query.append(("ignore_secondaries", ignore_secondaries))
    if dry_run:
      query.append(("dry-run", 1))

    return self._SendRequest(HTTP_POST,
                             ("/%s/instances/%s/reboot" %
                              (GANETI_RAPI_VERSION, instance)), query, None)

  def ShutdownInstance(self, instance, dry_run=False):
    """Shuts down an instance.

    @type instance: str
    @param instance: the instance to shut down
    @type dry_run: bool
    @param dry_run: whether to perform a dry run

    """
    query = []
    if dry_run:
      query.append(("dry-run", 1))

    return self._SendRequest(HTTP_PUT,
                             ("/%s/instances/%s/shutdown" %
                              (GANETI_RAPI_VERSION, instance)), query, None)

  def StartupInstance(self, instance, dry_run=False):
    """Starts up an instance.

    @type instance: str
    @param instance: the instance to start up
    @type dry_run: bool
    @param dry_run: whether to perform a dry run

    """
    query = []
    if dry_run:
      query.append(("dry-run", 1))

    return self._SendRequest(HTTP_PUT,
                             ("/%s/instances/%s/startup" %
                              (GANETI_RAPI_VERSION, instance)), query, None)

  def ReinstallInstance(self, instance, os, no_startup=False):
    """Reinstalls an instance.

    @type instance: str
    @param instance: the instance to reinstall
    @type os: str
    @param os: the os to reinstall
    @type no_startup: bool
    @param no_startup: whether to start the instance automatically

    """
    query = [("os", os)]
    if no_startup:
      query.append(("nostartup", 1))
    return self._SendRequest(HTTP_POST,
                             ("/%s/instances/%s/reinstall" %
                              (GANETI_RAPI_VERSION, instance)), query, None)

  def ReplaceInstanceDisks(self, instance, disks=None, mode=REPLACE_DISK_AUTO,
                           remote_node=None, iallocator=None, dry_run=False):
    """Replaces disks on an instance.

    @type instance: str
    @param instance: instance whose disks to replace
    @type disks: list of ints
    @param disks: Indexes of disks to replace
    @type mode: str
    @param mode: replacement mode to use (defaults to replace_auto)
    @type remote_node: str or None
    @param remote_node: new secondary node to use (for use with
        replace_new_secondary mode)
    @type iallocator: str or None
    @param iallocator: instance allocator plugin to use (for use with
                       replace_auto mode)
    @type dry_run: bool
    @param dry_run: whether to perform a dry run

    @rtype: int
    @return: job id

    """
    query = [
      ("mode", mode),
      ]

    if disks:
      query.append(("disks", ",".join(str(idx) for idx in disks)))

    if remote_node:
      query.append(("remote_node", remote_node))

    if iallocator:
      query.append(("iallocator", iallocator))

    if dry_run:
      query.append(("dry-run", 1))

    return self._SendRequest(HTTP_POST,
                             ("/%s/instances/%s/replace-disks" %
                              (GANETI_RAPI_VERSION, instance)), query, None)

  def GetJobs(self):
    """Gets all jobs for the cluster.

    @rtype: list of int
    @return: job ids for the cluster

    """
    return [int(j["id"])
            for j in self._SendRequest(HTTP_GET,
                                       "/%s/jobs" % GANETI_RAPI_VERSION,
                                       None, None)]

  def GetJobStatus(self, job_id):
    """Gets the status of a job.

    @type job_id: int
    @param job_id: job id whose status to query

    @rtype: dict
    @return: job status

    """
    return self._SendRequest(HTTP_GET,
                             "/%s/jobs/%s" % (GANETI_RAPI_VERSION, job_id),
                             None, None)

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Waits for job changes.

    @type job_id: int
    @param job_id: Job ID for which to wait

    """
    body = {
      "fields": fields,
      "previous_job_info": prev_job_info,
      "previous_log_serial": prev_log_serial,
      }

    return self._SendRequest(HTTP_GET,
                             "/%s/jobs/%s/wait" % (GANETI_RAPI_VERSION, job_id),
                             None, body)

  def CancelJob(self, job_id, dry_run=False):
    """Cancels a job.

    @type job_id: int
    @param job_id: id of the job to delete
    @type dry_run: bool
    @param dry_run: whether to perform a dry run

    """
    query = []
    if dry_run:
      query.append(("dry-run", 1))

    return self._SendRequest(HTTP_DELETE,
                             "/%s/jobs/%s" % (GANETI_RAPI_VERSION, job_id),
                             query, None)

  def GetNodes(self, bulk=False):
    """Gets all nodes in the cluster.

    @type bulk: bool
    @param bulk: whether to return all information about all instances

    @rtype: list of dict or str
    @return: if bulk is true, info about nodes in the cluster,
        else list of nodes in the cluster

    """
    query = []
    if bulk:
      query.append(("bulk", 1))

    nodes = self._SendRequest(HTTP_GET, "/%s/nodes" % GANETI_RAPI_VERSION,
                              query, None)
    if bulk:
      return nodes
    else:
      return [n["id"] for n in nodes]

  def GetNodeInfo(self, node):
    """Gets information about a node.

    @type node: str
    @param node: node whose info to return

    @rtype: dict
    @return: info about the node

    """
    return self._SendRequest(HTTP_GET,
                             "/%s/nodes/%s" % (GANETI_RAPI_VERSION, node),
                             None, None)

  def EvacuateNode(self, node, iallocator=None, remote_node=None,
                   dry_run=False):
    """Evacuates instances from a Ganeti node.

    @type node: str
    @param node: node to evacuate
    @type iallocator: str or None
    @param iallocator: instance allocator to use
    @type remote_node: str
    @param remote_node: node to evaucate to
    @type dry_run: bool
    @param dry_run: whether to perform a dry run

    @rtype: int
    @return: job id

    @raises GanetiApiError: if an iallocator and remote_node are both specified

    """
    if iallocator and remote_node:
      raise GanetiApiError("Only one of iallocator or remote_node can be used")

    query = []
    if iallocator:
      query.append(("iallocator", iallocator))
    if remote_node:
      query.append(("remote_node", remote_node))
    if dry_run:
      query.append(("dry-run", 1))

    return self._SendRequest(HTTP_POST,
                             ("/%s/nodes/%s/evacuate" %
                              (GANETI_RAPI_VERSION, node)), query, None)

  def MigrateNode(self, node, live=True, dry_run=False):
    """Migrates all primary instances from a node.

    @type node: str
    @param node: node to migrate
    @type live: bool
    @param live: whether to use live migration
    @type dry_run: bool
    @param dry_run: whether to perform a dry run

    @rtype: int
    @return: job id

    """
    query = []
    if live:
      query.append(("live", 1))
    if dry_run:
      query.append(("dry-run", 1))

    return self._SendRequest(HTTP_POST,
                             ("/%s/nodes/%s/migrate" %
                              (GANETI_RAPI_VERSION, node)), query, None)

  def GetNodeRole(self, node):
    """Gets the current role for a node.

    @type node: str
    @param node: node whose role to return

    @rtype: str
    @return: the current role for a node

    """
    return self._SendRequest(HTTP_GET,
                             ("/%s/nodes/%s/role" %
                              (GANETI_RAPI_VERSION, node)), None, None)

  def SetNodeRole(self, node, role, force=False):
    """Sets the role for a node.

    @type node: str
    @param node: the node whose role to set
    @type role: str
    @param role: the role to set for the node
    @type force: bool
    @param force: whether to force the role change

    @rtype: int
    @return: job id

    """
    query = [
      ("force", force),
      ]

    return self._SendRequest(HTTP_PUT,
                             ("/%s/nodes/%s/role" %
                              (GANETI_RAPI_VERSION, node)), query, role)

  def GetNodeStorageUnits(self, node, storage_type, output_fields):
    """Gets the storage units for a node.

    @type node: str
    @param node: the node whose storage units to return
    @type storage_type: str
    @param storage_type: storage type whose units to return
    @type output_fields: str
    @param output_fields: storage type fields to return

    @rtype: int
    @return: job id where results can be retrieved

    """
    query = [
      ("storage_type", storage_type),
      ("output_fields", output_fields),
      ]

    return self._SendRequest(HTTP_GET,
                             ("/%s/nodes/%s/storage" %
                              (GANETI_RAPI_VERSION, node)), query, None)

  def ModifyNodeStorageUnits(self, node, storage_type, name, allocatable=None):
    """Modifies parameters of storage units on the node.

    @type node: str
    @param node: node whose storage units to modify
    @type storage_type: str
    @param storage_type: storage type whose units to modify
    @type name: str
    @param name: name of the storage unit
    @type allocatable: bool or None
    @param allocatable: Whether to set the "allocatable" flag on the storage
                        unit (None=no modification, True=set, False=unset)

    @rtype: int
    @return: job id

    """
    query = [
      ("storage_type", storage_type),
      ("name", name),
      ]

    if allocatable is not None:
      query.append(("allocatable", allocatable))

    return self._SendRequest(HTTP_PUT,
                             ("/%s/nodes/%s/storage/modify" %
                              (GANETI_RAPI_VERSION, node)), query, None)

  def RepairNodeStorageUnits(self, node, storage_type, name):
    """Repairs a storage unit on the node.

    @type node: str
    @param node: node whose storage units to repair
    @type storage_type: str
    @param storage_type: storage type to repair
    @type name: str
    @param name: name of the storage unit to repair

    @rtype: int
    @return: job id

    """
    query = [
      ("storage_type", storage_type),
      ("name", name),
      ]

    return self._SendRequest(HTTP_PUT,
                             ("/%s/nodes/%s/storage/repair" %
                              (GANETI_RAPI_VERSION, node)), query, None)

  def GetNodeTags(self, node):
    """Gets the tags for a node.

    @type node: str
    @param node: node whose tags to return

    @rtype: list of str
    @return: tags for the node

    """
    return self._SendRequest(HTTP_GET,
                             ("/%s/nodes/%s/tags" %
                              (GANETI_RAPI_VERSION, node)), None, None)

  def AddNodeTags(self, node, tags, dry_run=False):
    """Adds tags to a node.

    @type node: str
    @param node: node to add tags to
    @type tags: list of str
    @param tags: tags to add to the node
    @type dry_run: bool
    @param dry_run: whether to perform a dry run

    @rtype: int
    @return: job id

    """
    query = [("tag", t) for t in tags]
    if dry_run:
      query.append(("dry-run", 1))

    return self._SendRequest(HTTP_PUT,
                             ("/%s/nodes/%s/tags" %
                              (GANETI_RAPI_VERSION, node)), query, tags)

  def DeleteNodeTags(self, node, tags, dry_run=False):
    """Delete tags from a node.

    @type node: str
    @param node: node to remove tags from
    @type tags: list of str
    @param tags: tags to remove from the node
    @type dry_run: bool
    @param dry_run: whether to perform a dry run

    @rtype: int
    @return: job id

    """
    query = [("tag", t) for t in tags]
    if dry_run:
      query.append(("dry-run", 1))

    return self._SendRequest(HTTP_DELETE,
                             ("/%s/nodes/%s/tags" %
                              (GANETI_RAPI_VERSION, node)), query, None)