#!/usr/bin/python
#

# Copyright (C) 2010, 2012 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""Tool to merge two or more clusters together.

The clusters have to run the same version of Ganeti!

"""

# pylint: disable=C0103
# C0103: Invalid name cluster-merge

import logging
import os
import optparse
import shutil
import sys
import tempfile

from ganeti import cli
from ganeti import config
from ganeti import constants
from ganeti import errors
from ganeti import ssh
from ganeti import utils
from ganeti import pathutils
from ganeti import compat


_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"
_CLUSTERMERGE_ECID = "clustermerge-ecid"
_RESTART_ALL = "all"
_RESTART_UP = "up"
_RESTART_NONE = "none"
_RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
_PARAMS_STRICT = "strict"
_PARAMS_WARN = "warn"
_PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)


PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
                            metavar="STRATEGY",
                            choices=_PARAMS_CHOICES,
                            dest="params",
                            help=("How to handle params that have"
                                  " different values (One of: %s/%s)" %
                                  _PARAMS_CHOICES))

RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
                             metavar="STRATEGY",
                             choices=_RESTART_CHOICES,
                             dest="restart",
                             help=("How to handle restarting instances"
                                   " (One of: %s/%s/%s)" %
                                   _RESTART_CHOICES))

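# Note that the option is inverted relative to its destination: passing
# --skip-stop-instances stores False in options.stop_instances.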
SKIP_STOP_INSTANCES_OPT = \
  cli.cli_option("--skip-stop-instances", default=True, action="store_false",
                 dest="stop_instances",
                 help=("Don't stop the instances on the clusters, just check "
                       "that none is running"))


def Flatten(unflattened_list):
  """Flattens a list.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  flattened_list = []

  for item in unflattened_list:
    if isinstance(item, list):
      flattened_list.extend(Flatten(item))
    else:
      flattened_list.append(item)
  return flattened_list


class MergerData(object):
  """Container class to hold data used for merger.

  """
  def __init__(self, cluster, key_path, nodes, instances, master_node,
               config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param master_node: Name of the master node
    @param config_path: Path to the merging cluster config

    """
    self.cluster = cluster
    self.key_path = key_path
    self.nodes = nodes
    self.instances = instances
    self.master_node = master_node
    self.config_path = config_path


class Merger(object):
  """Handling the merge.

  """
  RUNNING_STATUSES = compat.UniqueFrozenset([
    constants.INSTST_RUNNING,
    constants.INSTST_ERRORUP,
    ])

  def __init__(self, clusters, pause_period, groups, restart, params,
               stop_instances):
    """Initialize the object with sane defaults and the required information.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for
    @param groups: How to handle group conflicts
    @param restart: How to handle instance restart
    @param stop_instances: Indicates whether the instances must be stopped
                           (True) or if the Merger must only check if no
                           instances are running on the mergee clusters (False)

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    self.groups = groups
    self.restart = restart
    self.params = params
    self.stop_instances = stop_instances
    if self.restart == _RESTART_UP:
      raise NotImplementedError

  def Setup(self):
    """Sets up our end so we can do the merger.

    This method sets up our end in preparation for the merger.
    It makes the initial contact and gathers the information needed.

    @raise errors.RemoteError: for errors in communication/grabbing

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

    # Fetch each remote cluster's private key
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

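      # Keep the fetched private key in our temporary work directory; the
      # follow-up commands against this cluster authenticate with it via the
      # private_key argument of _RunCmd.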
      key_path = utils.PathJoin(self.work_dir, cluster)
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

      result = self._RunCmd(cluster, "gnt-node list -o name,offline"
                            " --no-headers --separator=,", private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      nodes_statuses = [line.split(",") for line in result.stdout.splitlines()]
      nodes = [node_status[0] for node_status in nodes_statuses
               if node_status[1] == "N"]

      result = self._RunCmd(cluster, "gnt-instance list -o name --no-headers",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      path = utils.PathJoin(pathutils.DATA_DIR, "ssconf_%s" %
                            constants.SS_MASTER_NODE)
      result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve the master node name from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      master_node = result.stdout.strip()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances,
                                         master_node))

  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    This method adds our public key to the remote nodes' authorized_keys for
    further communication.

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)

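    # Append our public key to every merging node's authorized_keys through a
    # shell here-document, retrying a few times to ride out transient SSH
    # failures.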
    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
                                     (auth_keys, pub_key)),
                              private_key=data.key_path, max_attempts=3)

        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
                                    result.output))

  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
              ask_key=False, max_attempts=1):
    """Wrapping SshRunner.Run with default parameters.

    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.

    """
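    # Retry up to max_attempts times and return the first successful result,
    # or the last failed one if every attempt fails.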
    for _ in range(max_attempts):
      result = self.ssh_runner.Run(hostname=hostname, command=command,
                                   user=user, use_cluster_key=use_cluster_key,
                                   strict_host_check=strict_host_check,
                                   private_key=private_key, batch=batch,
                                   ask_key=ask_key)
      if not result.failed:
        break

    return result

  def _CheckRunningInstances(self):
    """Checks whether any instances are running on the clusters to be merged.

    @rtype: boolean
    @return: True if there are running instances, False otherwise

    """
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "gnt-instance list -o status")
      if self.RUNNING_STATUSES.intersection(result.output.splitlines()):
        return True

    return False

  def _StopMergingInstances(self):
    """Stop instances on merging clusters.

    """
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "gnt-instance shutdown --all"
                                     " --force-multiple")

      if result.failed:
        raise errors.RemoteError("Unable to stop instances on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _DisableWatcher(self):
    """Disable the watcher on all merging clusters, including ourselves.

    """
    for cluster in ["localhost"] + self.clusters:
      result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
                                     self.pause_period)

      if result.failed:
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _RemoveMasterIps(self):
    """Removes the master IPs from the master nodes of each cluster.

    """
    for data in self.merger_data:
      result = self._RunCmd(data.master_node,
                            "gnt-cluster deactivate-master-ip --yes")

      if result.failed:
        raise errors.RemoteError("Unable to remove master IP on %s."
                                 " Fail reason: %s; output: %s" %
                                 (data.master_node,
                                  result.fail_reason,
                                  result.output))

  def _StopDaemons(self):
    """Stop all daemons on merging nodes.

    """
    cmd = "%s stop-all" % pathutils.DAEMON_UTIL
    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, cmd, max_attempts=3)

        if result.failed:
          raise errors.RemoteError("Unable to stop daemons on %s."
                                   " Fail reason: %s; output: %s." %
                                   (node, result.fail_reason, result.output))

  def _FetchRemoteConfig(self):
    """Fetches and stores remote cluster config from the master.

    This step is needed before we can merge the config.

    """
    for data in self.merger_data:
      result = self._RunCmd(data.cluster, "cat %s" %
                                          pathutils.CLUSTER_CONF_FILE)

      if result.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output: %s" %
                                 (data.cluster, result.fail_reason,
                                  result.output))

      data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
                                        data.cluster)
      utils.WriteFile(data.config_path, data=result.stdout)

  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable=R0201
    """Kills the local master daemon.

    @raise errors.CommandError: If unable to kill

    """
    result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
    if result.failed:
      raise errors.CommandError("Unable to stop master daemons."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be unique over the whole config merge

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
      self._MergeClusterConfigs(my_config, other_config)
      self._MergeNodeGroups(my_config, other_config)

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        # Offline the node; it will be re-onlined later when it is re-added
        node_info.master_candidate = False
        node_info.drained = False
        node_info.offline = True
        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
        for dsk in instance_info.disks:
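          # For DRBD disks the third element of logical_id is the TCP port;
          # take a fresh port from this cluster's pool so that merged
          # instances do not collide with existing ones.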
          if dsk.dev_type in constants.DTS_DRBD:
            port = my_config.AllocatePort()

            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

        my_config.AddInstance(instance_info,
                              _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

  def _MergeClusterConfigs(self, my_config, other_config):
    """Checks that all relevant cluster parameters are compatible

    """
    my_cluster = my_config.GetClusterInfo()
    other_cluster = other_config.GetClusterInfo()
    err_count = 0

    #
    # Generic checks
    #
    check_params = [
      "beparams",
      "default_iallocator",
      "drbd_usermode_helper",
      "hidden_os",
      "maintain_node_health",
      "master_netdev",
      "ndparams",
      "nicparams",
      "primary_ip_family",
      "tags",
      "uid_pool",
      ]
    check_params_strict = [
      "volume_group_name",
    ]
    if my_cluster.IsFileStorageEnabled() or \
        other_cluster.IsFileStorageEnabled():
      check_params_strict.append("file_storage_dir")
    if my_cluster.IsSharedFileStorageEnabled() or \
        other_cluster.IsSharedFileStorageEnabled():
      check_params_strict.append("shared_file_storage_dir")
    check_params.extend(check_params_strict)

    if self.params == _PARAMS_STRICT:
      params_strict = True
    else:
      params_strict = False

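    # Differences in strict parameters always count as errors; the remaining
    # parameters only do so when --parameter-conflicts is set to strict
    # (the default).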
    for param_name in check_params:
      my_param = getattr(my_cluster, param_name)
      other_param = getattr(other_cluster, param_name)
      if my_param != other_param:
        logging.error("The value (%s) of the cluster parameter %s on %s"
                      " differs to this cluster's value (%s)",
                      other_param, param_name, other_cluster.cluster_name,
                      my_param)
        if params_strict or param_name in check_params_strict:
          err_count += 1

    #
    # Custom checks
    #

    # Check default hypervisor
    my_defhyp = my_cluster.enabled_hypervisors[0]
    other_defhyp = other_cluster.enabled_hypervisors[0]
    if my_defhyp != other_defhyp:
      logging.warning("The default hypervisor (%s) differs on %s, new"
                      " instances will be created with this cluster's"
                      " default hypervisor (%s)", other_defhyp,
                      other_cluster.cluster_name, my_defhyp)

    if (set(my_cluster.enabled_hypervisors) !=
        set(other_cluster.enabled_hypervisors)):
      logging.error("The set of enabled hypervisors (%s) on %s differs to"
                    " this cluster's set (%s)",
                    other_cluster.enabled_hypervisors,
                    other_cluster.cluster_name, my_cluster.enabled_hypervisors)
      err_count += 1

    # Check hypervisor params for hypervisors we care about
    for hyp in my_cluster.enabled_hypervisors:
      for param in my_cluster.hvparams[hyp]:
        my_value = my_cluster.hvparams[hyp][param]
        other_value = other_cluster.hvparams[hyp][param]
        if my_value != other_value:
          logging.error("The value (%s) of the %s parameter of the %s"
                        " hypervisor on %s differs to this cluster's parameter"
                        " (%s)",
                        other_value, param, hyp, other_cluster.cluster_name,
                        my_value)
          if params_strict:
            err_count += 1

    # Check os hypervisor params for hypervisors we care about
    for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
      for hyp in my_cluster.enabled_hypervisors:
        my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
        other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
        if my_os_hvp != other_os_hvp:
          logging.error("The OS parameters (%s) for the %s OS for the %s"
                        " hypervisor on %s differs to this cluster's parameters"
                        " (%s)",
                        other_os_hvp, os_name, hyp, other_cluster.cluster_name,
                        my_os_hvp)
          if params_strict:
            err_count += 1

    #
    # Warnings
    #
    if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
      logging.warning("The modify_etc_hosts value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_etc_hosts,
                      other_cluster.cluster_name,
                      my_cluster.modify_etc_hosts)

    if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
      logging.warning("The modify_ssh_setup value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_ssh_setup,
                      other_cluster.cluster_name,
                      my_cluster.modify_ssh_setup)

    #
    # Actual merging
    #
    my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
                                       other_cluster.reserved_lvs))

    if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
      logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
                      " cluster's value (%s). The least permissive value (%s)"
                      " will be used", other_cluster.prealloc_wipe_disks,
                      other_cluster.cluster_name,
                      my_cluster.prealloc_wipe_disks, True)
      my_cluster.prealloc_wipe_disks = True

    for os_, osparams in other_cluster.osparams.items():
      if os_ not in my_cluster.osparams:
        my_cluster.osparams[os_] = osparams
      elif my_cluster.osparams[os_] != osparams:
        logging.error("The OS parameters (%s) for the %s OS on %s differs to"
                      " this cluster's parameters (%s)",
                      osparams, os_, other_cluster.cluster_name,
                      my_cluster.osparams[os_])
        if params_strict:
          err_count += 1

    if err_count:
      raise errors.ConfigurationError("Cluster config for %s has incompatible"
                                      " values, please fix and re-run" %
                                      other_cluster.cluster_name)

  # R0201: Method could be a function
  def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201
    if os_name in cluster.os_hvp:
      return cluster.os_hvp[os_name].get(hyp, None)
    else:
      return None

  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups

    ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
    """
    # pylint: disable=R0201
    logging.info("Node group conflict strategy: %s", self.groups)

    my_grps = my_config.GetAllNodeGroupsInfo().values()
    other_grps = other_config.GetAllNodeGroupsInfo().values()

    # Check for node group naming conflicts:
    conflicts = []
    for other_grp in other_grps:
      for my_grp in my_grps:
        if other_grp.name == my_grp.name:
          conflicts.append(other_grp)

    if conflicts:
      conflict_names = utils.CommaJoin([g.name for g in conflicts])
      logging.info("Node groups in both local and remote cluster: %s",
                   conflict_names)

      # User hasn't specified how to handle conflicts
      if not self.groups:
        raise errors.CommandError("The following node group(s) are in both"
                                  " clusters, and no merge strategy has been"
                                  " supplied (see the --groups option): %s" %
                                  conflict_names)

      # User wants to rename conflicts
      elif self.groups == _GROUPS_RENAME:
        for grp in conflicts:
          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
          logging.info("Renaming remote node group from %s to %s"
                       " to resolve conflict", grp.name, new_name)
          grp.name = new_name

      # User wants to merge conflicting groups
      elif self.groups == _GROUPS_MERGE:
        for other_grp in conflicts:
          logging.info("Merging local and remote '%s' groups", other_grp.name)
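          # Iterate over a copy of the member list, since the nodes are
          # removed from the remote group while being reassigned.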
          for node_name in other_grp.members[:]:
            node = other_config.GetNodeInfo(node_name)
            # Access to a protected member of a client class
            # pylint: disable=W0212
            other_config._UnlockedRemoveNodeFromGroup(node)

            # Access to a protected member of a client class
            # pylint: disable=W0212
            my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)

            # Access to a protected member of a client class
            # pylint: disable=W0212
            my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
            node.group = my_grp_uuid
          # Remove from list of groups to add
          other_grps.remove(other_grp)

    for grp in other_grps:
      #TODO: handle node group conflicts
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)

  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201
    """Starts the local master daemon.

    @param no_vote: Should the master daemon be started without voting?
                    default: False
    @raise errors.CommandError: If unable to start daemon.

    """
    env = {}
    if no_vote:
      env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"

    result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      raise errors.CommandError("Couldn't start ganeti master."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and makes sure their config is up-to-date.

    @raise errors.CommandError: If anything fails.

    """
    for data in self.merger_data:
      for node in data.nodes:
        logging.info("Readding node %s", node)
        result = utils.RunCmd(["gnt-node", "add", "--readd",
                               "--no-ssh-key-check", node])
        if result.failed:
          logging.error("%s failed to be readded. Reason: %s, output: %s",
                         node, result.fail_reason, result.output)

    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if result.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (result.fail_reason,
                                                 result.output))

  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable=R0201
    """Starts up all instances (locally).

    @raise errors.CommandError: If unable to start the instances

    """
    result = utils.RunCmd(["gnt-instance", "startup", "--all",
                           "--force-multiple"])
    if result.failed:
      raise errors.CommandError("Unable to start all instances."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  # R0201: Method could be a function
  # TODO: make this overridable, for some verify errors
  def _VerifyCluster(self): # pylint: disable=R0201
    """Runs gnt-cluster verify to verify the health.

    @raise errors.CommandError: If the cluster fails verification

    """
    result = utils.RunCmd(["gnt-cluster", "verify"])
    if result.failed:
      raise errors.CommandError("Verification of cluster failed."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about steps
    taken. Also it keeps track of rollback_steps to undo everything.

    """
    rbsteps = []
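    # Collect human-readable rollback hints; they are printed if the merge
    # fails before the point of no return.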
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      if self.stop_instances:
        logging.info("Stopping merging instances (takes a while)")
        self._StopMergingInstances()
      logging.info("Checking that no instances are running on the mergees")
      instances_running = self._CheckRunningInstances()
      if instances_running:
        raise errors.CommandError("Some instances are still running on the"
                                  " mergees")
      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Merging config")
      self._FetchRemoteConfig()
      logging.info("Removing master IPs on mergee master nodes")
      self._RemoveMasterIps()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()

      logging.info("Stopping master daemon")
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate"
                     " and restart master daemon" %
                     pathutils.CLUSTER_CONF_FILE)
      self._MergeConfig()
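      # Start the master daemon without voting: the nodes taken over from the
      # mergees are still marked offline, so a regular master vote cannot
      # succeed yet.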
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. Merge can not easily"
                      " be undone after this point.")
      logging.info("Readd nodes")
      self._ReaddMergedNodesAndRedist()

      logging.info("Merge done, restart master daemon normally")
      self._KillMasterDaemon()
      self._StartMasterDaemon()

      if self.restart == _RESTART_ALL:
        logging.info("Starting instances again")
        self._StartupAllInstances()
      else:
        logging.info("Not starting instances again")
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError, e:
      logging.exception(e)

      if rbsteps:
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to rollback do the following:")
        for step in rbsteps:
          logging.critical("  * %s", step % info)
      else:
        logging.critical("Nothing to rollback.")

      # TODO: Keep track of steps done for a flawless resume?

  def Cleanup(self):
    """Clean up our environment.

    This cleans up remote private keys and configs and after that
    deletes the temporary directory.

    """
    shutil.rmtree(self.work_dir)


def main():
  """Main routine.

  """
  program = os.path.basename(sys.argv[0])

  parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
                                 prog=program)
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option(PAUSE_PERIOD_OPT)
  parser.add_option(GROUPS_OPT)
  parser.add_option(RESTART_OPT)
  parser.add_option(PARAMS_OPT)
  parser.add_option(SKIP_STOP_INSTANCES_OPT)

  (options, args) = parser.parse_args()

  utils.SetupToolLogging(options.debug, options.verbose)

  if not args:
    parser.error("No clusters specified")

  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
                          options.groups, options.restart, options.params,
                          options.stop_instances)
  try:
    try:
      cluster_merger.Setup()
      cluster_merger.Merge()
    except errors.GenericError, e:
      logging.exception(e)
      return constants.EXIT_FAILURE
  finally:
    cluster_merger.Cleanup()

  return constants.EXIT_SUCCESS


if __name__ == "__main__":
  sys.exit(main())