#!/usr/bin/python
#

# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Tool to merge two or more clusters together.

The clusters have to run the same version of Ganeti!

"""

# pylint: disable=C0103

# C0103: Invalid name cluster-merge

import logging
import os
import optparse
import shutil
import sys
import tempfile

from ganeti import cli
from ganeti import config
from ganeti import constants
from ganeti import errors
from ganeti import ssh
from ganeti import utils


45
46
47
_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"
_CLUSTERMERGE_ECID = "clustermerge-ecid"
48
49
50
51
_RESTART_ALL = "all"
_RESTART_UP = "up"
_RESTART_NONE = "none"
_RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
52
53
54
_PARAMS_STRICT = "strict"
_PARAMS_WARN = "warn"
_PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
55

56

57
58
59
60
61
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
62
# Strategy for node groups whose names collide between the clusters;
# no default: a collision without this option aborts the merge
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
                            metavar="STRATEGY",
                            choices=_PARAMS_CHOICES,
                            dest="params",
                            help=("How to handle params that have"
                                  " different values (One of: %s/%s)" %
                                  _PARAMS_CHOICES))

76
77
78
79
80
81
82
# Strategy for restarting instances after the merge; note that "up" is
# not implemented (Merger.__init__ raises NotImplementedError for it)
RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
                             metavar="STRATEGY",
                             choices=_RESTART_CHOICES,
                             dest="restart",
                             # Fixed help text: previous wording "restarting
                             # instances same name" was copy-pasted from the
                             # --groups option
                             help=("How to handle restarting instances"
                                   " (One of: %s/%s/%s)" %
                                   _RESTART_CHOICES))
# When given, instances on the mergee clusters are not shut down; the tool
# then only verifies that none is running (see Merger.Merge)
SKIP_STOP_INSTANCES_OPT = \
  cli.cli_option("--skip-stop-instances", default=True, action="store_false",
                 dest="stop_instances",
                 help=("Don't stop the instances on the clusters, just check "
                       "that none is running"))
def Flatten(unflattened_list):
  """Return a flat list with all nested list contents expanded.

  Nested lists are expanded depth-first, preserving element order; any
  non-list element (including strings and tuples) is kept as-is.

  @param unflattened_list: A possibly nested list of objects
  @return: A new, flat list containing all non-list elements

  """
  result = []
  # Explicit stack of iterators instead of recursion
  pending = [iter(unflattened_list)]

  while pending:
    try:
      item = next(pending[-1])
    except StopIteration:
      pending.pop()
      continue
    if isinstance(item, list):
      pending.append(iter(item))
    else:
      result.append(item)

  return result
class MergerData(object):
  """Holds the per-cluster state collected while preparing a merge.

  """
  def __init__(self, cluster, key_path, nodes, instances, master_node,
               config_path=None):
    """Store the information gathered about one merging cluster.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param master_node: Name of the master node
    @param config_path: Path to the merging cluster config

    """
    # Identity and credentials of the remote cluster
    self.cluster = cluster
    self.key_path = key_path
    # Topology information fetched over ssh during Merger.Setup()
    self.master_node = master_node
    self.nodes = nodes
    self.instances = instances
    # Local path of the fetched config; filled in by _FetchRemoteConfig()
    self.config_path = config_path
class Merger(object):
  """Handling the merge.

  """
  # Instance states that count as "running"; used by _CheckRunningInstances
  # to make sure the mergee clusters are quiesced before merging
  RUNNING_STATUSES = frozenset([
    constants.INSTST_RUNNING,
    constants.INSTST_ERRORUP,
    ])

  def __init__(self, clusters, pause_period, groups, restart, params,
               stop_instances):
    """Initialize object with sane defaults and infos required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for
    @param groups: How to handle group conflicts
    @param restart: How to handle instance restart
    @param params: How to handle cluster parameter conflicts (strict/warn)
    @param stop_instances: Indicates whether the instances must be stopped
                           (True) or if the Merger must only check if no
                           instances are running on the mergee clusters (False)

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    # Temporary directory holding fetched keys/configs; removed in Cleanup()
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    # Name of the local (surviving) cluster, queried from the master daemon
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    self.groups = groups
    self.restart = restart
    self.params = params
    self.stop_instances = stop_instances
    # Restarting only previously-running instances is not implemented
    if self.restart == _RESTART_UP:
      raise NotImplementedError
  def Setup(self):
    """Sets up our end so we can do the merger.

    This method is setting us up as a preparation for the merger.
    It makes the initial contact and gathers information needed:
    the remote ssh private key, the list of online nodes, the list of
    instances and the master node name of every mergee cluster.

    @raise errors.CommandError: when trying to merge the cluster with itself
    @raise errors.RemoteError: for errors in communication/grabbing

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

    # Fetch remotes private key
    for cluster in self.clusters:
      # First contact uses password authentication (batch=False); all later
      # commands use the private key fetched here
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

      key_path = utils.PathJoin(self.work_dir, cluster)
      # Private key: readable/writable by owner only
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

      result = self._RunCmd(cluster, "gnt-node list -o name,offline"
                            " --no-header --separator=,", private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
      # Keep only online nodes ("N" in the offline column means not offline)
      nodes = [node_status[0] for node_status in nodes_statuses
               if node_status[1] == "N"]

      result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      # The master node name is read from the ssconf file on the remote side
      path = utils.PathJoin(constants.DATA_DIR, "ssconf_%s" %
                            constants.SS_MASTER_NODE)
      result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve the master node name from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      master_node = result.stdout.strip()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances,
                                         master_node))
  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    This method adds our public key to the remote authorized_keys files
    for further communication.

    @raise errors.RemoteError: if appending the key on a node fails

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)

    for data in self.merger_data:
      for node in data.nodes:
        # Append our public key via a quoted here-document so the key
        # contents are not subject to remote shell expansion
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
                                     (auth_keys, pub_key)),
                              private_key=data.key_path, max_attempts=3)

        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
                                    result.output))
  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
              ask_key=False, max_attempts=1):
    """Wrapping SshRunner.Run with default parameters.

    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.

    @param max_attempts: How many times the command is retried; retrying
                         stops as soon as one attempt succeeds
    @return: The result of the last attempt, or C{None} if C{max_attempts}
             is smaller than 1

    """
    # Initialize so a non-positive max_attempts can not cause a NameError
    # on the return statement below
    result = None
    for _ in range(max_attempts):
      result = self.ssh_runner.Run(hostname=hostname, command=command,
                                   user=user, use_cluster_key=use_cluster_key,
                                   strict_host_check=strict_host_check,
                                   private_key=private_key, batch=batch,
                                   ask_key=ask_key)
      if not result.failed:
        break

    return result
  def _CheckRunningInstances(self):
    """Determine whether any mergee cluster still has running instances.

    @rtype: boolean
    @return: True if there are running instances, False otherwise

    """
    # Short-circuits on the first cluster reporting a running status
    return any(self.RUNNING_STATUSES.intersection(
                 self._RunCmd(cluster,
                              "gnt-instance list -o status")
                 .output.splitlines())
               for cluster in self.clusters)
  def _StopMergingInstances(self):
    """Shut down all instances on every mergee cluster.

    @raise errors.RemoteError: if the remote shutdown command fails

    """
    shutdown_cmd = "gnt-instance shutdown --all --force-multiple"

    for mergee in self.clusters:
      outcome = self._RunCmd(mergee, shutdown_cmd)
      if not outcome.failed:
        continue
      raise errors.RemoteError("Unable to stop instances on %s."
                               " Fail reason: %s; output: %s" %
                               (mergee, outcome.fail_reason, outcome.output))
  def _DisableWatcher(self):
    """Pause the watcher on every involved cluster, ourselves included.

    @raise errors.RemoteError: if pausing the watcher fails anywhere

    """
    pause_cmd = "gnt-cluster watcher pause %d" % self.pause_period

    for target in ["localhost"] + self.clusters:
      outcome = self._RunCmd(target, pause_cmd)
      if not outcome.failed:
        continue
      raise errors.RemoteError("Unable to pause watcher on %s."
                               " Fail reason: %s; output: %s" %
                               (target, outcome.fail_reason, outcome.output))
  def _RemoveMasterIps(self):
    """Deactivate the master IP on each mergee's master node.

    @raise errors.RemoteError: if deactivating a master IP fails

    """
    for entry in self.merger_data:
      outcome = self._RunCmd(entry.master_node,
                             "gnt-cluster deactivate-master-ip --yes")
      if not outcome.failed:
        continue
      raise errors.RemoteError("Unable to remove master IP on %s."
                               " Fail reason: %s; output: %s" %
                               (entry.master_node,
                                outcome.fail_reason,
                                outcome.output))
  def _StopDaemons(self):
    """Stop all Ganeti daemons on the nodes of the merging clusters.

    @raise errors.RemoteError: if stopping the daemons on a node fails

    """
    stop_cmd = "%s stop-all" % constants.DAEMON_UTIL
    all_nodes = (node
                 for entry in self.merger_data
                 for node in entry.nodes)

    for node in all_nodes:
      outcome = self._RunCmd(node, stop_cmd, max_attempts=3)
      if outcome.failed:
        raise errors.RemoteError("Unable to stop daemons on %s."
                                 " Fail reason: %s; output: %s." %
                                 (node, outcome.fail_reason, outcome.output))
  def _FetchRemoteConfig(self):
    """Fetches and stores remote cluster config from the master.

    Downloads each mergee's configuration file and writes it below the
    work directory, recording the local path in the MergerData entry.
    This step is needed before we can merge the config.

    @raise errors.RemoteError: if fetching a remote config fails

    """
    for entry in self.merger_data:
      fetched = self._RunCmd(entry.cluster,
                             "cat %s" % constants.CLUSTER_CONF_FILE)
      if fetched.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output %s" %
                                 (entry.cluster, fetched.fail_reason,
                                  fetched.output))

      entry.config_path = utils.PathJoin(self.work_dir,
                                         "%s_config.data" % entry.cluster)
      utils.WriteFile(entry.config_path, data=fetched.stdout)
  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable=R0201
    """Stop the master daemon on the local node.

    @raise errors.CommandError: If unable to kill

    """
    outcome = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if not outcome.failed:
      return
    raise errors.CommandError("Unable to stop master daemons."
                              " Fail reason: %s; output: %s" %
                              (outcome.fail_reason, outcome.output))
  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    For every mergee cluster: checks parameter compatibility, merges node
    groups, then adds all foreign nodes (offlined) and instances to the
    local configuration.  Must run with the master daemon stopped, since
    the local config is opened offline.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be uniq over the whole config merge

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
      self._MergeClusterConfigs(my_config, other_config)
      self._MergeNodeGroups(my_config, other_config)

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        # Offline the node, it will be reonlined later at node readd
        node_info.master_candidate = False
        node_info.drained = False
        node_info.offline = True
        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.LDS_DRBD:
            # Allocate a fresh port from the local cluster's pool so the
            # imported disk can not clash with existing DRBD minors
            port = my_config.AllocatePort()

            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

            # NOTE(review): indices 1 and 3 are presumably the ports of the
            # two DRBD endpoints -- confirm against objects.Disk
            physical_id = list(dsk.physical_id)
            physical_id[1] = physical_id[3] = port
            dsk.physical_id = tuple(physical_id)

        my_config.AddInstance(instance_info,
                              _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1
  def _MergeClusterConfigs(self, my_config, other_config):
    """Checks that all relevant cluster parameters are compatible

    Differences are logged; depending on the --parameter-conflicts
    strategy they either only warn or count as fatal errors.  Some values
    (reserved_lvs, prealloc_wipe_disks) are also merged into our config.

    @param my_config: L{config.ConfigWriter} of the local cluster
    @param other_config: L{config.ConfigWriter} of the mergee cluster
    @raise errors.ConfigurationError: if any incompatible value was found

    """
    my_cluster = my_config.GetClusterInfo()
    other_cluster = other_config.GetClusterInfo()
    err_count = 0

    #
    # Generic checks
    #
    # Parameters compared between the clusters; a conflict is fatal only
    # in strict mode (or always, for the ones in check_params_strict)
    check_params = [
      "beparams",
      "default_iallocator",
      "drbd_usermode_helper",
      "hidden_os",
      "maintain_node_health",
      "master_netdev",
      "ndparams",
      "nicparams",
      "primary_ip_family",
      "tags",
      "uid_pool",
      ]
    check_params_strict = [
      "volume_group_name",
    ]
    if constants.ENABLE_FILE_STORAGE:
      check_params_strict.append("file_storage_dir")
    if constants.ENABLE_SHARED_FILE_STORAGE:
      check_params_strict.append("shared_file_storage_dir")
    check_params.extend(check_params_strict)

    if self.params == _PARAMS_STRICT:
      params_strict = True
    else:
      params_strict = False

    for param_name in check_params:
      my_param = getattr(my_cluster, param_name)
      other_param = getattr(other_cluster, param_name)
      if my_param != other_param:
        logging.error("The value (%s) of the cluster parameter %s on %s"
                      " differs to this cluster's value (%s)",
                      other_param, param_name, other_cluster.cluster_name,
                      my_param)
        if params_strict or param_name in check_params_strict:
          err_count += 1

    #
    # Custom checks
    #

    # Check default hypervisor
    my_defhyp = my_cluster.enabled_hypervisors[0]
    other_defhyp = other_cluster.enabled_hypervisors[0]
    if my_defhyp != other_defhyp:
      logging.warning("The default hypervisor (%s) differs on %s, new"
                      " instances will be created with this cluster's"
                      " default hypervisor (%s)", other_defhyp,
                      other_cluster.cluster_name, my_defhyp)

    if (set(my_cluster.enabled_hypervisors) !=
        set(other_cluster.enabled_hypervisors)):
      logging.error("The set of enabled hypervisors (%s) on %s differs to"
                    " this cluster's set (%s)",
                    other_cluster.enabled_hypervisors,
                    other_cluster.cluster_name, my_cluster.enabled_hypervisors)
      err_count += 1

    # Check hypervisor params for hypervisors we care about
    for hyp in my_cluster.enabled_hypervisors:
      for param in my_cluster.hvparams[hyp]:
        my_value = my_cluster.hvparams[hyp][param]
        other_value = other_cluster.hvparams[hyp][param]
        if my_value != other_value:
          logging.error("The value (%s) of the %s parameter of the %s"
                        " hypervisor on %s differs to this cluster's parameter"
                        " (%s)",
                        other_value, param, hyp, other_cluster.cluster_name,
                        my_value)
          if params_strict:
            err_count += 1

    # Check os hypervisor params for hypervisors we care about
    for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
      for hyp in my_cluster.enabled_hypervisors:
        my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
        other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
        if my_os_hvp != other_os_hvp:
          logging.error("The OS parameters (%s) for the %s OS for the %s"
                        " hypervisor on %s differs to this cluster's parameters"
                        " (%s)",
                        other_os_hvp, os_name, hyp, other_cluster.cluster_name,
                        my_os_hvp)
          if params_strict:
            err_count += 1

    #
    # Warnings
    #
    if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
      logging.warning("The modify_etc_hosts value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_etc_hosts,
                      other_cluster.cluster_name,
                      my_cluster.modify_etc_hosts)

    if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
      logging.warning("The modify_ssh_setup value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_ssh_setup,
                      other_cluster.cluster_name,
                      my_cluster.modify_ssh_setup)

    #
    # Actual merging
    #
    # Union of both reserved LV lists, deduplicated
    my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
                                       other_cluster.reserved_lvs))

    if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
      logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
                      " cluster's value (%s). The least permissive value (%s)"
                      " will be used", other_cluster.prealloc_wipe_disks,
                      other_cluster.cluster_name,
                      my_cluster.prealloc_wipe_disks, True)
      my_cluster.prealloc_wipe_disks = True

    for os_, osparams in other_cluster.osparams.items():
      if os_ not in my_cluster.osparams:
        my_cluster.osparams[os_] = osparams
      elif my_cluster.osparams[os_] != osparams:
        logging.error("The OS parameters (%s) for the %s OS on %s differs to"
                      " this cluster's parameters (%s)",
                      osparams, os_, other_cluster.cluster_name,
                      my_cluster.osparams[os_])
        if params_strict:
          err_count += 1

    if err_count:
      raise errors.ConfigurationError("Cluster config for %s has incompatible"
                                      " values, please fix and re-run" %
                                      other_cluster.cluster_name)
  # R0201: Method could be a function
  def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201
    """Look up the OS hypervisor parameters of one OS/hypervisor pair.

    @param cluster: Cluster object providing an C{os_hvp} mapping
    @param os_name: Name of the OS to look up
    @param hyp: Name of the hypervisor to look up
    @return: The stored parameters, or None if either key is absent

    """
    return cluster.os_hvp.get(os_name, {}).get(hyp, None)
  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups

    ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
    Groups whose names collide are handled according to the --groups
    strategy: abort (default), rename the remote group, or move the remote
    group's nodes into the local group of the same name.

    @param my_config: L{config.ConfigWriter} of the local cluster
    @param other_config: L{config.ConfigWriter} of the mergee cluster
    @raise errors.CommandError: on a name conflict without a strategy

    """
    # pylint: disable=R0201
    logging.info("Node group conflict strategy: %s", self.groups)

    my_grps = my_config.GetAllNodeGroupsInfo().values()
    other_grps = other_config.GetAllNodeGroupsInfo().values()

    # Check for node group naming conflicts:
    conflicts = []
    for other_grp in other_grps:
      for my_grp in my_grps:
        if other_grp.name == my_grp.name:
          conflicts.append(other_grp)

    if conflicts:
      conflict_names = utils.CommaJoin([g.name for g in conflicts])
      logging.info("Node groups in both local and remote cluster: %s",
                   conflict_names)

      # User hasn't specified how to handle conflicts
      if not self.groups:
        raise errors.CommandError("The following node group(s) are in both"
                                  " clusters, and no merge strategy has been"
                                  " supplied (see the --groups option): %s" %
                                  conflict_names)

      # User wants to rename conflicts
      elif self.groups == _GROUPS_RENAME:
        for grp in conflicts:
          # Suffix the remote cluster's name to make the group name unique
          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
          logging.info("Renaming remote node group from %s to %s"
                       " to resolve conflict", grp.name, new_name)
          grp.name = new_name

      # User wants to merge conflicting groups
      elif self.groups == _GROUPS_MERGE:
        for other_grp in conflicts:
          logging.info("Merging local and remote '%s' groups", other_grp.name)
          # Iterate over a copy, as membership is modified below
          for node_name in other_grp.members[:]:
            node = other_config.GetNodeInfo(node_name)
            # Access to a protected member of a client class
            # pylint: disable=W0212
            other_config._UnlockedRemoveNodeFromGroup(node)

            # Access to a protected member of a client class
            # pylint: disable=W0212
            my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)

            # Access to a protected member of a client class
            # pylint: disable=W0212
            my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
            node.group = my_grp_uuid
          # Remove from list of groups to add
          other_grps.remove(other_grp)

    # All remaining (renamed or non-conflicting) remote groups are added
    for grp in other_grps:
      #TODO: handle node group conflicts
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201
    """Start the master daemon on the local node.

    @param no_vote: Should the masterd started without voting? default: False
    @raise errors.CommandError: If unable to start daemon.

    """
    if no_vote:
      env = {"EXTRA_MASTERD_ARGS": "--no-voting --yes-do-it"}
    else:
      env = {}

    outcome = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if not outcome.failed:
      return
    raise errors.CommandError("Couldn't start ganeti master."
                              " Fail reason: %s; output: %s" %
                              (outcome.fail_reason, outcome.output))
  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and make sure their config is up-to-date.

    Every node of the mergee clusters is re-joined into the local cluster,
    then the configuration is redistributed.

    @raise errors.CommandError: If the config redistribution fails.

    """
    for entry in self.merger_data:
      for node in entry.nodes:
        logging.info("Readding node %s", node)
        readd = utils.RunCmd(["gnt-node", "add", "--readd",
                              "--no-ssh-key-check", "--force-join", node])
        if readd.failed:
          # A failed readd is only logged; the merge continues
          logging.error("%s failed to be readded. Reason: %s, output: %s",
                        node, readd.fail_reason, readd.output)

    redist = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if redist.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (redist.fail_reason,
                                                 redist.output))
  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable=R0201
    """Starts up all instances (locally).

    @raise errors.CommandError: If unable to start the instances

    """
    startup = utils.RunCmd(["gnt-instance", "startup", "--all",
                            "--force-multiple"])
    if not startup.failed:
      return
    raise errors.CommandError("Unable to start all instances."
                              " Fail reason: %s; output: %s" %
                              (startup.fail_reason, startup.output))
  # R0201: Method could be a function
  # TODO: make this overridable, for some verify errors
  def _VerifyCluster(self): # pylint: disable=R0201
    """Runs gnt-cluster verify to verify the health.

    @raise errors.CommandError: If the cluster fails verification

    """
    verify = utils.RunCmd(["gnt-cluster", "verify"])
    if verify.failed:
      raise errors.CommandError("Verification of cluster failed."
                                " Fail reason: %s; output: %s" %
                                (verify.fail_reason, verify.output))
  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about steps
    taken. Also it keeps track of rollback_steps to undo everything.

    """
    rbsteps = []
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      if self.stop_instances:
        logging.info("Stopping merging instances (takes a while)")
        self._StopMergingInstances()
      logging.info("Checking that no instances are running on the mergees")
      instances_running = self._CheckRunningInstances()
      if instances_running:
        raise errors.CommandError("Some instances are still running on the"
                                  " mergees")
      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Merging config")
      self._FetchRemoteConfig()
      logging.info("Removing master IPs on mergee master nodes")
      self._RemoveMasterIps()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()

      # Our own master daemon must be down while the offline config merge
      # runs, and restarted without voting since the other masters are gone
      logging.info("Stopping master daemon")
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate"
                     " and restart master daemon" %
                     constants.CLUSTER_CONF_FILE)
      self._MergeConfig()
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. Merge can not easily"
                      " be undone after this point.")
      logging.info("Readd nodes")
      self._ReaddMergedNodesAndRedist()

      logging.info("Merge done, restart master daemon normally")
      self._KillMasterDaemon()
      self._StartMasterDaemon()

      if self.restart == _RESTART_ALL:
        logging.info("Starting instances again")
        self._StartupAllInstances()
      else:
        logging.info("Not starting instances again")
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError, e:
      logging.exception(e)

      # Before the point of no return, print the collected rollback steps
      # with cluster/node names interpolated
      if rbsteps:
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to rollback do the following:")
        for step in rbsteps:
          logging.critical("  * %s", step % info)
      else:
        logging.critical("Nothing to rollback.")

      # TODO: Keep track of steps done for a flawless resume?
  def Cleanup(self):
    """Clean up our environment.

    This removes the temporary work directory, which holds the fetched
    private keys and configuration files of the mergee clusters.

    """
    # All per-cluster key/config files live below self.work_dir, so
    # removing the tree is sufficient
    shutil.rmtree(self.work_dir)
