#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with storage of instances."""

import itertools
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
import ganeti.rpc.node as rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
  CheckDiskTemplateEnabled, IsInstanceRunning
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  constants.DT_FILE: ".file",
  constants.DT_SHARED_FILE: ".sharedfile",
  }
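
# These prefixes are combined with ".disk<index>" and a generated unique ID
# by _GenerateUniqueNames (e.g. an RBD disk could end up with a name like
# "<uuid>.rbd.disk0"; illustrative only, the exact shape depends on
# GenerateUniqueID).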


def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  result = lu.rpc.call_blockdev_create(node_uuid, (device, instance),
                                       device.size, instance.name, force_open,
                                       info, excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device,
                                             lu.cfg.GetNodeName(node_uuid),
                                             instance.name))


def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices
  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
                                    force_create, info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node_uuid, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)


def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node_uuid: string
  @param node_uuid: The node UUID
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given name

  """
  ni = cfg.GetNodeInfo(node_uuid)
  if ni is None:
    raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, ni)


def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  This method annotates the root device first.

  """
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
  return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
                              force_open, excl_stor)


def _UndoCreateDisks(lu, disks_created, instance):
  """Undo the work performed by L{CreateDisks}.

  This function is called in case of an error to undo the work of
  L{CreateDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}
  @type instance: L{objects.Instance}
  @param instance: the instance for which disks were created

  """
  for (node_uuid, disk) in disks_created:
    result = lu.rpc.call_blockdev_remove(node_uuid, (disk, instance))
    result.Warn("Failed to remove newly-created disk %s on node %s" %
                (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)


def CreateDisks(lu, instance, instance_disks=None,
                to_skip=None, target_node_uuid=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Since the instance may not have been saved to the config file yet, this
  function cannot query the config file for the instance's disks; in that
  case they need to be passed as an argument.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type instance_disks: list of L{objects.Disk}
  @param instance_disks: the disks that belong to the instance; if not
      specified, retrieve them from config file
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node_uuid: string
  @param target_node_uuid: if passed, overrides the target node for creation
  @type disks: list of L{objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
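      (a list of (node_uuid, disk) tuples)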
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if instance_disks is None:
    instance_disks = lu.cfg.GetInstanceDisks(instance.uuid)
  if target_node_uuid is None:
    pnode_uuid = instance.primary_node
    # We cannot use config's 'GetInstanceNodes' here as 'CreateDisks'
    # is used by 'LUInstanceCreate' and the instance object is not
    # stored in the config yet.
    all_node_uuids = []
    for disk in instance_disks:
      all_node_uuids.extend(disk.all_nodes)
    all_node_uuids = set(all_node_uuids)
    # ensure that primary node is always the first
    all_node_uuids.discard(pnode_uuid)
    all_node_uuids = [pnode_uuid] + list(all_node_uuids)
  else:
    pnode_uuid = target_node_uuid
    all_node_uuids = [pnode_uuid]

  if disks is None:
    disks = instance_disks

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)

  if instance.disk_template in constants.DTS_FILEBASED:
    file_storage_dir = os.path.dirname(instance_disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)
    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir,
                               lu.cfg.GetNodeName(pnode_uuid)))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node_uuid in all_node_uuids:
      f_create = node_uuid == pnode_uuid
      try:
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
                        f_create)
        disks_created.append((node_uuid, device))
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created, instance)
        raise errors.OpExecError(e.message)
  return disks_created


def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vgs[disk[constants.IDISK_VG]] = \
        vgs.get(disk[constants.IDISK_VG], 0) + disk[constants.IDISK_SIZE] + \
        payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    constants.DT_GLUSTER: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    for key in [
      constants.IDISK_METAVG,
      constants.IDISK_ADOPT,
      constants.IDISK_SPINDLES,
      ]:
      if key in disk:
        new_disk[key] = disk[key]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks


def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass


def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.DT_PLAIN, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.DT_PLAIN,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
                          logical_id=(primary_uuid, secondary_uuid, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev


def GenerateDiskTemplate(
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params):
  """Generate the entire disk layout for a given template type.

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_node_uuids) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node_uuid = secondary_node_uuids[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_node_uuids:
      raise errors.ProgrammerError("Wrong template configuration")

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name == constants.DT_GLUSTER:
      logical_id_fn = lambda _1, disk_index, _2: \
        (file_driver, "ganeti/%s.%d" % (instance_uuid,
                                        disk_index))

    elif template_name in constants.DTS_FILEBASED: # Gluster handled above
      logical_id_fn = \
        lambda idx, disk_index, disk: (file_driver,
                                       "%s/%s" % (file_storage_dir,
                                                  names[idx]))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found", constants.DT_EXT,
                                       constants.IDISK_PROVIDER)
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = template_name

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              spindles=disk.get(constants.IDISK_SPINDLES))
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks


def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Check the presence of the spindle options with exclusive_storage.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError: when spindles are given and they should not

  """
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
      diskdict[constants.IDISK_SPINDLES] is not None):
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)
  if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
                                diskdict[constants.IDISK_SPINDLES] is None)):
    raise errors.OpPrereqError("You must specify spindles in instance disks"
                               " when exclusive storage is active",
                               errors.ECODE_INVAL)


class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    disks = [{
      constants.IDISK_SIZE: d.size,
      constants.IDISK_MODE: d.mode,
      constants.IDISK_SPINDLES: d.spindles,
      } for d in self.cfg.GetInstanceDisks(self.instance.uuid)]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=disks,
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == \
      len(self.cfg.GetInstanceNodes(self.instance.uuid))

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(self.op.nodes))

  def CheckArguments(self):
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifyable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + \
      list(self.cfg.GetInstanceNodes(self.instance.uuid))
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.node_uuids:
      inst_nodes = self.cfg.GetInstanceNodes(instance.uuid)
      if len(self.op.node_uuids) != len(inst_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(inst_nodes),
                                    len(self.op.node_uuids)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.node_uuids) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
759
             len(self.op.node_uuids) == 1
      primary_node = self.op.node_uuids[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.node_uuids or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    if self.op.node_uuids:
      node_uuids = self.op.node_uuids
    else:
      node_uuids = self.cfg.GetInstanceNodes(instance.uuid)
    excl_stor = compat.any(
      rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
      )
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
    for idx, disk in enumerate(inst_disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.node_uuids) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
                                                self.instance.uuid)
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = inst_disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.DT_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))
      self.cfg.Update(disk, feedback_fn)

    # change primary node, if needed
    if self.op.node_uuids:
      self.instance.primary_node = self.op.node_uuids[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    if self.op.node_uuids:
      self.cfg.Update(self.instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    inst_nodes = self.cfg.GetInstanceNodes(self.instance.uuid)
    assert mylocks.issuperset(frozenset(inst_nodes))
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(inst_disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
                         cleanup=new_disks)


def _PerformNodeInfoCall(lu, node_uuids, vg):
  """Prepares the input and performs a node info call.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: list of string
  @param node_uuids: list of node UUIDs to perform the call for
  @type vg: string
  @param vg: the volume group's name
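  @return: a map of node UUID to RPC result; each result's payload is a
      tuple whose second element holds the storage space information
      (see L{_CheckVgCapacityForNode})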

  """
  lvm_storage_units = [(constants.ST_LVM_VG, vg)]
  storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
                                                  node_uuids)
  hvname = lu.cfg.GetHypervisorType()
  hvparams = lu.cfg.GetClusterInfo().hvparams
  nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
                                   [(hvname, hvparams[hvname])])
  return nodeinfo


def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
  """Checks the vg capacity for a given node.

  @type node_info: tuple (_, list of dicts, _)
  @param node_info: the result of the node info call for one node
  @type node_name: string
  @param node_name: the name of the node
  @type vg: string
  @param vg: volume group name
  @type requested: int
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  (_, space_info, _) = node_info
  lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
      space_info, constants.ST_LVM_VG)
  if not lvm_vg_info:
    raise errors.OpPrereqError("Can't retrieve storage information for LVM")
  vg_free = lvm_vg_info.get("storage_free", None)
  if not isinstance(vg_free, int):
    raise errors.OpPrereqError("Can't compute free disk space on node"
                               " %s for vg %s, result was '%s'" %
                               (node_name, vg, vg_free), errors.ECODE_ENVIRON)
  if requested > vg_free:
    raise errors.OpPrereqError("Not enough disk space on target node %s"
                               " vg %s: required %d MiB, available %d MiB" %
                               (node_name, vg, requested, vg_free),
                               errors.ECODE_NORES)


def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
  for node_uuid in node_uuids:
    node_name = lu.cfg.GetNodeName(node_uuid)
    info = nodeinfo[node_uuid]
    info.Raise("Cannot get current information from node %s" % node_name,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    _CheckVgCapacityForNode(node_name, info.payload, vg, requested)


def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
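      (e.g. {"xenvg": 2304}, as computed by L{ComputeDiskSizePerVG};
      illustrative values)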
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg, req_size in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)


def _DiskSizeInBytesToMebibytes(lu, size):
  """Converts a disk size in bytes to mebibytes.

  Warns and rounds up if the size isn't an even multiple of 1 MiB.
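
  Example (illustrative): 1048577 bytes (1 MiB + 1 byte) becomes 2 MiB,
  and the warning reports that 1048575 bytes will not be wiped.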

  """
  (mib, remainder) = divmod(size, 1024 * 1024)

  if remainder != 0:
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                  " to not overwrite existing data (%s bytes will not be"
                  " wiped)", (1024 * 1024) - remainder)
    mib += 1

  return mib


def _CalcEta(time_taken, written, total_size):
  """Calculates the ETA based on size written and total size.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds
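
  Example (illustrative): 30s taken to write 512 MiB of a 2048 MiB disk
  gives (2048 - 512) * (30 / 512.0) = 90s remaining.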

  """
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time


def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset

  """
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  if disks is None:
    inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(inst_disks)]

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      wipe_chunk_size = \
        int(min(constants.MAX_WIPE_CHUNK,
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
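      # Illustrative: assuming MIN_WIPE_CHUNK_PERCENT = 10 and
      # MAX_WIPE_CHUNK = 1024 MiB, a 2048 MiB disk is wiped in 204 MiB
      # chunks, while disks above 10 GiB are capped at 1024 MiB per chunk.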

      size = device.size
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node_name,
                   wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
                                           offset, wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset, size)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node_name, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)


def ImageDisks(lu, instance, image, disks=None):
  """Dumps an image onto an instance disk.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should image
  @type image: string
  @param image: the image to dump onto the instance's disks
  @type disks: None or list of ints
  @param disks: disk indices

  """
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
  if disks is None:
    disks = [(0, inst_disks[0])]
  else:
    disks = map(lambda idx: (idx, inst_disks[idx]), disks)

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)

  try:
    for (idx, device) in disks:
      lu.LogInfo("Imaging disk '%d' for instance '%s' on node '%s'",
                 idx, instance.name, node_name)

      result = lu.rpc.call_blockdev_image(node_uuid, (device, instance),
                                          image, device.size)
      result.Raise("Could not image disk '%d' for instance '%s' on node '%s'" %
                   (idx, instance.name, node_name))
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization for instance '%s' on"
                    " node '%s'", node_name, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Failed to resume synchronization of disk '%d' of"
                        " instance '%s'", idx, instance.name)


def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wrapper for L{WipeDisks} that handles errors.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
      case of error
  @raise errors.OpPrereqError: in case of failure

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    logging.warning("Wiping disks for instance '%s' failed",
                    instance.name)
    _UndoCreateDisks(lu, cleanup, instance)
    raise


def ExpandCheckDisks(instance_disks, disks):
  """Return the instance disks selected by the disks list

  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on

  """
  if disks is None:
    return instance_disks
  else:
    inst_disks_uuids = [d.uuid for d in instance_disks]
    disks_uuids = [d.uuid for d in disks]
    if not set(disks_uuids).issubset(inst_disks_uuids):
      raise errors.ProgrammerError("Can only act on disks belonging to the"
                                   " target instance: expected a subset of %s,"
                                   " got %s" % (inst_disks_uuids, disks_uuids))
    return disks


def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
  if not inst_disks or disks is not None and not disks:
    return True

  disks = ExpandCheckDisks(inst_disks, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node_name)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node_name, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
          max_time = 5 # sleep at least a bit between retries
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded


def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are
  ignored.

  Modifies the configuration of the instance, so the caller should re-read the
  instance configuration, if needed.

  """
  all_result = True

  if disks is None:
    # only mark instance disks as inactive if all disks are affected
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)
  inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
  disks = ExpandCheckDisks(inst_disks, disks)

  for disk in disks:
    for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        if ((node_uuid == instance.primary_node and not ignore_primary) or
            (node_uuid != instance.primary_node and not result.offline)):
          all_result = False
  return all_result


def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  ShutdownInstanceDisks.

  """
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)


def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                          ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Modifies the configuration of the instance, so the caller should re-read the
  instance configuration, if needed.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True

  if disks is None:
    # only mark instance disks as active if all disks are affected
    instance = lu.cfg.MarkInstanceDisksActive(instance.uuid)

  inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
  disks = ExpandCheckDisks(inst_disks, disks)

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):