cluster-merge 25.8 KB
Newer Older
1
2
3
#!/usr/bin/python
#

4
# Copyright (C) 2010 Google Inc.
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Tool to merge two or more clusters together.

The clusters have to run the same version of Ganeti!

"""

# pylint: disable-msg=C0103
# C0103: Invalid name cluster-merge

import logging
import os
import optparse
import shutil
import sys
import tempfile

from ganeti import cli
from ganeti import config
from ganeti import constants
from ganeti import errors
from ganeti import ssh
from ganeti import utils


45
46
47
48
_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"
_CLUSTERMERGE_ECID = "clustermerge-ecid"

49
50
51
52
53
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
54
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
Stephen Shirley's avatar
Stephen Shirley committed
55
56
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
57
58
59
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
60
61


Stephen Shirley's avatar
Stephen Shirley committed
62
def Flatten(unflattened_list):
63
64
  """Flattens a list.

Stephen Shirley's avatar
Stephen Shirley committed
65
66
  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list
67
68

  """
Stephen Shirley's avatar
Stephen Shirley committed
69
  flattened_list = []
70

Stephen Shirley's avatar
Stephen Shirley committed
71
  for item in unflattened_list:
72
    if isinstance(item, list):
Stephen Shirley's avatar
Stephen Shirley committed
73
      flattened_list.extend(Flatten(item))
74
    else:
Stephen Shirley's avatar
Stephen Shirley committed
75
76
      flattened_list.append(item)
  return flattened_list
77
78
79
80
81
82
83
84
85
86
87
88
89


class MergerData(object):
  """Container class to hold data used for merger.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of nodes in the merging cluster
    @param instances: List of instances running on merging cluster
90
    @param config_path: Path to the merging cluster config
91
92
93
94
95

    """
    self.cluster = cluster
    self.key_path = key_path
    self.nodes = nodes
96
97
    self.instances = instances
    self.config_path = config_path
98
99
100
101
102
103


class Merger(object):
  """Handling the merge.

  """
104
  def __init__(self, clusters, pause_period, groups):
105
106
107
108
    """Initialize object with sane defaults and infos required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for
109
    @param groups: How to handle group conflicts
110
111
112
113
114
115

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
116
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
117
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
118
    self.groups = groups
119
120
121
122
123
124
125
126
127
128
129
130

  def Setup(self):
    """Sets up our end so we can do the merger.

    This method is setting us up as a preparation for the merger.
    It makes the initial contact and gathers information needed.

    @raise errors.RemoteError: for errors in communication/grabbing

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

131
132
133
134
    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

135
136
137
138
139
140
141
142
143
    # Fetch remotes private key
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

144
      key_path = utils.PathJoin(self.work_dir, cluster)
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

      result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      nodes = result.stdout.splitlines()

      result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances))

  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    This method add our public key to remotes authorized_key for further
    communication.

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)

    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
                                     (auth_keys, pub_key)),
                              private_key=data.key_path)

        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
                                    result.output))

  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
              ask_key=False):
    """Wrapping SshRunner.Run with default parameters.

192
    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230

    """
    return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
                               use_cluster_key=use_cluster_key,
                               strict_host_check=strict_host_check,
                               private_key=private_key, batch=batch,
                               ask_key=ask_key)

  def _StopMergingInstances(self):
    """Stop instances on merging clusters.

    """
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "gnt-instance shutdown --all"
                                     " --force-multiple")

      if result.failed:
        raise errors.RemoteError("Unable to stop instances on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _DisableWatcher(self):
    """Disable watch on all merging clusters, including ourself.

    """
    for cluster in ["localhost"] + self.clusters:
      result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
                                     self.pause_period)

      if result.failed:
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _StopDaemons(self):
    """Stop all daemons on merging nodes.

    """
231
    cmd = "%s stop-all" % constants.DAEMON_UTIL
232
233
    for data in self.merger_data:
      for node in data.nodes:
234
        result = self._RunCmd(node, cmd)
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256

        if result.failed:
          raise errors.RemoteError("Unable to stop daemons on %s."
                                   " Fail reason: %s; output: %s." %
                                   (node, result.fail_reason, result.output))

  def _FetchRemoteConfig(self):
    """Fetches and stores remote cluster config from the master.

    This step is needed before we can merge the config.

    """
    for data in self.merger_data:
      result = self._RunCmd(data.cluster, "cat %s" %
                                          constants.CLUSTER_CONF_FILE)

      if result.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output %s" %
                                 (data.cluster, result.fail_reason,
                                  result.output))

257
258
      data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
                                        data.cluster)
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
      utils.WriteFile(data.config_path, data=result.stdout)

  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable-msg=R0201
    """Kills the local master daemon.

    @raise errors.CommandError: If unable to kill

    """
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if result.failed:
      raise errors.CommandError("Unable to stop master daemons."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be uniq over the whole config merge

    for data in self.merger_data:
282
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
283
      self._MergeClusterConfigs(my_config, other_config)
284
      self._MergeNodeGroups(my_config, other_config)
285
286
287

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
Stephen Shirley's avatar
Stephen Shirley committed
288
        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.LDS_DRBD:
            port = my_config.AllocatePort()

            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

            physical_id = list(dsk.physical_id)
            physical_id[1] = physical_id[3] = port
            dsk.physical_id = tuple(physical_id)

Stephen Shirley's avatar
Stephen Shirley committed
308
309
        my_config.AddInstance(instance_info,
                              _CLUSTERMERGE_ECID + str(fake_ec_id))
310
311
        fake_ec_id += 1

312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
  # R0201: Method could be a function
  def _MergeClusterConfigs(self, my_config, other_config):
    """Checks that all relevant cluster parameters are compatible

    """
    # pylint: disable-msg=R0201
    my_cluster = my_config.GetClusterInfo()
    other_cluster = other_config.GetClusterInfo()
    err_count = 0

    #
    # Generic checks
    #
    check_params = (
      "beparams",
      "default_iallocator",
      "drbd_usermode_helper",
      "file_storage_dir",
      "hidden_os",
      "maintain_node_health",
      "master_netdev",
      "ndparams",
      "nicparams",
      "primary_ip_family",
      "tags",
      "uid_pool",
      "volume_group_name",
      )
    for param_name in check_params:
      my_param = getattr(my_cluster, param_name)
      other_param = getattr(other_cluster, param_name)
      if my_param != other_param:
        logging.error("The value (%s) of the cluster parameter %s on %s"
                      " differs to this cluster's value (%s)",
                      other_param, param_name, other_cluster.cluster_name,
                      my_param)
        err_count += 1

    #
    # Custom checks
    #

    # Check default hypervisor
    my_defhyp = my_cluster.enabled_hypervisors[0]
    other_defhyp = other_cluster.enabled_hypervisors[0]
    if my_defhyp != other_defhyp:
      logging.warning("The default hypervisor (%s) differs on %s, new"
                      " instances will be created with this cluster's"
                      " default hypervisor (%s)", other_defhyp,
                      other_cluster.cluster_name, my_defhyp)

    if (set(my_cluster.enabled_hypervisors) !=
        set(other_cluster.enabled_hypervisors)):
      logging.error("The set of enabled hypervisors (%s) on %s differs to"
                    " this cluster's set (%s)",
                    other_cluster.enabled_hypervisors,
                    other_cluster.cluster_name, my_cluster.enabled_hypervisors)
      err_count += 1

    # Check hypervisor params for hypervisors we care about
    # TODO: we probably don't care about all params for a given hypervisor
    for hyp in my_cluster.enabled_hypervisors:
      for param in my_cluster.hvparams[hyp]:
        my_value = my_cluster.hvparams[hyp][param]
        other_value = other_cluster.hvparams[hyp][param]
        if my_value != other_value:
          logging.error("The value (%s) of the %s parameter of the %s"
                        " hypervisor on %s differs to this cluster's parameter"
                        " (%s)",
                        other_value, param, hyp, other_cluster.cluster_name,
                        my_value)
          err_count += 1

    # Check os hypervisor params for hypervisors we care about
    for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
      for hyp in my_cluster.enabled_hypervisors:
        my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
        other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
        if my_os_hvp != other_os_hvp:
          logging.error("The OS parameters (%s) for the %s OS for the %s"
                        " hypervisor on %s differs to this cluster's parameters"
                        " (%s)",
                        other_os_hvp, os_name, hyp, other_cluster.cluster_name,
                        my_os_hvp)
          err_count += 1

    #
    # Warnings
    #
    if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
      logging.warning("The modify_etc_hosts value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_etc_hosts,
                      other_cluster.cluster_name,
                      my_cluster.modify_etc_hosts)

    if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
      logging.warning("The modify_ssh_setup value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_ssh_setup,
                      other_cluster.cluster_name,
                      my_cluster.modify_ssh_setup)

    #
    # Actual merging
    #
    my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
                                       other_cluster.reserved_lvs))

    if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
      logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
                      " cluster's value (%s). The least permissive value (%s)"
                      " will be used", other_cluster.prealloc_wipe_disks,
                      other_cluster.cluster_name,
                      my_cluster.prealloc_wipe_disks, True)
      my_cluster.prealloc_wipe_disks = True

    for os_, osparams in other_cluster.osparams.items():
      if os_ not in my_cluster.osparams:
        my_cluster.osparams[os_] = osparams
      elif my_cluster.osparams[os_] != osparams:
        logging.error("The OS parameters (%s) for the %s OS on %s differs to"
                      " this cluster's parameters (%s)",
                      osparams, os_, other_cluster.cluster_name,
                      my_cluster.osparams[os_])
        err_count += 1

    if err_count:
      raise errors.ConfigurationError("Cluster config for %s has incompatible"
                                      " values, please fix and re-run" %
                                      other_cluster.cluster_name)

  # R0201: Method could be a function
  def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201
    if os_name in cluster.os_hvp:
      return cluster.os_hvp[os_name].get(hyp, None)
    else:
      return None

451
452
453
454
455
456
457
  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups

    ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
    """
    # pylint: disable-msg=R0201