def SetupLogging(options):
  """Configure the root logger to log to stderr.

  The handler's verbosity follows the command line: --debug lets
  everything through, --verbose shows INFO and above, otherwise only
  warnings and errors are emitted.

  @param options: Parsed command line options

  """
  if options.debug:
    level = logging.NOTSET
  elif options.verbose:
    level = logging.INFO
  else:
    level = logging.WARNING

  handler = logging.StreamHandler()
  handler.setFormatter(
    logging.Formatter("%(asctime)s: %(levelname)s %(message)s"))
  handler.setLevel(level)

  root = logging.getLogger("")
  root.setLevel(logging.NOTSET)
  root.addHandler(handler)

def main():
  """Main routine.

  """
  program = os.path.basename(sys.argv[0])

811
812
  parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
                                 prog=program)
813
814
815
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option(PAUSE_PERIOD_OPT)
816
  parser.add_option(GROUPS_OPT)
817
  parser.add_option(RESTART_OPT)
818
  parser.add_option(PARAMS_OPT)
819
  parser.add_option(SKIP_STOP_INSTANCES_OPT)
820
821
822
823
824
825
826
827

  (options, args) = parser.parse_args()

  SetupLogging(options)

  if not args:
    parser.error("No clusters specified")

828
  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
829
830
                          options.groups, options.restart, options.params,
                          options.stop_instances)
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
  try:
    try:
      cluster_merger.Setup()
      cluster_merger.Merge()
    except errors.GenericError, e:
      logging.exception(e)
      return constants.EXIT_FAILURE
  finally:
    cluster_merger.Cleanup()

  return constants.EXIT_SUCCESS


if __name__ == "__main__":
  sys.exit(main())