instance_migration.py 37.8 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with instance migration an failover."""

import logging
import time

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import utils
from ganeti.cmdlib.base import LogicalUnit, Tasklet
33
from ganeti.cmdlib.common import ExpandInstanceUuidAndName, \
Thomas Thrainer's avatar
Thomas Thrainer committed
34
  CheckIAllocatorOrNode, ExpandNodeUuidAndName
35
36
37
38
39
from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \
  ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks
from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \
  CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \
  CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist
40
41
42
43
44
45
46
47
48
49
50

import ganeti.masterd.instance


def _ExpandNamesForMigration(lu):
  """Prepares names and lock declarations for L{TLMigrateInstance}.

  Resolves the optional target node (name and UUID) on the opcode and
  registers deferred node lock acquisition for the LU.

  @type lu: L{LogicalUnit}

  """
  op = lu.op
  if op.target_node is not None:
    (op.target_node_uuid, op.target_node) = \
      ExpandNodeUuidAndName(lu.cfg, op.target_node_uuid, op.target_node)

  # Node and node-resource locks cannot be listed yet; mark them for
  # recalculation once the instance's nodes are known
  for level in (locking.LEVEL_NODE, locking.LEVEL_NODE_RES):
    lu.needed_locks[level] = []
    lu.recalculate_locks[level] = constants.LOCKS_REPLACE

  # The node allocation lock is actually only needed for externally replicated
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []


def _DeclareLocksForMigration(lu, level):
  """Declares locks for L{TLMigrateInstance}.

  Called from the LU's C{DeclareLocks} once per lock level; only the
  node-related levels need handling here.

  @type lu: L{LogicalUnit}
  @param level: Lock level

  """
  if level == locking.LEVEL_NODE_ALLOC:
    # The instance lock must already be held before we may look the instance
    # up in the configuration
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)

    instance = lu.cfg.GetInstanceInfo(lu.op.instance_uuid)

    # Node locks are already declared here rather than at LEVEL_NODE as we need
    # the instance object anyway to declare the node allocation lock.
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      if lu.op.target_node is None:
        # No explicit target node: an iallocator may choose any node, so
        # everything has to be locked
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
      else:
        # Explicit target node: only the current primary and the target
        # need to be locked
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
                                               lu.op.target_node_uuid]
      # Node locks were set explicitly above, so they must not be
      # recalculated later at LEVEL_NODE
      del lu.recalculate_locks[locking.LEVEL_NODE]
    else:
      # Internally mirrored disks: lock exactly the instance's own nodes
      lu._LockInstancesNodes() # pylint: disable=W0212

  elif level == locking.LEVEL_NODE:
    # Node locks are declared together with the node allocation lock
    assert (lu.needed_locks[locking.LEVEL_NODE] or
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)

  elif level == locking.LEVEL_NODE_RES:
    # Copy node locks
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
      CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120


class LUInstanceFailover(LogicalUnit):
  """Fail an instance over to another node.

  The instance is stopped on its current primary and restarted on the
  target node; compare with L{LUInstanceMigrate}, which keeps the
  instance running during the move.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Extract the optional opcode arguments.

    """
    self.iallocator = getattr(self.op, "iallocator", None)
    self.target_node = getattr(self.op, "target_node", None)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    self._migrater = TLMigrateInstance(
      self, self.op.instance_uuid, self.op.instance_name,
      cleanup=self.op.cleanup,
      failover=True,
      fallback=False,
      ignore_consistency=self.op.ignore_consistency,
      allow_runtime_changes=True,
      shutdown_timeout=self.op.shutdown_timeout,
      ignore_ipolicy=self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node_uuid = instance.primary_node
    target_node_uuid = self._migrater.target_node_uuid

    env = {}
    env["IGNORE_CONSISTENCY"] = self.op.ignore_consistency
    env["SHUTDOWN_TIMEOUT"] = self.op.shutdown_timeout
    env["OLD_PRIMARY"] = self.cfg.GetNodeName(source_node_uuid)
    env["NEW_PRIMARY"] = self.cfg.GetNodeName(target_node_uuid)
    env["FAILOVER_CLEANUP"] = self.op.cleanup

    # A meaningful secondary node only exists for internally mirrored
    # disk templates
    if instance.disk_template in constants.DTS_INT_MIRROR:
      old_secondary = self.cfg.GetNodeName(instance.secondary_nodes[0])
      new_secondary = self.cfg.GetNodeName(source_node_uuid)
    else:
      old_secondary = new_secondary = ""
    env["OLD_SECONDARY"] = old_secondary
    env["NEW_SECONDARY"] = new_secondary

    # Merge in the generic per-instance hook variables last
    env.update(BuildInstanceHookEnvByObject(self, instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    nl = [self.cfg.GetMasterNode()]
    nl.extend(instance.secondary_nodes)
    nl.append(self._migrater.target_node_uuid)
    return (nl, nl + [instance.primary_node])


class LUInstanceMigrate(LogicalUnit):
  """Migrate an instance.

  The instance keeps running while it is moved to the target node; this
  is in contrast to the failover, which shuts the instance down first.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    self._migrater = TLMigrateInstance(
      self, self.op.instance_uuid, self.op.instance_name,
      cleanup=self.op.cleanup,
      failover=False,
      fallback=self.op.allow_failover,
      ignore_consistency=False,
      allow_runtime_changes=self.op.allow_runtime_changes,
      shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT,
      ignore_ipolicy=self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node_uuid = instance.primary_node
    target_node_uuid = self._migrater.target_node_uuid

    # Start from the generic per-instance variables, then add the
    # migration-specific ones on top
    env = BuildInstanceHookEnvByObject(self, instance)
    env["MIGRATE_LIVE"] = self._migrater.live
    env["MIGRATE_CLEANUP"] = self.op.cleanup
    env["OLD_PRIMARY"] = self.cfg.GetNodeName(source_node_uuid)
    env["NEW_PRIMARY"] = self.cfg.GetNodeName(target_node_uuid)
    env["ALLOW_RUNTIME_CHANGES"] = self.op.allow_runtime_changes

    # A meaningful secondary node only exists for internally mirrored
    # disk templates
    if instance.disk_template in constants.DTS_INT_MIRROR:
      old_secondary = self.cfg.GetNodeName(instance.secondary_nodes[0])
      new_secondary = self.cfg.GetNodeName(source_node_uuid)
    else:
      old_secondary = new_secondary = ""
    env["OLD_SECONDARY"] = old_secondary
    env["NEW_SECONDARY"] = new_secondary

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    nl = [self.cfg.GetMasterNode(), instance.primary_node]
    nl.extend(instance.secondary_nodes)
    nl.append(self._migrater.target_node_uuid)
    return (nl, nl)


class TLMigrateInstance(Tasklet):
  """Tasklet class for instance migration.

  @type live: boolean
  @ivar live: whether the migration will be done live or non-live;
      this variable is initialized only after CheckPrereq has run
  @type cleanup: boolean
  @ivar cleanup: Whether we clean up from a failed migration
  @type iallocator: string
  @ivar iallocator: The iallocator used to determine target_node
Thomas Thrainer's avatar
Thomas Thrainer committed
242
243
244
  @type target_node_uuid: string
  @ivar target_node_uuid: If given, the target node UUID to reallocate the
      instance to
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
  @type failover: boolean
  @ivar failover: Whether operation results in failover or migration
  @type fallback: boolean
  @ivar fallback: Whether fallback to failover is allowed if migration not
                  possible
  @type ignore_consistency: boolean
  @ivar ignore_consistency: Whether we should ignore consistency between source
                            and target node
  @type shutdown_timeout: int
  @ivar shutdown_timeout: In case of failover timeout of the shutdown
  @type ignore_ipolicy: bool
  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating

  """

  # Constants
  _MIGRATION_POLL_INTERVAL = 1      # seconds
  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds

264
265
266
  def __init__(self, lu, instance_uuid, instance_name, cleanup, failover,
               fallback, ignore_consistency, allow_runtime_changes,
               shutdown_timeout, ignore_ipolicy):
    """Initializes this class.

    @param lu: the LogicalUnit this tasklet belongs to
    @param instance_uuid: UUID of the instance to move
    @param instance_name: name of the instance to move
    @param cleanup: whether to clean up after a failed migration
    @param failover: whether the operation is a failover rather than a
      migration
    @param fallback: whether falling back from migration to failover is
      allowed
    @param ignore_consistency: whether disk consistency between source and
      target node may be ignored
    @param allow_runtime_changes: whether runtime memory changes are
      allowed to make the instance fit on the target node
    @param shutdown_timeout: shutdown timeout used in case of failover
    @param ignore_ipolicy: whether instance policy violations on the
      target node may be ignored

    """
    Tasklet.__init__(self, lu)

    # Store the operation parameters
    self.instance_uuid = instance_uuid
    self.instance_name = instance_name
    self.cleanup = cleanup
    self.failover = failover
    self.fallback = fallback
    self.ignore_consistency = ignore_consistency
    self.allow_runtime_changes = allow_runtime_changes
    self.shutdown_timeout = shutdown_timeout
    self.ignore_ipolicy = ignore_ipolicy

    # Whether the migration is live; the real value is decided in
    # CheckPrereq
    self.live = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    It also decides between migration and failover (honouring the
    fallback flag), determines the target node and validates instance
    policy, free memory and bridges on that node.

    """
    (self.instance_uuid, self.instance_name) = \
      ExpandInstanceUuidAndName(self.lu.cfg, self.instance_uuid,
                                self.instance_name)
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
    assert self.instance is not None
    cluster = self.cfg.GetClusterInfo()

    # An instance that is not marked up cannot be migrated; switch to
    # failover if the caller allows falling back
    if (not self.cleanup and
        not self.instance.admin_state == constants.ADMINST_UP and
        not self.failover and self.fallback):
      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
                      " switching to failover")
      self.failover = True

    # Both migration and failover require some form of disk mirroring
    if self.instance.disk_template not in constants.DTS_MIRRORED:
      if self.failover:
        text = "failovers"
      else:
        text = "migrations"
      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
                                 " %s" % (self.instance.disk_template, text),
                                 errors.ECODE_STATE)

    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
      # Externally mirrored disks: the target node is arbitrary and comes
      # either from the opcode or from the iallocator
      CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")

      if self.lu.op.iallocator:
        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
        self._RunAllocator()
      else:
        # We set self.target_node_uuid as it is required by
        # BuildHooksEnv
        self.target_node_uuid = self.lu.op.target_node_uuid

      # Check that the target node is correct in terms of instance policy
      nodeinfo = self.cfg.GetNodeInfo(self.target_node_uuid)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
                             self.cfg, ignore=self.ignore_ipolicy)

      # self.target_node is already populated, either directly or by the
      # iallocator run
      target_node_uuid = self.target_node_uuid
      if self.target_node_uuid == self.instance.primary_node:
        raise errors.OpPrereqError(
          "Cannot migrate instance %s to its primary (%s)" %
          (self.instance.name,
           self.cfg.GetNodeName(self.instance.primary_node)),
          errors.ECODE_STATE)

      if len(self.lu.tasklets) == 1:
        # It is safe to release locks only when we're the only tasklet
        # in the LU
        ReleaseLocks(self.lu, locking.LEVEL_NODE,
                     keep=[self.instance.primary_node, self.target_node_uuid])
        ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    else:
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      # Internally mirrored disks: the target is always the current
      # secondary node, so no iallocator/arbitrary target is accepted
      secondary_node_uuids = self.instance.secondary_nodes
      if not secondary_node_uuids:
        raise errors.ConfigurationError("No secondary node but using"
                                        " %s disk template" %
                                        self.instance.disk_template)
      self.target_node_uuid = target_node_uuid = secondary_node_uuids[0]
      if self.lu.op.iallocator or \
        (self.lu.op.target_node_uuid and
         self.lu.op.target_node_uuid != target_node_uuid):
        if self.failover:
          text = "failed over"
        else:
          text = "migrated"
        raise errors.OpPrereqError("Instances with disk template %s cannot"
                                   " be %s to arbitrary nodes"
                                   " (neither an iallocator nor a target"
                                   " node can be passed)" %
                                   (self.instance.disk_template, text),
                                   errors.ECODE_INVAL)
      nodeinfo = self.cfg.GetNodeInfo(target_node_uuid)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
                             self.cfg, ignore=self.ignore_ipolicy)

    i_be = cluster.FillBE(self.instance)

    # check memory requirements on the secondary node
    if (not self.cleanup and
         (not self.failover or
           self.instance.admin_state == constants.ADMINST_UP)):
      self.tgt_free_mem = CheckNodeFreeMemory(
          self.lu, target_node_uuid,
          "migrating instance %s" % self.instance.name,
          i_be[constants.BE_MINMEM], self.instance.hypervisor,
          self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])
    else:
      self.lu.LogInfo("Not checking memory on the secondary node as"
                      " instance will not be started")

    # check if failover must be forced instead of migration
    if (not self.cleanup and not self.failover and
        i_be[constants.BE_ALWAYS_FAILOVER]):
      self.lu.LogInfo("Instance configured to always failover; fallback"
                      " to failover")
      self.failover = True

    # check bridge existence
    CheckInstanceBridgesExist(self.lu, self.instance,
                              node_uuid=target_node_uuid)

    if not self.cleanup:
      CheckNodeNotDrained(self.lu, target_node_uuid)
      if not self.failover:
        # Ask the source node whether the hypervisor can migrate the
        # instance at all
        result = self.rpc.call_instance_migratable(self.instance.primary_node,
                                                   self.instance)
        if result.fail_msg and self.fallback:
          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
                          " failover")
          self.failover = True
        else:
          result.Raise("Can't migrate, please use failover",
                       prereq=True, ecode=errors.ECODE_STATE)

    assert not (self.failover and self.cleanup)

    if not self.failover:
      # Determine the migration mode (live vs. non-live); 'live' and
      # 'mode' are mutually exclusive opcode parameters
      if self.lu.op.live is not None and self.lu.op.mode is not None:
        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
                                   " parameters are accepted",
                                   errors.ECODE_INVAL)
      if self.lu.op.live is not None:
        if self.lu.op.live:
          self.lu.op.mode = constants.HT_MIGRATION_LIVE
        else:
          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
        # reset the 'live' parameter to None so that repeated
        # invocations of CheckPrereq do not raise an exception
        self.lu.op.live = None
      elif self.lu.op.mode is None:
        # read the default value from the hypervisor
        i_hv = cluster.FillHV(self.instance, skip_globals=False)
        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]

      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
    else:
      # Failover is never live
      self.live = False

    if not (self.failover or self.cleanup):
      # Record the instance's current runtime memory; it is compared
      # against the target node's free memory in _ExecMigration
      remote_info = self.rpc.call_instance_info(
          self.instance.primary_node, self.instance.name,
          self.instance.hypervisor, cluster.hvparams[self.instance.hypervisor])
      remote_info.Raise("Error checking instance on node %s" %
                        self.cfg.GetNodeName(self.instance.primary_node),
                        prereq=True)
      instance_running = bool(remote_info.payload)
      if instance_running:
        self.current_mem = int(remote_info.payload["memory"])

  def _RunAllocator(self):
    """Ask the iallocator for a relocation target node.

    On success the chosen node is stored in C{self.target_node_uuid}.

    """
    assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)

    # FIXME: add a self.ignore_ipolicy option
    relocate_req = iallocator.IAReqRelocate(
      inst_uuid=self.instance_uuid,
      relocate_from_node_uuids=[self.instance.primary_node])
    allocator = iallocator.IAllocator(self.cfg, self.rpc, relocate_req)
    allocator.Run(self.lu.op.iallocator)

    if not allocator.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" %
                                 (self.lu.op.iallocator, allocator.info),
                                 errors.ECODE_NORES)

    self.target_node_uuid = \
      self.cfg.GetNodeInfoByName(allocator.result[0]).uuid
    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                    self.instance_name, self.lu.op.iallocator,
                    utils.CommaJoin(allocator.result))

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    Loops over our step-based RPC until every node reports its disks as
    fully synchronized, printing progress along the way.

    """
    self.feedback_fn("* wait until resync is done")
    synced = False
    while not synced:
      rpc_result = self.rpc.call_drbd_wait_sync(self.all_node_uuids,
                                                (self.instance.disks,
                                                 self.instance))
      synced = True
      lowest_percent = 100
      for node_uuid, nres in rpc_result.items():
        nres.Raise("Cannot resync disks on node %s" %
                   self.cfg.GetNodeName(node_uuid))
        node_done, node_percent = nres.payload
        if not node_done:
          synced = False
        if node_percent is not None:
          lowest_percent = min(lowest_percent, node_percent)
      if not synced:
        if lowest_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % lowest_percent)
        time.sleep(2)

Thomas Thrainer's avatar
Thomas Thrainer committed
503
  def _EnsureSecondary(self, node_uuid):
    """Demote a node to secondary.

    @param node_uuid: UUID of the node whose disks are to be closed
      (switched to the secondary role)

    """
    node_name = self.cfg.GetNodeName(node_uuid)
    self.feedback_fn("* switching node %s to secondary mode" % node_name)

    close_result = self.rpc.call_blockdev_close(
      node_uuid, self.instance.name, (self.instance.disks, self.instance))
    close_result.Raise("Cannot change disk to secondary on node %s" %
                       node_name)
514
515
516
517
518
519

  def _GoStandalone(self):
    """Disconnect the instance's disks from the network.

    Puts the DRBD devices on all involved nodes into standalone mode.

    """
    self.feedback_fn("* changing into standalone mode")
    disconnect_result = self.rpc.call_drbd_disconnect_net(
      self.all_node_uuids, (self.instance.disks, self.instance))
    for node_uuid, nres in disconnect_result.items():
      nres.Raise("Cannot disconnect disks node %s" %
                 self.cfg.GetNodeName(node_uuid))
525
526
527
528
529
530
531
532
533
534

  def _GoReconnect(self, multimaster):
    """Reconnect the instance's disks to the network.

    @type multimaster: boolean
    @param multimaster: whether to re-attach in dual-master (both nodes
      primary) or single-master mode

    """
    msg = "dual-master" if multimaster else "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    attach_result = self.rpc.call_drbd_attach_net(
      self.all_node_uuids, (self.instance.disks, self.instance),
      self.instance.name, multimaster)
    for node_uuid, nres in attach_result.items():
      nres.Raise("Cannot change disks config on node %s" %
                 self.cfg.GetNodeName(node_uuid))
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558

  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    cluster_hvparams = self.cfg.GetClusterInfo().hvparams
    ins_l = self.rpc.call_instance_list(self.all_node_uuids,
                                        [self.instance.hypervisor],
                                        cluster_hvparams)
    for node_uuid, result in ins_l.items():
      result.Raise("Can't contact node %s" % node_uuid)

    # Determine on which of the two nodes the hypervisor reports the instance
    runningon_source = self.instance.name in \
                         ins_l[self.source_node_uuid].payload
    runningon_target = self.instance.name in \
                         ins_l[self.target_node_uuid].payload

    # Running on both nodes, or on neither, cannot be repaired automatically
    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused; you will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all;"
                               " in this case it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" %
                       self.cfg.GetNodeName(self.target_node_uuid))
      self.instance.primary_node = self.target_node_uuid
      self.cfg.Update(self.instance, self.feedback_fn)
      # the old primary is the node to be demoted below
      demoted_node_uuid = self.source_node_uuid
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" %
                       self.cfg.GetNodeName(self.source_node_uuid))
      demoted_node_uuid = self.target_node_uuid

    # Only internally mirrored templates need their disk roles reverted
    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      self._EnsureSecondary(demoted_node_uuid)
      try:
        self._WaitUntilSync()
      except errors.OpExecError:
        # we ignore here errors, since if the device is standalone, it
        # won't be able to sync
        pass
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    Best-effort: any error is logged as a warning rather than raised,
    since this already runs on a failure path.

    """
    # Externally mirrored templates have no disk roles to revert
    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
      return

    try:
      self._EnsureSecondary(self.target_node_uuid)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
                         " please try to recover the instance manually;"
                         " error '%s'" % str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    Finalization is attempted on the target node first, then on the
    source node; failures are only logged so that the caller can still
    try to revert the disk status afterwards.

    """
    dst_result = self.rpc.call_instance_finalize_migration_dst(
      self.target_node_uuid, self.instance, self.migration_info, False)
    dst_msg = dst_result.fail_msg
    if dst_msg:
      # Don't raise an exception here, as we still have to try to revert
      # the disk status, even if this step failed.
      logging.error("Aborting migration failed on target node %s: %s",
                    self.cfg.GetNodeName(self.target_node_uuid), dst_msg)

    src_result = self.rpc.call_instance_finalize_migration_src(
      self.source_node_uuid, self.instance, False, self.live)
    src_msg = src_result.fail_msg
    if src_msg:
      logging.error("Aborting migration failed on source node %s: %s",
                    self.cfg.GetNodeName(self.source_node_uuid), src_msg)
648
649
650
651
652
653
654
655
656
657
658
659
660
661

  def _ExecMigration(self):
    """Migrate an instance.

    The migrate is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    # Check for hypervisor version mismatch and warn the user.
662
663
664
    hvspecs = [(self.instance.hypervisor,
                self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])]
    nodeinfo = self.rpc.call_node_info(
665
                 [self.source_node_uuid, self.target_node_uuid], None, hvspecs)
666
667
668
    for ninfo in nodeinfo.values():
      ninfo.Raise("Unable to retrieve node information from node '%s'" %
                  ninfo.node)
669
670
    (_, _, (src_info, )) = nodeinfo[self.source_node_uuid].payload
    (_, _, (dst_info, )) = nodeinfo[self.target_node_uuid].payload
671
672
673
674
675
676
677
678
679
680
681

    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
      if src_version != dst_version:
        self.feedback_fn("* warning: hypervisor version mismatch between"
                         " source (%s) and target (%s) node" %
                         (src_version, dst_version))

    self.feedback_fn("* checking disk consistency between source and target")
682
683
684
    for (idx, dev) in enumerate(self.instance.disks):
      if not CheckDiskConsistency(self.lu, self.instance, dev,
                                  self.target_node_uuid,
Thomas Thrainer's avatar
Thomas Thrainer committed
685
                                  False):
686
687
688
689
690
691
692
693
694
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migration" % idx)

    if self.current_mem > self.tgt_free_mem:
      if not self.allow_runtime_changes:
        raise errors.OpExecError("Memory ballooning not allowed and not enough"
                                 " free memory to fit instance %s on target"
                                 " node %s (have %dMB, need %dMB)" %
695
696
                                 (self.instance.name,
                                  self.cfg.GetNodeName(self.target_node_uuid),
697
698
                                  self.tgt_free_mem, self.current_mem))
      self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
699
700
      rpcres = self.rpc.call_instance_balloon_memory(self.instance.primary_node,
                                                     self.instance,
701
702
703
704
                                                     self.tgt_free_mem)
      rpcres.Raise("Cannot modify instance runtime memory")

    # First get the migration information from the remote node
705
    result = self.rpc.call_migration_info(self.source_node_uuid, self.instance)
706
707
708
    msg = result.fail_msg
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
709
                 (self.cfg.GetNodeName(self.source_node_uuid), msg))
710
711
712
713
714
715
716
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      # Then switch the disks to master/master mode
717
      self._EnsureSecondary(self.target_node_uuid)
718
719
720
721
      self._GoStandalone()
      self._GoReconnect(True)
      self._WaitUntilSync()

Thomas Thrainer's avatar
Thomas Thrainer committed
722
    self.feedback_fn("* preparing %s to accept the instance" %
723
724
725
                     self.cfg.GetNodeName(self.target_node_uuid))
    result = self.rpc.call_accept_instance(self.target_node_uuid,
                                           self.instance,
726
                                           migration_info,
727
                                           self.nodes_ip[self.target_node_uuid])
728
729
730
731
732
733
734
735
736

    msg = result.fail_msg
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Pre-migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
737
                               (self.instance.name, msg))
738

Thomas Thrainer's avatar
Thomas Thrainer committed
739
    self.feedback_fn("* migrating instance to %s" %
740
                     self.cfg.GetNodeName(self.target_node_uuid))
741
742
    cluster = self.cfg.GetClusterInfo()
    result = self.rpc.call_instance_migrate(
743
744
        self.source_node_uuid, cluster.cluster_name, self.instance,
        self.nodes_ip[self.target_node_uuid], self.live)
745
746
747
748
749
750
751
752
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
753
                               (self.instance.name, msg))
754
755
756
757

    self.feedback_fn("* starting memory transfer")
    last_feedback = time.time()
    while True:
758
759
      result = self.rpc.call_instance_get_migration_status(
                 self.source_node_uuid, self.instance)
760
761
762
763
764
765
766
767
768
769
770
      msg = result.fail_msg
      ms = result.payload   # MigrationStatus instance
      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
        logging.error("Instance migration failed, trying to revert"
                      " disk status: %s", msg)
        self.feedback_fn("Migration failed, aborting")
        self._AbortMigration()
        self._RevertDiskStatus()
        if not msg:
          msg = "hypervisor returned failure"
        raise errors.OpExecError("Could not migrate instance %s: %s" %
771
                                 (self.instance.name, msg))
772
773
774
775
776
777
778
779
780
781
782
783
784
785

      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
        self.feedback_fn("* memory transfer complete")
        break

      if (utils.TimeoutExpired(last_feedback,
                               self._MIGRATION_FEEDBACK_INTERVAL) and
          ms.transferred_ram is not None):
        mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
        self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
        last_feedback = time.time()

      time.sleep(self._MIGRATION_POLL_INTERVAL)

786
787
    result = self.rpc.call_instance_finalize_migration_src(
               self.source_node_uuid, self.instance, True, self.live)
788
789
790
791
792
793
794
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the source node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

795
    self.instance.primary_node = self.target_node_uuid
796
797

    # distribute new instance config to the other nodes
798
    self.cfg.Update(self.instance, self.feedback_fn)
799

800
801
    result = self.rpc.call_instance_finalize_migration_dst(
               self.target_node_uuid, self.instance, migration_info, True)
802
803
804
805
806
807
808
809
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the target node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
810
      self._EnsureSecondary(self.source_node_uuid)
811
812
813
814
815
816
817
818
      self._WaitUntilSync()
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    # If the instance's disk template is `rbd' or `ext' and there was a
    # successful migration, unmap the device from the source node.
    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
819
      disks = ExpandCheckDisks(self.instance, self.instance.disks)
Thomas Thrainer's avatar
Thomas Thrainer committed
820
      self.feedback_fn("* unmapping instance's disks from %s" %
821
                       self.cfg.GetNodeName(self.source_node_uuid))
822
      for disk in disks:
823
824
        result = self.rpc.call_blockdev_shutdown(self.source_node_uuid,
                                                 (disk, self.instance))
825
826
827
828
        msg = result.fail_msg
        if msg:
          logging.error("Migration was successful, but couldn't unmap the"
                        " block device %s on source node %s: %s",
829
830
                        disk.iv_name,
                        self.cfg.GetNodeName(self.source_node_uuid), msg)
831
          logging.error("You need to unmap the device %s manually on %s",
832
833
                        disk.iv_name,
                        self.cfg.GetNodeName(self.source_node_uuid))
834
835
836
837
838
839
840
841
842
843

    self.feedback_fn("* done")

  def _ExecFailover(self):
    """Failover an instance.

    The instance is stopped on its current (primary) node and then
    brought up on the failover target node. Raises
    L{errors.OpExecError} when a mandatory step fails.

    """
    src_node = self.cfg.GetNodeInfo(self.instance.primary_node)

    src_uuid = self.instance.primary_node

    if not self.instance.disks_active:
      self.feedback_fn("* not checking disk consistency as instance is not"
                       " running")
    else:
      self.feedback_fn("* checking disk consistency between source and target")
      for (disk_idx, disk) in enumerate(self.instance.disks):
        # for drbd, these are drbd over lvm
        if CheckDiskConsistency(self.lu, self.instance, disk,
                                self.target_node_uuid, False):
          continue
        if src_node.offline:
          # Source is dead anyway, so a degraded disk cannot be helped;
          # just tell the user and carry on.
          self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
                           " target node %s" %
                           (src_node.name, disk_idx,
                            self.cfg.GetNodeName(self.target_node_uuid)))
        elif not self.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover" % disk_idx)

    self.feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 self.instance.name, self.cfg.GetNodeName(src_uuid))

    shutdown_result = self.rpc.call_instance_shutdown(src_uuid, self.instance,
                                                      self.shutdown_timeout,
                                                      self.lu.op.reason)
    shutdown_err = shutdown_result.fail_msg
    if shutdown_err:
      # A failed shutdown is tolerable only when the admin asked us to
      # ignore consistency or the source node is already offline.
      if self.ignore_consistency or src_node.offline:
        self.lu.LogWarning("Could not shutdown instance %s on node %s,"
                           " proceeding anyway; please make sure node"
                           " %s is down; error details: %s",
                           self.instance.name,
                           self.cfg.GetNodeName(src_uuid),
                           self.cfg.GetNodeName(src_uuid), shutdown_err)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (self.instance.name,
                                  self.cfg.GetNodeName(src_uuid),
                                  shutdown_err))

    self.feedback_fn("* deactivating the instance's disks on source node")
    if not ShutdownInstanceDisks(self.lu, self.instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks")

    self.instance.primary_node = self.target_node_uuid
    # distribute new instance config to the other nodes
    self.cfg.Update(self.instance, self.feedback_fn)

    # Only start the instance if it's marked as up
    if self.instance.admin_state == constants.ADMINST_UP:
      self.feedback_fn("* activating the instance's disks on target node %s" %
                       self.cfg.GetNodeName(self.target_node_uuid))
      logging.info("Starting instance %s on node %s", self.instance.name,
                   self.cfg.GetNodeName(self.target_node_uuid))

      assembled, _ = AssembleInstanceDisks(self.lu, self.instance,
                                           ignore_secondaries=True)
      if not assembled:
        # Roll back disk activation before bailing out.
        ShutdownInstanceDisks(self.lu, self.instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      self.feedback_fn("* starting the instance on the target node %s" %
                       self.cfg.GetNodeName(self.target_node_uuid))
      start_result = self.rpc.call_instance_start(self.target_node_uuid,
                                                  (self.instance, None, None),
                                                  False, self.lu.op.reason)
      start_err = start_result.fail_msg
      if start_err:
        # Starting failed: clean up the disks we just assembled.
        ShutdownInstanceDisks(self.lu, self.instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (self.instance.name,
                                  self.cfg.GetNodeName(self.target_node_uuid),
                                  start_err))

  def Exec(self, feedback_fn):
    """Perform the migration.

    Dispatches to failover, migration cleanup or live migration after
    filling in the node/IP bookkeeping attributes used by the workers.

    @param feedback_fn: callable used to report progress to the user

    """
    self.feedback_fn = feedback_fn
    self.source_node_uuid = self.instance.primary_node

    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      # Internally-mirrored templates can only go to their secondary.
      self.target_node_uuid = self.instance.secondary_nodes[0]
      # Otherwise self.target_node has been populated either
      # directly, or through an iallocator.

    self.all_node_uuids = [self.source_node_uuid, self.target_node_uuid]
    self.nodes_ip = {
      uuid: node.secondary_ip
      for (uuid, node) in self.cfg.GetMultiNodeInfo(self.all_node_uuids)
      }

    if self.failover:
      feedback_fn("Failover instance %s" % self.instance.name)
      self._ExecFailover()
      return None

    feedback_fn("Migrating instance %s" % self.instance.name)
    if self.cleanup:
      return self._ExecCleanup()
    return self._ExecMigration()