Commit e311ed53 authored by Michael Hanselmann
Browse files

Factorize LUExportInstance



Apart from moving parts of the code to separate functions, error handling
is also improved by making sure snapshots are always removed.
Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
parent c7e4b037
...@@ -45,6 +45,7 @@ from ganeti import objects ...@@ -45,6 +45,7 @@ from ganeti import objects
from ganeti import serializer from ganeti import serializer
from ganeti import ssconf from ganeti import ssconf
from ganeti import uidpool from ganeti import uidpool
from ganeti import compat
class LogicalUnit(object): class LogicalUnit(object):
...@@ -8871,6 +8872,94 @@ class LUExportInstance(LogicalUnit): ...@@ -8871,6 +8872,94 @@ class LUExportInstance(LogicalUnit):
raise errors.OpPrereqError("Export not supported for instances with" raise errors.OpPrereqError("Export not supported for instances with"
" file-based disks", errors.ECODE_INVAL) " file-based disks", errors.ECODE_INVAL)
def _CreateSnapshots(self, feedback_fn):
  """Creates an LVM snapshot for every disk of the instance.

  A disk whose snapshot fails is recorded as C{False} in the result, so
  callers can tell per disk index whether a snapshot actually exists.

  @return: List of snapshots as L{objects.Disk} instances

  """
  instance = self.instance
  node_name = instance.primary_node
  vg_name = self.cfg.GetVGName()

  snapshots = []

  for idx, disk in enumerate(instance.disks):
    feedback_fn("Creating a snapshot of disk/%s on node %s" %
                (idx, node_name))

    # result.payload will be a snapshot of an lvm leaf of the one we
    # passed
    result = self.rpc.call_blockdev_snapshot(node_name, disk)
    if result.fail_msg:
      self.LogWarning("Could not snapshot disk/%s on node %s: %s",
                      idx, node_name, result.fail_msg)
      # Keep list aligned with instance.disks by recording the failure
      snapshots.append(False)
      continue

    disk_id = (vg_name, result.payload)
    snapshots.append(objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                  logical_id=disk_id, physical_id=disk_id,
                                  iv_name=disk.iv_name))

  return snapshots
def _RemoveSnapshot(self, feedback_fn, snap_disks, disk_index):
  """Removes an LVM snapshot.

  @type snap_disks: list
  @param snap_disks: The list of all snapshots as returned by
    L{_CreateSnapshots}
  @type disk_index: number
  @param disk_index: Index of the snapshot to be removed
  @rtype: bool
  @return: Whether removal was successful or not

  """
  snapshot = snap_disks[disk_index]

  # Failed snapshots are stored as False; there is nothing to remove then
  if not snapshot:
    return False

  node_name = self.instance.primary_node

  feedback_fn("Removing snapshot of disk/%s on node %s" %
              (disk_index, node_name))

  msg = self.rpc.call_blockdev_remove(node_name, snapshot).fail_msg
  if msg:
    self.LogWarning("Could not remove snapshot for disk/%d from node"
                    " %s: %s", disk_index, node_name, msg)
    return False

  return True
def _CleanupExports(self, feedback_fn):
  """Removes exports of current instance from all other nodes.

  If an instance in a cluster with nodes A..D was exported to node C, its
  exports will be removed from the nodes A, B and D.

  """
  instance_name = self.instance.name

  nodes = self.cfg.GetNodeList()
  nodes.remove(self.dst_node.name)

  # on one-node clusters nodelist will be empty after the removal
  # if we proceed the backup would be removed because OpQueryExports
  # substitutes an empty list with the full cluster node list.
  if not nodes:
    return

  feedback_fn("Removing old exports for instance %s" % instance_name)

  exportlist = self.rpc.call_export_list(nodes)
  for node in exportlist:
    result = exportlist[node]
    # Skip unreachable nodes and nodes without an export of this instance
    if result.fail_msg or instance_name not in result.payload:
      continue

    msg = self.rpc.call_export_remove(node, instance_name).fail_msg
    if msg:
      self.LogWarning("Could not remove older export for instance %s"
                      " on node %s: %s", instance_name, node, msg)
