#!/usr/bin/python
#

# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Tool to merge two or more clusters together.

The clusters have to run the same version of Ganeti!

"""

# pylint: disable-msg=C0103
# C0103: Invalid name cluster-merge

import logging
import os
import optparse
import shutil
import sys
import tempfile

from ganeti import cli
from ganeti import config
from ganeti import constants
from ganeti import errors
from ganeti import ssh
from ganeti import utils


45
46
47
# Strategies for node groups that exist under the same name on both sides
_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"

# Prefix of the fake execution context ids used while merging configs
_CLUSTERMERGE_ECID = "clustermerge-ecid"

# Strategies for restarting instances once the merge is done
_RESTART_ALL = "all"
_RESTART_UP = "up"
_RESTART_NONE = "none"
_RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)


PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
# The help text used to contain a " same name" fragment copied over from
# GROUPS_OPT, which made the sentence meaningless for this option.
RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
                             metavar="STRATEGY",
                             choices=_RESTART_CHOICES,
                             dest="restart",
                             help=("How to handle restarting instances"
                                   " (One of: %s/%s/%s)" %
                                   _RESTART_CHOICES))
72
73


Stephen Shirley's avatar
Stephen Shirley committed
74
def Flatten(unflattened_list):
  """Flattens a list.

  Nested lists are expanded recursirvely, depth first; every other kind of
  item (including tuples) is copied over unchanged.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  flattened_list = []

  for item in unflattened_list:
    if isinstance(item, list):
      flattened_list.extend(Flatten(item))
    else:
      flattened_list.append(item)
  return flattened_list
89
90
91
92
93
94
95
96
97
98
99


class MergerData(object):
  """Container class to hold data used for merger.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param config_path: Path to the merging cluster config

    """
    self.cluster = cluster
    self.key_path = key_path
    self.nodes = nodes
    self.instances = instances
    self.config_path = config_path
110
111
112
113
114
115


class Merger(object):
  """Handling the merge.

  """
116
  def __init__(self, clusters, pause_period, groups, restart):
    """Initialize object with sane defaults and infos required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for
    @param groups: How to handle group conflicts
    @param restart: How to handle instance restart
    @raise NotImplementedError: if the "up" restart strategy is requested

    """
    # Reject unsupported strategies before acquiring any resource; otherwise
    # the temporary directory created below would be left behind, as
    # Cleanup() is never called on a half-constructed object.
    if restart == _RESTART_UP:
      raise NotImplementedError("Restart strategy '%s' is not implemented" %
                                _RESTART_UP)

    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    self.groups = groups
    self.restart = restart
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    # Created last so that a failure in the calls above can not leak it
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")

136
137
138
139
140
141
142
143
144
145
146
147

  def Setup(self):
    """Sets up our end so we can do the merger.

    This method is setting us up as a preparation for the merger.
    It makes the initial contact and gathers information needed.

    @raise errors.RemoteError: for errors in communication/grabbing

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

148
149
150
151
    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

152
153
154
155
156
157
158
159
160
    # Fetch remotes private key
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

161
      key_path = utils.PathJoin(self.work_dir, cluster)
162
163
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

164
165
      result = self._RunCmd(cluster, "gnt-node list -o name,offline"
                            " --no-header --separator=,", private_key=key_path)
166
167
168
169
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
170
171
172
      nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
      nodes = [node_status[0] for node_status in nodes_statuses
               if node_status[1] == "N"]
173
174
175
176
177
178
179
180
181
182
183

      result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances))

184
185
  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    This method add our public key to remotes authorized_key for further
    communication.

    @raise errors.RemoteError: if the key can not be appended on a node

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)

    # The here-document terminator must start on a line of its own; without
    # a trailing newline on the key it would be glued to the key itself and
    # the shell would never see the end of the document.
    if not pub_key.endswith("\n"):
      pub_key += "\n"

    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
                                     (auth_keys, pub_key)),
                              private_key=data.key_path)

        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
                                    result.output))
205
206
207
208
209
210

  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
              ask_key=False):
    """Wrapping SshRunner.Run with default parameters.

211
    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249

    """
    return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
                               use_cluster_key=use_cluster_key,
                               strict_host_check=strict_host_check,
                               private_key=private_key, batch=batch,
                               ask_key=ask_key)

  def _StopMergingInstances(self):
    """Stop instances on merging clusters.

    """
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "gnt-instance shutdown --all"
                                     " --force-multiple")

      if result.failed:
        raise errors.RemoteError("Unable to stop instances on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _DisableWatcher(self):
    """Disable watch on all merging clusters, including ourself.

    """
    for cluster in ["localhost"] + self.clusters:
      result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
                                     self.pause_period)

      if result.failed:
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _StopDaemons(self):
    """Stop all daemons on merging nodes.

    @raise errors.RemoteError: if the daemons can not be stopped on a node

    """
    cmd = "%s stop-all" % constants.DAEMON_UTIL
    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, cmd)

        if result.failed:
          raise errors.RemoteError("Unable to stop daemons on %s."
                                   " Fail reason: %s; output: %s." %
                                   (node, result.fail_reason, result.output))

  def _FetchRemoteConfig(self):
    """Fetches and stores remote cluster config from the master.

    This step is needed before we can merge the config. The config of every
    merging cluster is written to C{<cluster>_config.data} in the work
    directory and the path is recorded in the cluster's L{MergerData}.

    @raise errors.RemoteError: if a remote config can not be retrieved

    """
    for data in self.merger_data:
      result = self._RunCmd(data.cluster, "cat %s" %
                                          constants.CLUSTER_CONF_FILE)

      if result.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output %s" %
                                 (data.cluster, result.fail_reason,
                                  result.output))

      data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
                                        data.cluster)
      utils.WriteFile(data.config_path, data=result.stdout)

  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable-msg=R0201
    """Stops the master daemon on the local machine.

    @raise errors.CommandError: If unable to kill

    """
    stop_cmd = [constants.DAEMON_UTIL, "stop-master"]
    outcome = utils.RunCmd(stop_cmd)
    if not outcome.failed:
      return

    raise errors.CommandError("Unable to stop master daemons."
                              " Fail reason: %s; output: %s" %
                              (outcome.fail_reason, outcome.output))

  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    For every fetched remote config the cluster parameters are checked, the
    node groups are merged and then all nodes and instances are added to the
    local configuration. DRBD disks get ports newly allocated from the local
    configuration.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be uniq over the whole config merge

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
      self._MergeClusterConfigs(my_config, other_config)
      self._MergeNodeGroups(my_config, other_config)

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.LDS_DRBD:
            port = my_config.AllocatePort()

            # The ids are tuples, hence the list round-trip to patch the port
            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

            physical_id = list(dsk.physical_id)
            physical_id[1] = physical_id[3] = port
            dsk.physical_id = tuple(physical_id)

        my_config.AddInstance(instance_info,
                              _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
  # R0201: Method could be a function
  def _MergeClusterConfigs(self, my_config, other_config):
    """Checks that all relevant cluster parameters are compatible

    """
    # pylint: disable-msg=R0201
    my_cluster = my_config.GetClusterInfo()
    other_cluster = other_config.GetClusterInfo()
    err_count = 0

    #
    # Generic checks
    #
    check_params = (
      "beparams",
      "default_iallocator",
      "drbd_usermode_helper",
      "file_storage_dir",
      "hidden_os",
      "maintain_node_health",
      "master_netdev",
      "ndparams",
      "nicparams",
      "primary_ip_family",
      "tags",
      "uid_pool",
      "volume_group_name",
      )
    for param_name in check_params:
      my_param = getattr(my_cluster, param_name)
      other_param = getattr(other_cluster, param_name)
      if my_param != other_param:
        logging.error("The value (%s) of the cluster parameter %s on %s"
                      " differs to this cluster's value (%s)",
                      other_param, param_name, other_cluster.cluster_name,
                      my_param)
        err_count += 1

    #
    # Custom checks
    #

    # Check default hypervisor
    my_defhyp = my_cluster.enabled_hypervisors[0]
    other_defhyp = other_cluster.enabled_hypervisors[0]
    if my_defhyp != other_defhyp:
      logging.warning("The default hypervisor (%s) differs on %s, new"
                      " instances will be created with this cluster's"
                      " default hypervisor (%s)", other_defhyp,
                      other_cluster.cluster_name, my_defhyp)

    if (set(my_cluster.enabled_hypervisors) !=
        set(other_cluster.enabled_hypervisors)):
      logging.error("The set of enabled hypervisors (%s) on %s differs to"
                    " this cluster's set (%s)",
                    other_cluster.enabled_hypervisors,
                    other_cluster.cluster_name, my_cluster.enabled_hypervisors)
      err_count += 1

    # Check hypervisor params for hypervisors we care about
    # TODO: we probably don't care about all params for a given hypervisor
    for hyp in my_cluster.enabled_hypervisors:
      for param in my_cluster.hvparams[hyp]:
        my_value = my_cluster.hvparams[hyp][param]
        other_value = other_cluster.hvparams[hyp][param]
        if my_value != other_value:
          logging.error("The value (%s) of the %s parameter of the %s"
                        " hypervisor on %s differs to this cluster's parameter"
                        " (%s)",
                        other_value, param, hyp, other_cluster.cluster_name,
                        my_value)
          err_count += 1

    # Check os hypervisor params for hypervisors we care about
    for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
      for hyp in my_cluster.enabled_hypervisors:
        my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
        other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
        if my_os_hvp != other_os_hvp:
          logging.error("The OS parameters (%s) for the %s OS for the %s"
                        " hypervisor on %s differs to this cluster's parameters"
                        " (%s)",
                        other_os_hvp, os_name, hyp, other_cluster.cluster_name,
                        my_os_hvp)
          err_count += 1

    #
    # Warnings
    #
    if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
      logging.warning("The modify_etc_hosts value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_etc_hosts,
                      other_cluster.cluster_name,
                      my_cluster.modify_etc_hosts)

    if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
      logging.warning("The modify_ssh_setup value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_ssh_setup,
                      other_cluster.cluster_name,
                      my_cluster.modify_ssh_setup)

    #
    # Actual merging
    #
    my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
                                       other_cluster.reserved_lvs))

    if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
      logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
                      " cluster's value (%s). The least permissive value (%s)"
                      " will be used", other_cluster.prealloc_wipe_disks,
                      other_cluster.cluster_name,
                      my_cluster.prealloc_wipe_disks, True)
      my_cluster.prealloc_wipe_disks = True

    for os_, osparams in other_cluster.osparams.items():
      if os_ not in my_cluster.osparams:
        my_cluster.osparams[os_] = osparams
      elif my_cluster.osparams[os_] != osparams:
        logging.error("The OS parameters (%s) for the %s OS on %s differs to"
                      " this cluster's parameters (%s)",
                      osparams, os_, other_cluster.cluster_name,
                      my_cluster.osparams[os_])
        err_count += 1

    if err_count:
      raise errors.ConfigurationError("Cluster config for %s has incompatible"
                                      " values, please fix and re-run" %
                                      other_cluster.cluster_name)

  # R0201: Method could be a function
  def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201
    if os_name in cluster.os_hvp:
      return cluster.os_hvp[os_name].get(hyp, None)
    else:
      return None

470
471
472
473
474
475
476
  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups

    Groups present under the same name in both configurations are handled
    according to C{self.groups}: they are either renamed by appending the
    remote cluster name, or their nodes are moved into the local group of the
    same name. All remaining remote groups are added to the local
    configuration.

    @param my_config: configuration of the local cluster
    @param other_config: configuration of the cluster being merged in
    @raise errors.CommandError: if there are conflicting groups and no
        strategy was given

    """
    # pylint: disable-msg=R0201
    logging.info("Node group conflict strategy: %s", self.groups)

    my_grps = my_config.GetAllNodeGroupsInfo().values()
    # Entries are removed from this collection below, so it has to be a real
    # list independent of the configuration's dictionary
    other_grps = list(other_config.GetAllNodeGroupsInfo().values())

    # Check for node group naming conflicts:
    my_names = set([grp.name for grp in my_grps])
    conflicts = [grp for grp in other_grps if grp.name in my_names]

    if conflicts:
      conflict_names = utils.CommaJoin([g.name for g in conflicts])
      logging.info("Node groups in both local and remote cluster: %s",
                   conflict_names)

      # User hasn't specified how to handle conflicts
      if not self.groups:
        raise errors.CommandError("The following node group(s) are in both"
                                  " clusters, and no merge strategy has been"
                                  " supplied (see the --groups option): %s" %
                                  conflict_names)

      # User wants to rename conflicts
      elif self.groups == _GROUPS_RENAME:
        for grp in conflicts:
          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
          logging.info("Renaming remote node group from %s to %s"
                       " to resolve conflict", grp.name, new_name)
          grp.name = new_name

      # User wants to merge conflicting groups
      elif self.groups == _GROUPS_MERGE:
        for other_grp in conflicts:
          logging.info("Merging local and remote '%s' groups", other_grp.name)

          # Access to a protected member of a client class
          # pylint: disable-msg=W0212
          my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)

          # Iterate over a copy, members are dropped from the group meanwhile
          for node_name in other_grp.members[:]:
            node = other_config.GetNodeInfo(node_name)
            other_config._UnlockedRemoveNodeFromGroup(node)
            my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
            node.group = my_grp_uuid
          # Remove from list of groups to add
          other_grps.remove(other_grp)

    # Whatever is left does not clash with a local group any more
    for grp in other_grps:
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)

