Commit cff90b79 authored by Iustin Pop

Enhance DRBD8 disk replacement (same nodes)

This patch adds enhanced reporting and many more checks to the disk
replacement (when not switching the secondary).

Reviewed-by: imsnah
parent 9db6dbce
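The reporting pattern used throughout the new code is a numbered-step one: the logical unit announces each major phase through the processor's LogStep call and binds short info/warning aliases for individual actions. A minimal sketch of that pattern, assuming a hypothetical SimpleProcessor class (not part of this commit or of Ganeti itself), could look like:

class SimpleProcessor(object):
  """Hypothetical stand-in for the processor object used in the diff below."""

  def LogStep(self, current, total, message):
    # announce one numbered step, e.g. "STEP 1/6 check device existence"
    print("STEP %d/%d %s" % (current, total, message))

  def LogInfo(self, message):
    print("INFO: %s" % message)

  def LogWarning(self, message, hint=None):
    # the diff calls LogWarning with an optional second argument carrying
    # a manual-recovery hint for the administrator
    if hint:
      print("WARNING: %s (%s)" % (message, hint))
    else:
      print("WARNING: %s" % message)


def demo_replace_reporting(processor):
  """Shows the reporting calls only; no real disk operations happen here."""
  steps_total = 6
  warning, info = (processor.LogWarning, processor.LogInfo)
  processor.LogStep(1, steps_total, "check device existence")
  info("checking volume groups")
  processor.LogStep(6, steps_total, "removing old storage")
  warning("Can't remove old LV", "manually remove unused LVs")


demo_replace_reporting(SimpleProcessor())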
@@ -3419,10 +3419,12 @@ class LUReplaceDisks(LogicalUnit):
" the secondary while doing a primary"
" node disk replacement")
self.tgt_node = instance.primary_node
self.oth_node = instance.secondary_nodes[0]
elif self.op.mode == constants.REPLACE_DISK_SEC:
self.new_node = remote_node # this can be None, in which case
# we don't change the secondary
self.tgt_node = instance.secondary_nodes[0]
self.oth_node = instance.primary_node
else:
raise errors.ProgrammerError("Unhandled disk replace mode")
@@ -3543,13 +3545,54 @@ class LUReplaceDisks(LogicalUnit):
- remove old LVs (which have the name name_replaces.<time_t>)
Failures are not very well handled.
"""
steps_total = 6
warning, info = (self.processor.LogWarning, self.processor.LogInfo)
instance = self.instance
iv_names = {}
vgname = self.cfg.GetVGName()
# start of work
cfg = self.cfg
tgt_node = self.tgt_node
oth_node = self.oth_node
# Step: check device activation
self.processor.LogStep(1, steps_total, "check device existence")
info("checking volume groups")
my_vg = cfg.GetVGName()
results = rpc.call_vg_list([oth_node, tgt_node])
if not results:
raise errors.OpExecError("Can't list volume groups on the nodes")
for node in oth_node, tgt_node:
res = results.get(node, False)
if not res or my_vg not in res:
raise errors.OpExecError("Volume group '%s' not found on %s" %
(my_vg, node))
for dev in instance.disks:
if not dev.iv_name in self.op.disks:
continue
for node in tgt_node, oth_node:
info("checking %s on %s" % (dev.iv_name, node))
cfg.SetDiskID(dev, node)
if not rpc.call_blockdev_find(node, dev):
raise errors.OpExecError("Can't find device %s on node %s" %
(dev.iv_name, node))
# Step: check other node consistency
self.processor.LogStep(2, steps_total, "check peer consistency")
for dev in instance.disks:
if not dev.iv_name in self.op.disks:
continue
info("checking %s consistency on %s" % (dev.iv_name, oth_node))
if not _CheckDiskConsistency(self.cfg, dev, oth_node,
oth_node==instance.primary_node):
raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
" to replace disks on this node (%s)" %
(oth_node, tgt_node))
# Step: create new storage
self.processor.LogStep(3, steps_total, "allocate new storage")
for dev in instance.disks:
if not dev.iv_name in self.op.disks:
continue
@@ -3564,8 +3607,8 @@ class LUReplaceDisks(LogicalUnit):
new_lvs = [lv_data, lv_meta]
old_lvs = dev.children
iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
logger.Info("adding new local storage on %s for %s" %
(tgt_node, dev.iv_name))
info("creating new local storage on %s for %s" %
(tgt_node, dev.iv_name))
# since we *always* want to create this LV, we use the
# _Create...OnPrimary (which forces the creation), even if we
# are talking about the secondary node
@@ -3576,59 +3619,68 @@ class LUReplaceDisks(LogicalUnit):
" node '%s'" %
(new_lv.logical_id[1], tgt_node))
# Step: for each lv, detach+rename*2+attach
self.processor.LogStep(4, steps_total, "change drbd configuration")
for dev, old_lvs, new_lvs in iv_names.itervalues():
info("detaching %s drbd from local storage" % dev.iv_name)
if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
raise errors.OpExecError("Can't detach drbd from local storage on node"
" %s for device %s" % (tgt_node, dev.iv_name))
-dev.children = []
-cfg.Update(instance)
+#dev.children = []
+#cfg.Update(instance)
# ok, we created the new LVs, so now we know we have the needed
# storage; as such, we proceed on the target node to rename
# old_lv to _old, and new_lv to old_lv; note that we rename LVs
# using the assumption than logical_id == physical_id (which in
# turn is the unique_id on that node)
# FIXME(iustin): use a better name for the replaced LVs
temp_suffix = int(time.time())
logger.Info("renaming the old LVs on the target node")
ren_fn = lambda d, suff: (d.physical_id[0],
d.physical_id[1] + "_replaced-%s" % suff)
rlist = [(disk, ren_fn(disk, temp_suffix)) for disk in old_lvs]
# build the rename list based on what LVs exist on the node
rlist = []
for to_ren in old_lvs:
find_res = rpc.call_blockdev_find(tgt_node, to_ren)
if find_res is not None: # device exists
rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
info("renaming the old LVs on the target node")
if not rpc.call_blockdev_rename(tgt_node, rlist):
logger.Error("Can't rename old LVs on node %s" % tgt_node)
do_change_old = False
else:
do_change_old = True
raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
# now we rename the new LVs to the old LVs
logger.Info("renaming the new LVs on the target node")
info("renaming the new LVs on the target node")
rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
if not rpc.call_blockdev_rename(tgt_node, rlist):
logger.Error("Can't rename new LVs on node %s" % tgt_node)
else:
for old, new in zip(old_lvs, new_lvs):
new.logical_id = old.logical_id
cfg.SetDiskID(new, tgt_node)
raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
for old, new in zip(old_lvs, new_lvs):
new.logical_id = old.logical_id
cfg.SetDiskID(new, tgt_node)
-if do_change_old:
-for disk in old_lvs:
-disk.logical_id = ren_fn(disk, temp_suffix)
-cfg.SetDiskID(disk, tgt_node)
+for disk in old_lvs:
+disk.logical_id = ren_fn(disk, temp_suffix)
+cfg.SetDiskID(disk, tgt_node)
# now that the new lvs have the old name, we can add them to the device
logger.Info("adding new mirror component on %s" % tgt_node)
info("adding new mirror component on %s" % tgt_node)
if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
logger.Error("Can't add local storage to drbd!")
for new_lv in new_lvs:
if not rpc.call_blockdev_remove(tgt_node, new_lv):
logger.Error("Can't rollback device %s")
return
warning("Can't rollback device %s", "manually cleanup unused"
" logical volumes")
raise errors.OpExecError("Can't add local storage to drbd")
dev.children = new_lvs
cfg.Update(instance)
# Step: wait for sync
# this can fail as the old devices are degraded and _WaitForSync
# does a combined result over all disks, so we don't check its
# return value
logger.Info("Done changing drbd configs, waiting for sync")
self.processor.LogStep(5, steps_total, "sync devices")
_WaitForSync(cfg, instance, unlock=True)
# so check manually all the devices
@@ -3638,13 +3690,14 @@ class LUReplaceDisks(LogicalUnit):
if is_degr:
raise errors.OpExecError("DRBD device %s is degraded!" % name)
# Step: remove old storage
self.processor.LogStep(6, steps_total, "removing old storage")
for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
logger.Info("remove logical volumes for %s" % name)
info("remove logical volumes for %s" % name)
for lv in old_lvs:
cfg.SetDiskID(lv, tgt_node)
if not rpc.call_blockdev_remove(tgt_node, lv):
logger.Error("Can't cleanup child device, skipping. You need to"
" fix manually!")
warning("Can't remove old LV", "manually remove unused LVs")
continue
def _ExecD8Secondary(self, feedback_fn):
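For reference, the heart of the change is the step-4 sequence above: detach the old LVs from the DRBD device, rename them aside under a "_replaced-<time_t>" suffix, rename the new LVs to the old names (relying on logical_id == physical_id on the node), reattach them, and remove the renamed old LVs once the resync has finished. A condensed sketch of that sequence, with made-up helper callables standing in for the rpc.call_blockdev_* functions (none of the names below are real Ganeti APIs), might be:

import time


def swap_local_storage(dev, old_lvs, new_lvs,
                       detach_children, rename_lv, attach_children):
  """Sketch of the detach/rename*2/attach sequence; helpers are placeholders."""
  suffix = "_replaced-%d" % int(time.time())

  # detach the old LVs from the DRBD device
  detach_children(dev, old_lvs)

  # move the old LVs out of the way under a timestamped name
  renamed_old = [lv + suffix for lv in old_lvs]
  for lv, new_name in zip(old_lvs, renamed_old):
    rename_lv(lv, new_name)

  # give the freshly created LVs the names the DRBD device expects
  for old, new in zip(old_lvs, new_lvs):
    rename_lv(new, old)

  # reattach by the original names, which the new LVs now carry;
  # DRBD resyncs the data onto the new storage afterwards
  attach_children(dev, old_lvs)

  # the renamed old LVs are removed later, once the resync is done
  return renamed_old


# tiny smoke test with no-op helpers and made-up LV names
print(swap_local_storage(
  "drbd0",
  ["xenvg/disk0_data", "xenvg/disk0_meta"],
  ["xenvg/disk0_data.new", "xenvg/disk0_meta.new"],
  detach_children=lambda dev, lvs: None,
  rename_lv=lambda old_name, new_name: None,
  attach_children=lambda dev, lvs: None))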