Stephen Shirley's avatar
Stephen Shirley committed
458
    logging.info("Node group conflict strategy: %s", self.groups)
459
460
461
462
463
464
465
466
467
468
469
470
471

    my_grps = my_config.GetAllNodeGroupsInfo().values()
    other_grps = other_config.GetAllNodeGroupsInfo().values()

    # Check for node group naming conflicts:
    conflicts = []
    for other_grp in other_grps:
      for my_grp in my_grps:
        if other_grp.name == my_grp.name:
          conflicts.append(other_grp)

    if conflicts:
      conflict_names = utils.CommaJoin([g.name for g in conflicts])
Stephen Shirley's avatar
Stephen Shirley committed
472
      logging.info("Node groups in both local and remote cluster: %s",
473
474
475
476
477
478
479
480
481
482
                   conflict_names)

      # User hasn't specified how to handle conflicts
      if not self.groups:
        raise errors.CommandError("The following node group(s) are in both"
                                  " clusters, and no merge strategy has been"
                                  " supplied (see the --groups option): %s" %
                                  conflict_names)

      # User wants to rename conflicts
483
      elif self.groups == _GROUPS_RENAME:
484
485
486
        for grp in conflicts:
          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
          logging.info("Renaming remote node group from %s to %s"
Stephen Shirley's avatar
Stephen Shirley committed
487
                       " to resolve conflict", grp.name, new_name)
488
489
          grp.name = new_name

490
491
492
      # User wants to merge conflicting groups
      elif self.groups == 'merge':
        for other_grp in conflicts:
Stephen Shirley's avatar
Stephen Shirley committed
493
          logging.info("Merging local and remote '%s' groups", other_grp.name)
494
495
          for node_name in other_grp.members[:]:
            node = other_config.GetNodeInfo(node_name)
Stephen Shirley's avatar
Stephen Shirley committed
496
497
            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
498
499
            other_config._UnlockedRemoveNodeFromGroup(node)

Stephen Shirley's avatar
Stephen Shirley committed
500
501
            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
502
            my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
Stephen Shirley's avatar
Stephen Shirley committed
503
504
505

            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
506
507
508
509
510
            my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
            node.group = my_grp_uuid
          # Remove from list of groups to add
          other_grps.remove(other_grp)

511
    for grp in other_grps:
512
513
514
      #TODO: handle node group conflicts
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)

515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
    """Starts the local master daemon.

    @param no_vote: Should the masterd started without voting? default: False
    @raise errors.CommandError: If unable to start daemon.

    """
    env = {}
    if no_vote:
      env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      raise errors.CommandError("Couldn't start ganeti master."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and make sure their config is up-to-date.

    @raise errors.CommandError: If anything fails.

    """
    for data in self.merger_data:
      for node in data.nodes:
        result = utils.RunCmd(["gnt-node", "add", "--readd",
542
                               "--no-ssh-key-check", "--force-join", node])
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
        if result.failed:
          raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
                                    " output: %s" % (node, result.fail_reason,
                                                     result.output))

    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if result.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (result.fail_reason,
                                                result.output))

  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable-msg=R0201
    """Starts up all instances (locally).

    @raise errors.CommandError: If unable to start clusters

    """
    result = utils.RunCmd(["gnt-instance", "startup", "--all",
                           "--force-multiple"])
    if result.failed:
      raise errors.CommandError("Unable to start all instances."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  # R0201: Method could be a function
  def _VerifyCluster(self): # pylint: disable-msg=R0201
    """Runs gnt-cluster verify to verify the health.

    @raise errors.ProgrammError: If cluster fails on verification

    """
    result = utils.RunCmd(["gnt-cluster", "verify"])
    if result.failed:
      raise errors.CommandError("Verification of cluster failed."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about steps
    taken. Also it keeps track of rollback_steps to undo everything.

    """
    rbsteps = []
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      logging.info("Stopping merging instances (takes a while)")
      self._StopMergingInstances()

      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()
      logging.info("Merging config")
      self._FetchRemoteConfig()
609

610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
      logging.info("Stopping master daemon")
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate"
                     " and restart master daemon" %
                     constants.CLUSTER_CONF_FILE)
      self._MergeConfig()
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. Merge can not easily"
                      " be undone after this point.")
      logging.info("Readd nodes")
      self._ReaddMergedNodesAndRedist()

      logging.info("Merge done, restart master daemon normally")
      self._KillMasterDaemon()
      self._StartMasterDaemon()
630

631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
      logging.info("Starting instances again")
      self._StartupAllInstances()
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError, e:
      logging.exception(e)

      if rbsteps:
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to rollback do the following:")
        for step in rbsteps:
René Nussbaumer's avatar
René Nussbaumer committed
646
          logging.critical("  * %s", step % info)
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
      else:
        logging.critical("Nothing to rollback.")

      # TODO: Keep track of steps done for a flawless resume?

  def Cleanup(self):
    """Clean up our environment.

    This cleans up remote private keys and configs and after that
    deletes the temporary directory.

    """
    shutil.rmtree(self.work_dir)


def SetupLogging(options):
  """Setting up logging infrastructure.

  @param options: Parsed command line options

  """
  formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)
  if options.debug:
    stderr_handler.setLevel(logging.NOTSET)
  elif options.verbose:
    stderr_handler.setLevel(logging.INFO)
  else:
677
    stderr_handler.setLevel(logging.WARNING)
678
679
680
681
682
683
684
685
686
687
688
689

  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(stderr_handler)


def main():
  """Main routine.

  """
  program = os.path.basename(sys.argv[0])

690
  parser = optparse.OptionParser(usage=("%%prog [--debug|--verbose]"
691
                                        " [--watcher-pause-period SECONDS]"
692
693
694
                                        " [--groups [%s|%s]]"
                                        " <cluster> [<cluster...>]" %
                                        (_GROUPS_MERGE, _GROUPS_RENAME)),
695
696
697
698
                                        prog=program)
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option(PAUSE_PERIOD_OPT)
699
  parser.add_option(GROUPS_OPT)
700
701
702
703
704
705
706
707

  (options, args) = parser.parse_args()

  SetupLogging(options)

  if not args:
    parser.error("No clusters specified")

708
709
  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
                          options.groups)
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
  try:
    try:
      cluster_merger.Setup()
      cluster_merger.Merge()
    except errors.GenericError, e:
      logging.exception(e)
      return constants.EXIT_FAILURE
  finally:
    cluster_merger.Cleanup()

  return constants.EXIT_SUCCESS


if __name__ == "__main__":
  sys.exit(main())