534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
    """Brings up the master daemon on the local machine.

    @param no_vote: Whether masterd is started with C{--no-voting
        --yes-do-it} passed via EXTRA_MASTERD_ARGS; default: False
    @raise errors.CommandError: If unable to start daemon.

    """
    extra_env = {}
    if no_vote:
      extra_env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"

    start_cmd = [constants.DAEMON_UTIL, "start-master"]
    outcome = utils.RunCmd(start_cmd, env=extra_env)
    if not outcome.failed:
      return

    raise errors.CommandError("Couldn't start ganeti master."
                              " Fail reason: %s; output: %s" %
                              (outcome.fail_reason, outcome.output))

  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and make sure their config is up-to-date.

    Every recorded node is re-added with C{gnt-node add --readd}; afterwards
    C{gnt-cluster redist-conf} is run once.

    @raise errors.CommandError: If anything fails.

    """
    for data in self.merger_data:
      for node in data.nodes:
        result = utils.RunCmd(["gnt-node", "add", "--readd",
                               "--no-ssh-key-check", "--force-join", node])
        if result.failed:
          raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
                                    " output: %s" % (node, result.fail_reason,
                                                     result.output))

    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if result.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (result.fail_reason,
                                                result.output))

  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable-msg=R0201
    """Starts every instance by running gnt-instance locally.

    @raise errors.CommandError: If unable to start the instances

    """
    startup_cmd = ["gnt-instance", "startup", "--all", "--force-multiple"]
    outcome = utils.RunCmd(startup_cmd)
    if not outcome.failed:
      return

    raise errors.CommandError("Unable to start all instances."
                              " Fail reason: %s; output: %s" %
                              (outcome.fail_reason, outcome.output))

  # R0201: Method could be a function
  def _VerifyCluster(self): # pylint: disable-msg=R0201
    """Checks the cluster health by running gnt-cluster verify.

    @raise errors.CommandError: If cluster fails on verification

    """
    verify_cmd = ["gnt-cluster", "verify"]
    outcome = utils.RunCmd(verify_cmd)
    if not outcome.failed:
      return

    raise errors.CommandError("Verification of cluster failed."
                              " Fail reason: %s; output: %s" %
                              (outcome.fail_reason, outcome.output))

  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about steps
    taken. Also it keeps track of rollback_steps to undo everything.

    """
    rbsteps = []
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

