#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with storage of instances."""

import itertools
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
from ganeti import rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
  CheckDiskTemplateEnabled
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }


def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  result = lu.rpc.call_blockdev_create(node_uuid, (device, instance),
                                       device.size, instance.name, force_open,
                                       info, excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device,
                                             lu.cfg.GetNodeName(node_uuid),
                                             instance.name))


def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices
  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
                                    force_create, info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node_uuid, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)


def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node_uuid: string
  @param node_uuid: The node UUID
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given UUID

  """
  ni = cfg.GetNodeInfo(node_uuid)
  if ni is None:
    raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, ni)


def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  This method annotates the root device first.

  """
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
  return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
                              force_open, excl_stor)


def _UndoCreateDisks(lu, disks_created, instance):
  """Undo the work performed by L{CreateDisks}.

  This function is called in case of an error to undo the work of
  L{CreateDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}
  @type instance: L{objects.Instance}
  @param instance: the instance for which disks were created

  """
  for (node_uuid, disk) in disks_created:
    result = lu.rpc.call_blockdev_remove(node_uuid, (disk, instance))
    result.Warn("Failed to remove newly-created disk %s on node %s" %
                (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)


def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node_uuid: string
  @param target_node_uuid: if passed, overrides the target node for creation
  @type disks: list of L{objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if target_node_uuid is None:
    pnode_uuid = instance.primary_node
    all_node_uuids = instance.all_nodes
  else:
    pnode_uuid = target_node_uuid
    all_node_uuids = [pnode_uuid]

  if disks is None:
    disks = instance.disks

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)

  if instance.disk_template in constants.DTS_FILEBASED:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir,
                               lu.cfg.GetNodeName(pnode_uuid)))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node_uuid in all_node_uuids:
      f_create = node_uuid == pnode_uuid
      try:
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
                        f_create)
        disks_created.append((node_uuid, device))
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created, instance)
        raise errors.OpExecError(e.message)
  return disks_created


def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      # Accumulate the required size per volume group.
      vgs[disk[constants.IDISK_VG]] = \
        vgs.get(disk[constants.IDISK_VG], 0) + \
        disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    constants.DT_GLUSTER: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
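

# Worked example (illustrative, assuming the per-VG accumulation above and
# the 128 MiB of DRBD metadata per disk mentioned in the comment): two DRBD8
# disks of 10240 MiB and 2048 MiB in volume group "xenvg" need
# 10240 + 128 + 2048 + 128 MiB there:
#
#   ComputeDiskSizePerVG(constants.DT_DRBD8,
#                        [{constants.IDISK_VG: "xenvg",
#                          constants.IDISK_SIZE: 10240},
#                         {constants.IDISK_VG: "xenvg",
#                          constants.IDISK_SIZE: 2048}])
#   => {"xenvg": 12544}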


def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default volume group to assume

  @return: The computed disks

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    for key in [
      constants.IDISK_METAVG,
      constants.IDISK_ADOPT,
      constants.IDISK_SPINDLES,
      ]:
      if key in disk:
        new_disk[key] = disk[key]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks
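

# Illustrative sketch only, not part of the Ganeti API: how the helpers in
# this module are typically chained when validating disk space for a new
# instance. `lu` is assumed to be a LogicalUnit, `op` an opcode carrying
# `disks` and `disk_template` (as OpInstanceCreate does) and `node_uuids`
# the UUIDs of the nodes that will host the instance.
def _ExampleCheckFreeSpaceForOpcode(lu, op, node_uuids, default_vg):
  # Normalize the opcode's disk definitions, derive the required space per
  # volume group and verify that every target node can provide it.
  disks = ComputeDisks(op, default_vg)
  req_sizes = ComputeDiskSizePerVG(op.disk_template, disks)
  CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes)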


def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass


def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.DT_PLAIN, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.DT_PLAIN,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
                          logical_id=(primary_uuid, secondary_uuid, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev


def GenerateDiskTemplate(
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params):
  """Generate the entire disk layout for a given template type.

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_node_uuids) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node_uuid = secondary_node_uuids[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_node_uuids:
      raise errors.ProgrammerError("Wrong template configuration")

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name == constants.DT_GLUSTER:
      logical_id_fn = lambda _1, disk_index, _2: \
        (file_driver, "ganeti/%s.%d" % (instance_uuid,
                                        disk_index))

    elif template_name in constants.DTS_FILEBASED: # Gluster handled above
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/disk%d" % (file_storage_dir,
                                                    disk_index))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found", constants.DT_EXT,
                                       constants.IDISK_PROVIDER)
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = template_name

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              spindles=disk.get(constants.IDISK_SPINDLES))
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks
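

# Summary of the logical_id layouts produced above (illustrative, derived
# from the per-template branches of GenerateDiskTemplate):
#
#   DT_PLAIN:         (vg_name, lv_name)
#   DT_DRBD8:         (primary_uuid, secondary_uuid, port, p_minor, s_minor,
#                      shared_secret) on the DRBD device itself
#   file-based:       (file_driver, "FILE_STORAGE_DIR/diskN")
#   DT_GLUSTER:       (file_driver, "ganeti/INSTANCE_UUID.N")
#   DT_BLOCK:         (BLOCKDEV_DRIVER_MANUAL, adopted device path)
#   DT_RBD / DT_EXT:  ("rbd" or the ExtStorage provider, generated name)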


def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Check the presence of the spindle options with exclusive_storage.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError: when spindles are given and they should not be

  """
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
      diskdict[constants.IDISK_SPINDLES] is not None):
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)
  if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
                                diskdict[constants.IDISK_SPINDLES] is None)):
    raise errors.OpPrereqError("You must specify spindles in instance disks"
                               " when exclusive storage is active",
                               errors.ECODE_INVAL)


class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    disks = [{
      constants.IDISK_SIZE: d.size,
      constants.IDISK_MODE: d.mode,
      constants.IDISK_SPINDLES: d.spindles,
      } for d in self.instance.disks]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=disks,
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(self.op.nodes))

  def CheckArguments(self):
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifiable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.node_uuids:
      if len(self.op.node_uuids) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.node_uuids)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.node_uuids) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.node_uuids) == 1
      primary_node = self.op.node_uuids[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.node_uuids or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    if self.op.node_uuids:
      node_uuids = self.op.node_uuids
    else:
      node_uuids = instance.all_nodes
    excl_stor = compat.any(
      rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
      )
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(self.instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.node_uuids) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
                                                self.instance.uuid)
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = self.instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.DT_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))

    # change primary node, if needed
    if self.op.node_uuids:
      self.instance.primary_node = self.op.node_uuids[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    if self.op.node_uuids:
      self.cfg.Update(self.instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(self.instance.all_nodes))
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(self.instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
                         cleanup=new_disks)


def _PerformNodeInfoCall(lu, node_uuids, vg):
  """Prepares the input and performs a node info call.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: list of string
  @param node_uuids: list of node UUIDs to perform the call for
  @type vg: string
  @param vg: the volume group's name

  """
  lvm_storage_units = [(constants.ST_LVM_VG, vg)]
  storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
                                                  node_uuids)
  hvname = lu.cfg.GetHypervisorType()
  hvparams = lu.cfg.GetClusterInfo().hvparams
  nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
                                   [(hvname, hvparams[hvname])])
  return nodeinfo


def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
  """Checks the vg capacity for a given node.

  @type node_info: tuple (_, list of dicts, _)
  @param node_info: the result of the node info call for one node
  @type node_name: string
  @param node_name: the name of the node
  @type vg: string
  @param vg: volume group name
  @type requested: int
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  (_, space_info, _) = node_info
  lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
      space_info, constants.ST_LVM_VG)
  if not lvm_vg_info:
    raise errors.OpPrereqError("Can't retrieve storage information for LVM")
  vg_free = lvm_vg_info.get("storage_free", None)
  if not isinstance(vg_free, int):
    raise errors.OpPrereqError("Can't compute free disk space on node"
                               " %s for vg %s, result was '%s'" %
                               (node_name, vg, vg_free), errors.ECODE_ENVIRON)
  if requested > vg_free:
    raise errors.OpPrereqError("Not enough disk space on target node %s"
                               " vg %s: required %d MiB, available %d MiB" %
                               (node_name, vg, requested, vg_free),
                               errors.ECODE_NORES)


def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
  for node_uuid in node_uuids:
    node_name = lu.cfg.GetNodeName(node_uuid)
    info = nodeinfo[node_uuid]
    info.Raise("Cannot get current information from node %s" % node_name,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    _CheckVgCapacityForNode(node_name, info.payload, vg, requested)


def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg, req_size in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)


def _DiskSizeInBytesToMebibytes(lu, size):
  """Converts a disk size in bytes to mebibytes.

  Warns and rounds up if the size isn't an even multiple of 1 MiB.

  """
  (mib, remainder) = divmod(size, 1024 * 1024)

  if remainder != 0:
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                  " to not overwrite existing data (%s bytes will not be"
                  " wiped)", (1024 * 1024) - remainder)
    mib += 1

  return mib
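

# Worked example (illustrative): one GiB converts cleanly, while a single
# extra byte rounds up and triggers the warning above:
#
#   _DiskSizeInBytesToMebibytes(lu, 1024 * 1024 * 1024)      => 1024
#   _DiskSizeInBytesToMebibytes(lu, 1024 * 1024 * 1024 + 1)  => 1025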


def _CalcEta(time_taken, written, total_size):
  """Calculates the ETA based on size written and total size.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time
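

# Worked example (illustrative): after writing 512 MiB of a 2048 MiB disk in
# 60 seconds, the average rate implies (2048 - 512) * (60 / 512.0) = 180.0
# seconds remaining, i.e. _CalcEta(60, 512, 2048) => 180.0.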


def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset

  """
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      wipe_chunk_size = \
        int(min(constants.MAX_WIPE_CHUNK,
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))

      size = device.size
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node_name,
                   wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
                                           offset, wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset, size)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node_name, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)


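# Worked example (illustrative; assumes the usual values of
# constants.MIN_WIPE_CHUNK_PERCENT = 10 and constants.MAX_WIPE_CHUNK = 1024):
# WipeDisks wipes a 4096 MiB disk in chunks of
# int(min(1024, 4096 / 100.0 * 10)) = 409 MiB, while disks of 10 GiB and
# larger are capped at 1024 MiB per call_blockdev_wipe request.

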
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wrapper for L{WipeDisks} that handles errors.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
      case of error
  @raise errors.OpPrereqError: in case of failure

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    logging.warning("Wiping disks for instance '%s' failed",
                    instance.name)
    _UndoCreateDisks(lu, cleanup, instance)
    raise
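

# Illustrative sketch only, not part of the Ganeti API: the usual calling
# pattern keeps the (node_uuid, disk) pairs returned by CreateDisks so that
# a failed wipe can remove them again, mirroring what
# LUInstanceRecreateDisks.Exec does above. `lu` is assumed to be a
# LogicalUnit and `instance` an objects.Instance.
def _ExampleCreateAndWipeDisks(lu, instance):
  disks_created = CreateDisks(lu, instance)
  if lu.cfg.GetClusterInfo().prealloc_wipe_disks:
    # On wipe failure this removes the disks created above and re-raises.
    WipeOrCleanupDisks(lu, instance, cleanup=disks_created)
  return disks_created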


def ExpandCheckDisks(instance, disks):
  """Return the instance disks selected by the disks list

  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on

  """
  if disks is None:
    return instance.disks
  else:
    if not set(disks).issubset(instance.disks):
      raise errors.ProgrammerError("Can only act on disks belonging to the"
                                   " target instance: expected a subset of %r,"
                                   " got %r" % (instance.disks, disks))
    return disks


def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node_name)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node_name, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded


def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored (they make the function report failure).

  """
  all_result = True

  if disks is None:
    # only mark instance disks as inactive if all disks are affected
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)
  disks = ExpandCheckDisks(instance, disks)

  for disk in disks:
    for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        if ((node_uuid == instance.primary_node and not ignore_primary) or
            (node_uuid != instance.primary_node and not result.offline)):
          all_result = False
  return all_result


def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  ShutdownInstanceDisks.

  """
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)
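

# Illustrative sketch only, not part of the Ganeti API: the activation helper
# below and the shutdown helpers above are normally used as a pair. The
# (disks_ok, device_info) return pair assumed here matches how callers
# elsewhere in Ganeti unpack AssembleInstanceDisks.
def _ExampleActivateInstanceDisks(lu, instance):
  # Assemble the disks on all nodes, roll back if any mandatory node failed,
  # then wait for DRBD resynchronization before using the storage.
  disks_ok, _ = AssembleInstanceDisks(lu, instance)
  if not disks_ok:
    ShutdownInstanceDisks(lu, instance)
    raise errors.OpExecError("Cannot activate block devices")
  WaitForSync(lu, instance)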


def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                          ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: a tuple of (disks_ok, device_info); disks_ok is False if the
      operation failed, and device_info is a list of
      (host, instance_visible_name, node_visible_name) triples with the
      mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True

  if disks is None:
    # only mark instance disks as active if all disks are affected
    lu.cfg.MarkInstanceDisksActive(instance.uuid)

  disks = ExpandCheckDisks(instance, disks)

  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before the handshake has occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, False, idx)
      msg = result.fail_msg
      if msg:
        is_offline_secondary = (node_uuid in instance.secondary_nodes and
                                result.offline)
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=False, pass=1): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
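        # During this pass a failure is tolerated only if secondaries are
        # explicitly ignored or the failing node is an offline secondary.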
        if not (ignore_secondaries or is_offline_secondary):
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if node_uuid != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        disks_ok = False
      else:
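        # On success the RPC payload carries the assembled device's
        # information; the first element is its path on the primary node.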
        dev_path, _ = result.payload

    device_info.append((lu.cfg.GetNodeName(instance.primary_node),
                        inst_disk.iv_name, dev_path))

  if not disks_ok:
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)

  return disks_ok, device_info


def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
                                      ignore_secondaries=force)
  if not disks_ok:
    ShutdownInstanceDisks(lu, instance)
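    # When the caller explicitly passed force=False, hint that retrying with
    # --force may help if the failure was on a secondary node.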
    if force is not None and not force:
      lu.LogWarning("",
                    hint=("If the message above refers to a secondary node,"
                          " you can retry the operation using '--force'"))
    raise errors.OpExecError("Disk consistency error")


class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
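      # (the node resource locks must cover the same nodes as the node locks)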
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    node_uuids = list(self.instance.all_nodes)
    for node_uuid in node_uuids:
      CheckNodeOnline(self, node_uuid)
    self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)

    if self.instance.disk_template not in constants.DTS_GROWABLE: