#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Logical units dealing with the cluster."""

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
import ganeti.rpc.node as rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
  CheckDiskAccessModeConsistency, CreateNewClientCert, \
  AddInstanceCommunicationNetworkOp, ConnectInstanceCommunicationNetworkOp, \
  CheckImageValidity, \
  EnsureKvmdOnNodes

import ganeti.masterd.instance


def _UpdateMasterClientCert(
    lu, cfg, master_uuid,
    client_cert=pathutils.NODED_CLIENT_CERT_FILE,
    client_cert_tmp=pathutils.NODED_CLIENT_CERT_FILE_TMP):
  """Renews the master's client certificate and propagates the config.

  @type lu: C{LogicalUnit}
  @param lu: the logical unit holding the config
  @type cfg: C{config.ConfigWriter}
  @param cfg: the cluster's configuration
  @type master_uuid: string
  @param master_uuid: the master node's UUID
  @type client_cert: string
  @param client_cert: the path of the client certificate
  @type client_cert_tmp: string
  @param client_cert_tmp: the temporary path of the client certificate
  @rtype: string
  @return: the digest of the newly created client certificate

  """
  client_digest = CreateNewClientCert(lu, master_uuid, filename=client_cert_tmp)
  cfg.AddNodeToCandidateCerts(master_uuid, client_digest)
  # This triggers an update of the config and distribution of it with the old
  # SSL certificate

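  # Only now is the certificate file itself swapped; the new digest was
  # already distributed above while the old certificate was still in use.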
  utils.RemoveFile(client_cert)
  utils.RenameFile(client_cert_tmp, client_cert)
  return client_digest


class LUClusterRenewCrypto(NoHooksLU):
  """Renew the cluster's crypto tokens.

  Note that most of this operation is done in gnt_cluster.py; this LU only
  takes care of the renewal of the client SSL certificates.

  """
  _MAX_NUM_RETRIES = 3

  def Exec(self, feedback_fn):
    master_uuid = self.cfg.GetMasterNode()
    cluster = self.cfg.GetClusterInfo()

    logging.debug("Renewing the master's SSL node certificate."
                  " Master's UUID: %s.", master_uuid)

    server_digest = utils.GetCertificateDigest(
      cert_filename=pathutils.NODED_CERT_FILE)
    logging.debug("SSL digest of the node certificate: %s.", server_digest)
    self.cfg.AddNodeToCandidateCerts("%s-SERVER" % master_uuid,
                                     server_digest)
    logging.debug("Added master's digest as *-SERVER entry to configuration."
                  " Current list of candidate certificates: %s.",
                  str(cluster.candidate_certs))
    try:
      old_master_digest = utils.GetCertificateDigest(
        cert_filename=pathutils.NODED_CLIENT_CERT_FILE)
      logging.debug("SSL digest of old master's SSL node certificate: %s.",
                    old_master_digest)
      self.cfg.AddNodeToCandidateCerts("%s-OLDMASTER" % master_uuid,
                                       old_master_digest)
      logging.debug("Added old master's node certificate digest to config"
                    " as *-OLDMASTER. Current list of candidate certificates:"
                    " %s.", str(cluster.candidate_certs))
    except IOError:
      logging.info("No old master certificate available.")

    last_exception = None
    for i in range(self._MAX_NUM_RETRIES):
      try:
        # Technically it should not be necessary to set the cert
        # paths. However, due to a bug in the mock library, we
        # have to do this to be able to test the function properly.
        _UpdateMasterClientCert(
            self, self.cfg, master_uuid,
            client_cert=pathutils.NODED_CLIENT_CERT_FILE,
            client_cert_tmp=pathutils.NODED_CLIENT_CERT_FILE_TMP)
        logging.debug("Successfully renewed the master's node certificate.")
        break
      except errors.OpExecError as e:
        logging.error("Renewing the master's SSL node certificate failed"
                      " at attempt no. %s with error '%s'", str(i), e)
        last_exception = e
    else:
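      # Note: this is the 'else' of the 'for' loop above; it runs only if no
      # 'break' occurred, i.e. if all retry attempts failed.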
      if last_exception:
        feedback_fn("Could not renew the master's client SSL certificate."
                    " Cleaning up. Error: %s." % last_exception)
      # Cleaning up temporary certificates
      self.cfg.RemoveNodeFromCandidateCerts("%s-SERVER" % master_uuid)
      self.cfg.RemoveNodeFromCandidateCerts("%s-OLDMASTER" % master_uuid)
      logging.debug("Cleaned up *-SERVER and *-OLDMASTER certificate from"
                    " master candidate cert list. Current state of the"
                    " list: %s.", str(cluster.candidate_certs))
      try:
        utils.RemoveFile(pathutils.NODED_CLIENT_CERT_FILE_TMP)
      except IOError as e:
        logging.debug("Could not clean up temporary node certificate of the"
                      " master node. (Possibly because it was already removed"
                      " properly.) Error: %s.", e)
      return

    node_errors = {}
    nodes = self.cfg.GetAllNodesInfo()
    logging.debug("Renewing non-master nodes' node certificates.")
    for (node_uuid, node_info) in nodes.items():
      if node_info.offline:
        feedback_fn("* Skipping offline node %s" % node_info.name)
        logging.debug("Skipping offline node %s (UUID: %s).",
                      node_info.name, node_uuid)
        continue
      if node_uuid != master_uuid:
        logging.debug("Renewing node certificate of node '%s'.", node_uuid)
        last_exception = None
        for i in range(self._MAX_NUM_RETRIES):
          try:
            new_digest = CreateNewClientCert(self, node_uuid)
            if node_info.master_candidate:
              self.cfg.AddNodeToCandidateCerts(node_uuid,
                                               new_digest)
              logging.debug("Added the node's certificate to candidate"
                            " certificate list. Current list: %s.",
                            str(cluster.candidate_certs))
            break
          except errors.OpExecError as e:
            last_exception = e
            logging.error("Could not renew a non-master node's SSL node"
                          " certificate at attempt no. %s. The node's UUID"
                          " is %s, and the error was: %s.",
                          str(i), node_uuid, e)
        else:
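          # 'for ... else': reached only when every retry attempt for this
          # node failed without a 'break'.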
          if last_exception:
            node_errors[node_uuid] = last_exception

    if node_errors:
      msg = ("Some nodes' SSL client certificates could not be renewed."
             " Please make sure those nodes are reachable and rerun"
             " the operation. The affected nodes and their errors are:\n")
      for uuid, e in node_errors.items():
        msg += "Node %s: %s\n" % (uuid, e)
      feedback_fn(msg)

    self.cfg.RemoveNodeFromCandidateCerts("%s-SERVER" % master_uuid)
    self.cfg.RemoveNodeFromCandidateCerts("%s-OLDMASTER" % master_uuid)
    logging.debug("Cleaned up *-SERVER and *-OLDMASTER certificate from"
                  " master candidate cert list. Current state of the"
                  " list: %s.", cluster.candidate_certs)


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
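    # 'ems' says whether a user-provided external script should be used to
    # manage the master IP instead of the built-in mechanism.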
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  # Read by the job queue to detect when the cluster is gone and job files will
  # never be available.
  # FIXME: This variable should be removed together with the Python job queue.
  clusterHasBeenDestroyed = False

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
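    # Only the master node itself may remain; all other nodes must have been
    # removed from the cluster beforehand.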
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)

    self.wconfd.Client().PrepareClusterDestruction(self.wconfdcontext)

    # signal to the job queue that the cluster is gone
    LUClusterDestroy.clusterHasBeenDestroyed = True

    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def CheckArguments(self):
    self.master_uuid = self.cfg.GetMasterNode()
    self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())

    # TODO: When Issue 584 is solved, and None is properly parsed when used
    # as a default value, ndparams.get(.., None) can be changed to
    # ndparams[..] to access the values directly

    # OpenvSwitch: Warn user if link is missing
    if (self.master_ndparams[constants.ND_OVS] and not
        self.master_ndparams.get(constants.ND_OVS_LINK, None)):
      self.LogInfo("No physical interface for OpenvSwitch was given."
                   " OpenvSwitch will not have an outside connection. This"
                   " might not be what you want.")

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Create and configure Open vSwitch

    """
    if self.master_ndparams[constants.ND_OVS]:
      result = self.rpc.call_node_configure_ovs(
                 self.master_uuid,
                 self.master_ndparams[constants.ND_OVS_NAME],
                 self.master_ndparams.get(constants.ND_OVS_LINK, None))
      result.Raise("Could not successfully configure Open vSwitch")

    _UpdateMasterClientCert(self, self.cfg, self.master_uuid)

    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
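      # NotImplemented serves as a "data not requested" marker; these fields
      # are only filled in when the corresponding CQ_* data was asked for.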
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "vcs_version": constants.VCS_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "max_running_jobs": cluster.max_running_jobs,
      "max_tracked_jobs": cluster.max_tracked_jobs,
      "mac_prefix": cluster.mac_prefix,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "default_iallocator_params": cluster.default_iallocator_params,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      "install_image": cluster.install_image,
      "instance_communication_network": cluster.instance_communication_network,
      "compression_tools": cluster.compression_tools,
      "enabled_user_shutdown": cluster.enabled_user_shutdown,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.DT_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(self.cfg.GetInstanceDisks(instance.uuid)):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                frozenset(self.owned_locks(locking.LEVEL_NODE_RES))), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      if not dskl:
        # no disks on the node
        continue

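      # Pair a copy of each disk object with its instance; this is the
      # argument shape the blockdev_getdimensions RPC below expects.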
      newl = [([v[2].Copy()], v[0]) for v in dskl]
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, int):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
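        # The reported size is in bytes, while the configuration stores disk
        # sizes in mebibytes, hence the shift by 20 bits.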
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(disk, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(disk, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(disk, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type netmask: int
  @param netmask: netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)


def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
            constants.ST_FILE, constants.ST_SHARED_FILE
         ))
  file_storage_enabled = file_disk_template in enabled_disk_templates
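  # The checks below form a small decision matrix: an empty string tries to
  # unset the directory (an error while the template is enabled), a
  # non-empty path merely warns if the template is disabled, and None is a
  # programmer error because the opcode layer should have set a default.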
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)


def CheckCompressionTools(tools):
  """Check whether the provided compression tools look like executables.

  @type tools: list of string
  @param tools: The tools provided as opcode input

  """
  regex = re.compile('^[-_a-zA-Z0-9]+$')
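  # The pattern above deliberately keeps the allowed character set small:
  # tool names may later be used when building commands on the nodes, so
  # anything shell-unfriendly is rejected outright.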
  illegal_tools = [t for t in tools if not regex.match(t)]

  if illegal_tools:
    raise errors.OpPrereqError(
      "The tools '%s' contain illegal characters: only alphanumeric values,"
      " dashes, and underscores are allowed" % ", ".join(illegal_tools)
    )

  if constants.IEC_GZIP not in tools:
    raise errors.OpPrereqError("For compatibility reasons, the %s utility must"
                               " be present among the compression tools" %
                               constants.IEC_GZIP)

  if constants.IEC_NONE in tools:
    raise errors.OpPrereqError("%s is a reserved value used for no compression,"
                               " and cannot be used as the name of a tool" %
                               constants.IEC_NONE)


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.mac_prefix:
      self.op.mac_prefix = \
          utils.NormalizeAndValidateThreeOctetMacPrefix(self.op.mac_prefix)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
        CheckDiskAccessModeValidity(self.op.diskparams)
      except errors.OpPrereqError as err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)

    if self.op.install_image is not None:
      CheckImageValidity(self.op.install_image,
                         "Install image must be an absolute path or a URL")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_uuids, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the VG name on all nodes and, in case it
       gets unset, whether there are instances still using it.

    """
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
                                            new_enabled_disk_templates)
    current_vg_name = self.cfg.GetVGName()
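    # Opcode semantics: an empty string for vg_name requests unsetting the
    # volume group, while None means "leave the current setting unchanged".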

    if self.op.vg_name == '':
      if lvm_is_enabled:
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                   " disk templates are or get enabled.")

    if self.op.vg_name is None:
      if current_vg_name is None and lvm_is_enabled:
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and lvm_is_enabled) or \
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
      self._CheckVgNameOnNodes(node_uuids)

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  @staticmethod
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
                                old_enabled_disk_templates):
    """Computes three sets of disk templates.

    @see: C{_GetDiskTemplateSets} for more details.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    disabled_disk_templates = []
    if op_enabled_disk_templates:
      enabled_disk_templates = op_enabled_disk_templates
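      # Newly enabled = requested minus previously enabled;
      # disabled = previously enabled minus requested.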
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(old_enabled_disk_templates))
      disabled_disk_templates = \
        list(set(old_enabled_disk_templates)
             - set(enabled_disk_templates))
    else:
      enabled_disk_templates = old_enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates,
            disabled_disk_templates)

  def _GetDiskTemplateSets(self, cluster):
    """Computes three sets of disk templates.

    The three sets are:
      - disk templates that will be enabled after this operation (no matter if
        they were enabled before or not)
      - disk templates that get enabled by this operation (thus haven't been
        enabled before.)
      - disk templates that get disabled by this operation
1057 1058

    """
1059 1060
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
                                          cluster.enabled_disk_templates)
1061

1062
  def _CheckIpolicy(self, cluster, enabled_disk_templates):
1063 1064 1065 1066
    """Checks the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
1067 1068 1069
    @type enabled_disk_templates: list of string
    @param enabled_disk_templates: list of (possibly newly) enabled disk
      templates
1070 1071

    """
1072
    # FIXME: write unit tests for this
1073 1074 1075 1076
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

1077 1078
      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
                                  enabled_disk_templates)
1079

1080 1081 1082
      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
1083 1084 1085 1086
        instances = frozenset(
          [inst for inst in all_instances
           if compat.any(nuuid in group.members
           for nuuid in self.cfg.GetInstanceNodes(inst.uuid))])
1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
                                           self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))
1098
    else:
1099 1100
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
                                  enabled_disk_templates)
1101

1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129
  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
    """Checks whether the set DRBD helper actually exists on the nodes.

    @type drbd_helper: string
    @param drbd_helper: path of the drbd usermode helper binary
    @type node_uuids: list of strings
    @param node_uuids: list of node UUIDs to check for the helper

    """
    # checks given drbd helper on all nodes
    helpers = self.rpc.call_drbd_helper(node_uuids)
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
      if ninfo.offline:
        self.LogInfo("Not checking drbd helper on offline node %s",
                     ninfo.name)
        continue
      msg = helpers[ninfo.uuid].fail_msg
      if msg:
        raise errors.OpPrereqError("Error checking drbd helper on node"
                                   " '%s': %s" % (ninfo.name, msg),
                                   errors.ECODE_ENVIRON)
      node_helper = helpers[ninfo.uuid].payload
      if node_helper != drbd_helper:
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                   (ninfo.name, node_helper),
                                   errors.ECODE_ENVIRON)

  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
    """Check the DRBD usermode helper.

    @type node_uuids: list of strings
    @param node_uuids: a list of nodes' UUIDs
    @type drbd_enabled: boolean
    @param drbd_enabled: whether DRBD will be enabled after this operation
      (no matter if it was disabled before or not)
    @type drbd_gets_enabled: boolean
    @param drbd_gets_enabled: true if DRBD was disabled before this
      operation, but will be enabled afterwards

    """
    if self.op.drbd_helper == '':
      if drbd_enabled:
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " DRBD is enabled.")
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    else:
      if self.op.drbd_helper is not None and drbd_enabled:
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
      else:
        if drbd_gets_enabled:
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
          if current_drbd_helper is not None:
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
          else:
            raise errors.OpPrereqError("Cannot enable DRBD without a"
                                       " DRBD usermode helper set.")
1162

1163 1164
  def _CheckInstancesOfDisabledDiskTemplates(
      self, disabled_disk_templates):
1165
    """Check whether we try to disable a disk template that is in use.