612
613
614
615
616
      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

617
618
619
620
621
622
623
624
625
626
627
      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      logging.info("Stopping merging instances (takes a while)")
      self._StopMergingInstances()

      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()
      logging.info("Merging config")
      self._FetchRemoteConfig()
628

629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
      logging.info("Stopping master daemon")
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate"
                     " and restart master daemon" %
                     constants.CLUSTER_CONF_FILE)
      self._MergeConfig()
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. Merge can not easily"
                      " be undone after this point.")
      logging.info("Readd nodes")
      self._ReaddMergedNodesAndRedist()

      logging.info("Merge done, restart master daemon normally")
      self._KillMasterDaemon()
      self._StartMasterDaemon()
649

650
651
652
653
654
      if self.restart == _RESTART_ALL:
        logging.info("Starting instances again")
        self._StartupAllInstances()
      else:
        logging.info("Not starting instances again")
655
656
657
658
659
660
661
662
663
664
665
666
667
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError, e:
      logging.exception(e)

      if rbsteps:
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to rollback do the following:")
        for step in rbsteps:
René Nussbaumer's avatar
René Nussbaumer committed
668
          logging.critical("  * %s", step % info)
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
      else:
        logging.critical("Nothing to rollback.")

      # TODO: Keep track of steps done for a flawless resume?

  def Cleanup(self):
    """Clean up our environment.

    This cleans up remote private keys and configs and after that
    deletes the temporary directory.

    """
    shutil.rmtree(self.work_dir)


def SetupLogging(options):
  """Setting up logging infrastructure.

  A stream handler is attached to the root logger. Its level depends on the
  options: everything with C{debug}, INFO and above with C{verbose},
  WARNING and above otherwise.

  @param options: Parsed command line options

  """
  formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)
  if options.debug:
    stderr_handler.setLevel(logging.NOTSET)
  elif options.verbose:
    stderr_handler.setLevel(logging.INFO)
  else:
    stderr_handler.setLevel(logging.WARNING)

  # The root logger lets everything through, filtering is left to the handler
  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(stderr_handler)


def main():
  """Main routine.

  """
  program = os.path.basename(sys.argv[0])

712
713
  parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
                                 prog=program)
714
715
716
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option(PAUSE_PERIOD_OPT)
717
  parser.add_option(GROUPS_OPT)
718
  parser.add_option(RESTART_OPT)
719
720
721
722
723
724
725
726

  (options, args) = parser.parse_args()

  SetupLogging(options)

  if not args:
    parser.error("No clusters specified")

727
  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
728
                          options.groups, options.restart)
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
  try:
    try:
      cluster_merger.Setup()
      cluster_merger.Merge()
    except errors.GenericError, e:
      logging.exception(e)
      return constants.EXIT_FAILURE
  finally:
    cluster_merger.Cleanup()

  return constants.EXIT_SUCCESS


# Only run the tool when executed as a script; the value returned by main()
# becomes the exit status of the process.
if __name__ == "__main__":
  sys.exit(main())