#!/usr/bin/python
#

# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Tool to merge two or more clusters together.

The clusters have to run the same version of Ganeti!

"""

# pylint: disable-msg=C0103
# C0103: Invalid name cluster-merge

import logging
import os
import optparse
import shutil
import sys
import tempfile

from ganeti import cli
from ganeti import config
from ganeti import constants
from ganeti import errors
from ganeti import ssh
from ganeti import utils


_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"
_CLUSTERMERGE_ECID = "clustermerge-ecid"
_RESTART_ALL = "all"
_RESTART_UP = "up"
_RESTART_NONE = "none"
_RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
_PARAMS_STRICT = "strict"
_PARAMS_WARN = "warn"
_PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)


PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
                            metavar="STRATEGY",
                            choices=_PARAMS_CHOICES,
                            dest="params",
                            help=("How to handle params that have"
                                  " different values (One of: %s/%s)" %
                                  _PARAMS_CHOICES))

RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
                             metavar="STRATEGY",
                             choices=_RESTART_CHOICES,
                             dest="restart",
                             help=("How to handle restarting instances"
                                   " same name (One of: %s/%s/%s)" %
                                   _RESTART_CHOICES))

SKIP_STOP_INSTANCES_OPT = cli.cli_option("--skip-stop-instances", default=True,
                                         action="store_false", type="boolean",
                                         dest="stop_instances",
                                         help=("Don't stop the instances on the"
                                               " clusters, but just to check"
                                               " that none is running"))


def Flatten(unflattened_list):
  """Flattens a list.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  flattened_list = []

  for item in unflattened_list:
    if isinstance(item, list):
      flattened_list.extend(Flatten(item))
    else:
      flattened_list.append(item)
  return flattened_list


class MergerData(object):
  """Container class to hold data used for merger.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param config_path: Path to the merging cluster config

    """
    self.cluster = cluster
    self.key_path = key_path
    self.nodes = nodes
    self.instances = instances
    self.config_path = config_path


class Merger(object):
  """Handling the merge.

  """
  RUNNING_STATUSES = frozenset([
    constants.INSTST_RUNNING,
    constants.INSTST_ERRORUP,
    ])

  def __init__(self, clusters, pause_period, groups, restart, params,
               stop_instances):
    """Initialize object with sane defaults and infos required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for
    @param groups: How to handle group conflicts
    @param restart: How to handle instance restart
    @param params: How to handle parameter conflicts
    @param stop_instances: Indicates whether the instances must be stopped
                           (True) or if the Merger must only check if no
                           instances are running on the mergee clusters (False)

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
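    # Determine the local cluster name; it is used by the ssh runner and to
    # refuse merging a cluster with itself (see Setup)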
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    self.groups = groups
    self.restart = restart
    self.params = params
    self.stop_instances = stop_instances
    if self.restart == _RESTART_UP:
      raise NotImplementedError


  def Setup(self):
    """Sets up our end so we can do the merger.

    This method is setting us up as a preparation for the merger.
    It makes the initial contact and gathers information needed.

    @raise errors.RemoteError: for errors in communication/grabbing

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

    # Fetch each remote cluster's ssh private key
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

      key_path = utils.PathJoin(self.work_dir, cluster)
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

      result = self._RunCmd(cluster, "gnt-node list -o name,offline"
                            " --no-header --separator=,", private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
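      # "gnt-node list -o name,offline" prints Y/N in the offline column;
      # keep only the nodes that are not offline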
      nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
      nodes = [node_status[0] for node_status in nodes_statuses
               if node_status[1] == "N"]

      result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances))

  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    This method adds our public key to each remote node's authorized_keys
    for further communication.

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)

    for data in self.merger_data:
      for node in data.nodes:
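        # Append our public key to the node's authorized_keys via a shell
        # here-document, retrying up to three times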
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
                                     (auth_keys, pub_key)),
                              private_key=data.key_path, max_attempts=3)

        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
                                    result.output))

  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
              ask_key=False, max_attempts=1):
    """Wrapping SshRunner.Run with default parameters.

    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.

    """
    for _ in range(max_attempts):
      result = self.ssh_runner.Run(hostname=hostname, command=command,
                                   user=user, use_cluster_key=use_cluster_key,
                                   strict_host_check=strict_host_check,
                                   private_key=private_key, batch=batch,
                                   ask_key=ask_key)
      if not result.failed:
        break

    return result

  def _CheckRunningInstances(self):
    """Checks if on the clusters to be merged there are running instances

    @rtype: boolean
    @return: True if there are running instances, False otherwise

    """
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "gnt-instance list -o status")
      if self.RUNNING_STATUSES.intersection(result.output.splitlines()):
        return True

    return False

  def _StopMergingInstances(self):
    """Stop instances on merging clusters.

    """
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "gnt-instance shutdown --all"
                                     " --force-multiple")

      if result.failed:
        raise errors.RemoteError("Unable to stop instances on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _DisableWatcher(self):
    """Disable watch on all merging clusters, including ourself.

    """
    for cluster in ["localhost"] + self.clusters:
      result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
                                     self.pause_period)

      if result.failed:
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _StopDaemons(self):
    """Stop all daemons on merging nodes.

    """
    cmd = "%s stop-all" % constants.DAEMON_UTIL
    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, cmd, max_attempts=3)

        if result.failed:
          raise errors.RemoteError("Unable to stop daemons on %s."
                                   " Fail reason: %s; output: %s." %
                                   (node, result.fail_reason, result.output))

  def _FetchRemoteConfig(self):
    """Fetches and stores remote cluster config from the master.

    This step is needed before we can merge the config.

    """
    for data in self.merger_data:
      result = self._RunCmd(data.cluster, "cat %s" %
                                          constants.CLUSTER_CONF_FILE)

      if result.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output %s" %
                                 (data.cluster, result.fail_reason,
                                  result.output))

      data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
                                        data.cluster)
      utils.WriteFile(data.config_path, data=result.stdout)

  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable-msg=R0201
    """Kills the local master daemon.

    @raise errors.CommandError: If unable to kill

    """
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if result.failed:
      raise errors.CommandError("Unable to stop master daemons."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be unique over the whole config merge

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
      self._MergeClusterConfigs(my_config, other_config)
      self._MergeNodeGroups(my_config, other_config)

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        # Mark the node offline; it will be brought back online later when
        # it is readded
        node_info.master_candidate = False
        node_info.drained = False
        node_info.offline = True
        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.LDS_DRBD:
            port = my_config.AllocatePort()

            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

            physical_id = list(dsk.physical_id)
            physical_id[1] = physical_id[3] = port
            dsk.physical_id = tuple(physical_id)

        my_config.AddInstance(instance_info,
                              _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

  def _MergeClusterConfigs(self, my_config, other_config):
    """Checks that all relevant cluster parameters are compatible

    """
    my_cluster = my_config.GetClusterInfo()
    other_cluster = other_config.GetClusterInfo()
    err_count = 0

    #
    # Generic checks
    #
    check_params = [
      "beparams",
      "default_iallocator",
      "drbd_usermode_helper",
      "hidden_os",
      "maintain_node_health",
      "master_netdev",
      "ndparams",
      "nicparams",
      "primary_ip_family",
      "tags",
      "uid_pool",
      ]
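    # Mismatches in the parameters below are always fatal; those in
    # check_params above only count as errors in "strict" mode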
    check_params_strict = [
      "volume_group_name",
    ]
    if constants.ENABLE_FILE_STORAGE:
      check_params_strict.append("file_storage_dir")
    if constants.ENABLE_SHARED_FILE_STORAGE:
      check_params_strict.append("shared_file_storage_dir")
    check_params.extend(check_params_strict)

    if self.params == _PARAMS_STRICT:
      params_strict = True
    else:
      params_strict = False

    for param_name in check_params:
      my_param = getattr(my_cluster, param_name)
      other_param = getattr(other_cluster, param_name)
      if my_param != other_param:
        logging.error("The value (%s) of the cluster parameter %s on %s"
                      " differs to this cluster's value (%s)",
                      other_param, param_name, other_cluster.cluster_name,
                      my_param)
        if params_strict or param_name in check_params_strict:
          err_count += 1

    #
    # Custom checks
    #

    # Check default hypervisor
    my_defhyp = my_cluster.enabled_hypervisors[0]
    other_defhyp = other_cluster.enabled_hypervisors[0]
    if my_defhyp != other_defhyp:
      logging.warning("The default hypervisor (%s) differs on %s, new"
                      " instances will be created with this cluster's"
                      " default hypervisor (%s)", other_defhyp,
                      other_cluster.cluster_name, my_defhyp)

    if (set(my_cluster.enabled_hypervisors) !=
        set(other_cluster.enabled_hypervisors)):
      logging.error("The set of enabled hypervisors (%s) on %s differs to"
                    " this cluster's set (%s)",
                    other_cluster.enabled_hypervisors,
                    other_cluster.cluster_name, my_cluster.enabled_hypervisors)
      err_count += 1

    # Check hypervisor params for hypervisors we care about
    for hyp in my_cluster.enabled_hypervisors:
      for param in my_cluster.hvparams[hyp]:
        my_value = my_cluster.hvparams[hyp][param]
        other_value = other_cluster.hvparams[hyp][param]
        if my_value != other_value:
          logging.error("The value (%s) of the %s parameter of the %s"
                        " hypervisor on %s differs to this cluster's parameter"
                        " (%s)",
                        other_value, param, hyp, other_cluster.cluster_name,
                        my_value)
          if params_strict:
            err_count += 1

    # Check os hypervisor params for hypervisors we care about
    for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
      for hyp in my_cluster.enabled_hypervisors:
        my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
        other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
        if my_os_hvp != other_os_hvp:
          logging.error("The OS parameters (%s) for the %s OS for the %s"
                        " hypervisor on %s differs to this cluster's parameters"
                        " (%s)",
                        other_os_hvp, os_name, hyp, other_cluster.cluster_name,
                        my_os_hvp)
          if params_strict:
            err_count += 1

    #
    # Warnings
    #
    if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
      logging.warning("The modify_etc_hosts value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_etc_hosts,
                      other_cluster.cluster_name,
                      my_cluster.modify_etc_hosts)

    if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
      logging.warning("The modify_ssh_setup value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_ssh_setup,
                      other_cluster.cluster_name,
                      my_cluster.modify_ssh_setup)

    #
    # Actual merging
    #
    my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
                                       other_cluster.reserved_lvs))

    if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
      logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
                      " cluster's value (%s). The least permissive value (%s)"
                      " will be used", other_cluster.prealloc_wipe_disks,
                      other_cluster.cluster_name,
                      my_cluster.prealloc_wipe_disks, True)
      my_cluster.prealloc_wipe_disks = True

    for os_, osparams in other_cluster.osparams.items():
      if os_ not in my_cluster.osparams:
        my_cluster.osparams[os_] = osparams
      elif my_cluster.osparams[os_] != osparams:
        logging.error("The OS parameters (%s) for the %s OS on %s differs to"
                      " this cluster's parameters (%s)",
                      osparams, os_, other_cluster.cluster_name,
                      my_cluster.osparams[os_])
        if params_strict:
          err_count += 1

    if err_count:
      raise errors.ConfigurationError("Cluster config for %s has incompatible"
                                      " values, please fix and re-run" %
                                      other_cluster.cluster_name)

  # R0201: Method could be a function
  def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201
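    """Returns cluster's os_hvp entry for os_name/hyp, or None if not set.

    """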
    if os_name in cluster.os_hvp:
      return cluster.os_hvp[os_name].get(hyp, None)
    else:
      return None

  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups

    ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
    """
    # pylint: disable-msg=R0201
    logging.info("Node group conflict strategy: %s", self.groups)

    my_grps = my_config.GetAllNodeGroupsInfo().values()
    other_grps = other_config.GetAllNodeGroupsInfo().values()

    # Check for node group naming conflicts:
    conflicts = []
    for other_grp in other_grps:
      for my_grp in my_grps:
        if other_grp.name == my_grp.name:
          conflicts.append(other_grp)

    if conflicts:
      conflict_names = utils.CommaJoin([g.name for g in conflicts])
      logging.info("Node groups in both local and remote cluster: %s",
                   conflict_names)

      # User hasn't specified how to handle conflicts
      if not self.groups:
        raise errors.CommandError("The following node group(s) are in both"
                                  " clusters, and no merge strategy has been"
                                  " supplied (see the --groups option): %s" %
                                  conflict_names)

      # User wants to rename conflicts
      elif self.groups == _GROUPS_RENAME:
        for grp in conflicts:
          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
          logging.info("Renaming remote node group from %s to %s"
                       " to resolve conflict", grp.name, new_name)
          grp.name = new_name

      # User wants to merge conflicting groups
      elif self.groups == _GROUPS_MERGE:
        for other_grp in conflicts:
          logging.info("Merging local and remote '%s' groups", other_grp.name)
          for node_name in other_grp.members[:]:
            node = other_config.GetNodeInfo(node_name)
            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            other_config._UnlockedRemoveNodeFromGroup(node)

            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)

            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
            node.group = my_grp_uuid
          # Remove from list of groups to add
          other_grps.remove(other_grp)

    for grp in other_grps:
      #TODO: handle node group conflicts
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)

  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
    """Starts the local master daemon.

    @param no_vote: Should masterd be started without voting? Default: False
    @raise errors.CommandError: If unable to start daemon.

    """
    env = {}
    if no_vote:
      env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      raise errors.CommandError("Couldn't start ganeti master."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and make sure their config is up-to-date.

    @raise errors.CommandError: If anything fails.

    """
    for data in self.merger_data:
      for node in data.nodes:
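        # Readding brings the node back online (it was marked offline during
        # the config merge) and joins it to this cluster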
        result = utils.RunCmd(["gnt-node", "add", "--readd",
                               "--no-ssh-key-check", "--force-join", node])
        if result.failed:
          logging.error("%s failed to be readded. Reason: %s, output: %s",
                         node, result.fail_reason, result.output)

    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if result.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (result.fail_reason,
                                                result.output))

  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable-msg=R0201
    """Starts up all instances (locally).

    @raise errors.CommandError: If unable to start the instances

    """
    result = utils.RunCmd(["gnt-instance", "startup", "--all",
                           "--force-multiple"])
    if result.failed:
      raise errors.CommandError("Unable to start all instances."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  # R0201: Method could be a function
  # TODO: make this overridable, for some verify errors
  def _VerifyCluster(self): # pylint: disable-msg=R0201
    """Runs gnt-cluster verify to verify the health.

    @raise errors.CommandError: If the cluster fails verification

    """
    result = utils.RunCmd(["gnt-cluster", "verify"])
    if result.failed:
      raise errors.CommandError("Verification of cluster failed."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about steps
    taken. Also it keeps track of rollback_steps to undo everything.

    """
    rbsteps = []
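    # Rollback steps are format strings; on failure they are printed with
    # the "clusters"/"nodes" info dict built in the except clause below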
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      if self.stop_instances:
        logging.info("Stopping merging instances (takes a while)")
        self._StopMergingInstances()
      logging.info("Checking that no instances are running on the mergees")
      instances_running = self._CheckRunningInstances()
      if instances_running:
        raise errors.CommandError("Some instances are still running on the"
                                  " mergees")
      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()
      logging.info("Merging config")
      self._FetchRemoteConfig()

      logging.info("Stopping master daemon")
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate"
                     " and restart master daemon" %
                     constants.CLUSTER_CONF_FILE)
      self._MergeConfig()
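      # Start masterd without voting: the freshly merged nodes cannot take
      # part in a master vote before they have been readded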
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. Merge can not easily"
                      " be undone after this point.")
      logging.info("Readd nodes")
      self._ReaddMergedNodesAndRedist()

      logging.info("Merge done, restart master daemon normally")
      self._KillMasterDaemon()
      self._StartMasterDaemon()

      if self.restart == _RESTART_ALL:
        logging.info("Starting instances again")
        self._StartupAllInstances()
      else:
        logging.info("Not starting instances again")
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError, e:
      logging.exception(e)

      if rbsteps:
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to rollback do the following:")
        for step in rbsteps:
          logging.critical("  * %s", step % info)
      else:
        logging.critical("Nothing to rollback.")

      # TODO: Keep track of steps done for a flawless resume?

  def Cleanup(self):
    """Clean up our environment.

    This cleans up remote private keys and configs and after that
    deletes the temporary directory.

    """
    shutil.rmtree(self.work_dir)


def SetupLogging(options):
  """Setting up logging infrastructure.

  @param options: Parsed command line options

  """
  formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)
  if options.debug:
    stderr_handler.setLevel(logging.NOTSET)
  elif options.verbose:
    stderr_handler.setLevel(logging.INFO)
  else:
    stderr_handler.setLevel(logging.WARNING)

  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(stderr_handler)


def main():
  """Main routine.

  """
  program = os.path.basename(sys.argv[0])

  parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
                                 prog=program)
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option(PAUSE_PERIOD_OPT)
  parser.add_option(GROUPS_OPT)
  parser.add_option(RESTART_OPT)
  parser.add_option(PARAMS_OPT)
  parser.add_option(SKIP_STOP_INSTANCES_OPT)

  (options, args) = parser.parse_args()

  SetupLogging(options)

  if not args:
    parser.error("No clusters specified")

  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
                          options.groups, options.restart, options.params,
                          options.stop_instances)
  try:
    try:
      cluster_merger.Setup()
      cluster_merger.Merge()
    except errors.GenericError, e:
      logging.exception(e)
      return constants.EXIT_FAILURE
  finally:
    cluster_merger.Cleanup()

  return constants.EXIT_SUCCESS


if __name__ == "__main__":
  sys.exit(main())