def Exec(self, feedback_fn): def Exec(self, feedback_fn):
"""Export an instance to an image in the cluster. """Export an instance to an image in the cluster.
...@@ -8887,10 +8976,6 @@ class LUExportInstance(LogicalUnit): ...@@ -8887,10 +8976,6 @@ class LUExportInstance(LogicalUnit):
result.Raise("Could not shutdown instance %s on" result.Raise("Could not shutdown instance %s on"
" node %s" % (instance.name, src_node)) " node %s" % (instance.name, src_node))
vgname = self.cfg.GetVGName()
snap_disks = []
# set the disks ID correctly since call_instance_start needs the # set the disks ID correctly since call_instance_start needs the
# correct drbd minor to create the symlinks # correct drbd minor to create the symlinks
for disk in instance.disks: for disk in instance.disks:
...@@ -8906,93 +8991,79 @@ class LUExportInstance(LogicalUnit): ...@@ -8906,93 +8991,79 @@ class LUExportInstance(LogicalUnit):
try: try:
# per-disk results # per-disk results
dresults = [] dresults = []
removed_snaps = [False] * len(instance.disks)
snap_disks = None
try: try:
for idx, disk in enumerate(instance.disks): try:
feedback_fn("Creating a snapshot of disk/%s on node %s" % snap_disks = self._CreateSnapshots(feedback_fn)
(idx, src_node)) finally:
if self.op.shutdown and instance.admin_up:
# result.payload will be a snapshot of an lvm leaf of the one we feedback_fn("Starting instance %s" % instance.name)
# passed result = self.rpc.call_instance_start(src_node, instance,
result = self.rpc.call_blockdev_snapshot(src_node, disk) None, None)
msg = result.fail_msg msg = result.fail_msg
if msg: if msg:
self.LogWarning("Could not snapshot disk/%s on node %s: %s", _ShutdownInstanceDisks(self, instance)
idx, src_node, msg) raise errors.OpExecError("Could not start instance: %s" % msg)
snap_disks.append(False)
else:
disk_id = (vgname, result.payload)
new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
logical_id=disk_id, physical_id=disk_id,
iv_name=disk.iv_name)
snap_disks.append(new_dev)
finally: assert len(snap_disks) == len(instance.disks)
if self.op.shutdown and instance.admin_up: assert len(removed_snaps) == len(instance.disks)
feedback_fn("Starting instance %s" % instance.name)
result = self.rpc.call_instance_start(src_node, instance, None, None) # TODO: check for size
msg = result.fail_msg
if msg: cluster_name = self.cfg.GetClusterName()
_ShutdownInstanceDisks(self, instance) for idx, dev in enumerate(snap_disks):
raise errors.OpExecError("Could not start instance: %s" % msg) feedback_fn("Exporting snapshot %s from %s to %s" %
(idx, src_node, dst_node.name))
# TODO: check for size if dev:
# FIXME: pass debug from opcode to backend
cluster_name = self.cfg.GetClusterName() result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
for idx, dev in enumerate(snap_disks): instance, cluster_name,
feedback_fn("Exporting snapshot %s from %s to %s" % idx, self.op.debug_level)
(idx, src_node, dst_node.name)) msg = result.fail_msg
if dev: if msg:
# FIXME: pass debug from opcode to backend self.LogWarning("Could not export disk/%s from node %s to"
result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name, " node %s: %s", idx, src_node, dst_node.name, msg)
instance, cluster_name, dresults.append(False)
idx, self.op.debug_level) else:
msg = result.fail_msg dresults.append(True)
if msg:
self.LogWarning("Could not export disk/%s from node %s to" # Remove snapshot
" node %s: %s", idx, src_node, dst_node.name, msg) if self._RemoveSnapshot(feedback_fn, snap_disks, idx):
dresults.append(False) removed_snaps[idx] = True
else: else:
dresults.append(True) dresults.append(False)
msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
if msg:
self.LogWarning("Could not remove snapshot for disk/%d from node"
" %s: %s", idx, src_node, msg)
else:
dresults.append(False)
feedback_fn("Finalizing export on %s" % dst_node.name) assert len(dresults) == len(instance.disks)
result = self.rpc.call_finalize_export(dst_node.name, instance,
snap_disks) # Check for backwards compatibility
fin_resu = True assert compat.all(isinstance(i, bool) for i in dresults), \
msg = result.fail_msg "Not all results are boolean: %r" % dresults
if msg:
self.LogWarning("Could not finalize export for instance %s" feedback_fn("Finalizing export on %s" % dst_node.name)
" on node %s: %s", instance.name, dst_node.name, msg) result = self.rpc.call_finalize_export(dst_node.name, instance,
fin_resu = False snap_disks)
msg = result.fail_msg
fin_resu = not msg
if msg:
self.LogWarning("Could not finalize export for instance %s"
" on node %s: %s", instance.name, dst_node.name, msg)
finally:
# Remove all snapshots
assert len(removed_snaps) == len(instance.disks)
for idx, removed in enumerate(removed_snaps):
if not removed:
self._RemoveSnapshot(feedback_fn, snap_disks, idx)
finally: finally:
if activate_disks: if activate_disks:
feedback_fn("Deactivating disks for %s" % instance.name) feedback_fn("Deactivating disks for %s" % instance.name)
_ShutdownInstanceDisks(self, instance) _ShutdownInstanceDisks(self, instance)
nodelist = self.cfg.GetNodeList() self._CleanupExports(feedback_fn)
nodelist.remove(dst_node.name)
# on one-node clusters nodelist will be empty after the removal
# if we proceed the backup would be removed because OpQueryExports
# substitutes an empty list with the full cluster node list.
iname = instance.name
if nodelist:
feedback_fn("Removing old exports for instance %s" % iname)
exportlist = self.rpc.call_export_list(nodelist)
for node in exportlist:
if exportlist[node].fail_msg:
continue
if iname in exportlist[node].payload:
msg = self.rpc.call_export_remove(node, iname).fail_msg
if msg:
self.LogWarning("Could not remove older export for instance %s"
" on node %s: %s", iname, node, msg)
return fin_resu, dresults return fin_resu, dresults
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment