Commit 387794f8 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

LUExportInstance: Move exporting code to helper class



This will simplify the implementation of intra-cluster instance
exports and reduces the number of local variables in
LUExportInstance.Exec.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent c1e7897d
......@@ -8757,6 +8757,7 @@ class LUSetInstanceParams(LogicalUnit):
(constants.DT_DRBD8, constants.DT_PLAIN): _ConvertDrbdToPlain,
}
class LUQueryExports(NoHooksLU):
"""Query the exports list
......@@ -8886,68 +8887,6 @@ class LUExportInstance(LogicalUnit):
raise errors.OpPrereqError("Export not supported for instances with"
" file-based disks", errors.ECODE_INVAL)
def _CreateSnapshots(self, feedback_fn):
"""Creates an LVM snapshot for every disk of the instance.
@return: List of snapshots as L{objects.Disk} instances
"""
instance = self.instance
src_node = instance.primary_node
vgname = self.cfg.GetVGName()
snap_disks = []
for idx, disk in enumerate(instance.disks):
feedback_fn("Creating a snapshot of disk/%s on node %s" %
(idx, src_node))
# result.payload will be a snapshot of an lvm leaf of the one we
# passed
result = self.rpc.call_blockdev_snapshot(src_node, disk)
msg = result.fail_msg
if msg:
self.LogWarning("Could not snapshot disk/%s on node %s: %s",
idx, src_node, msg)
snap_disks.append(False)
else:
disk_id = (vgname, result.payload)
new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
logical_id=disk_id, physical_id=disk_id,
iv_name=disk.iv_name)
snap_disks.append(new_dev)
return snap_disks
def _RemoveSnapshot(self, feedback_fn, snap_disks, disk_index):
"""Removes an LVM snapshot.
@type snap_disks: list
@param snap_disks: The list of all snapshots as returned by
L{_CreateSnapshots}
@type disk_index: number
@param disk_index: Index of the snapshot to be removed
@rtype: bool
@return: Whether removal was successful or not
"""
disk = snap_disks[disk_index]
if disk:
src_node = self.instance.primary_node
feedback_fn("Removing snapshot of disk/%s on node %s" %
(disk_index, src_node))
result = self.rpc.call_blockdev_remove(src_node, disk)
if not result.fail_msg:
return True
self.LogWarning("Could not remove snapshot for disk/%d from node"
" %s: %s", disk_index, src_node, result.fail_msg)
return False
def _CleanupExports(self, feedback_fn):
"""Removes exports of current instance from all other nodes.
......@@ -8979,7 +8918,6 @@ class LUExportInstance(LogicalUnit):
"""
instance = self.instance
dst_node = self.dst_node
src_node = instance.primary_node
if self.op.shutdown:
......@@ -9004,81 +8942,19 @@ class LUExportInstance(LogicalUnit):
_StartInstanceDisks(self, instance, None)
try:
# per-disk results
removed_snaps = [False] * len(instance.disks)
helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
instance)
snap_disks = None
helper.CreateSnapshots()
try:
try:
snap_disks = self._CreateSnapshots(feedback_fn)
finally:
if (self.op.shutdown and instance.admin_up and
not self.remove_instance):
feedback_fn("Starting instance %s" % instance.name)
result = self.rpc.call_instance_start(src_node, instance,
None, None)
msg = result.fail_msg
if msg:
_ShutdownInstanceDisks(self, instance)
raise errors.OpExecError("Could not start instance: %s" % msg)
assert len(snap_disks) == len(instance.disks)
assert len(removed_snaps) == len(instance.disks)
# TODO: check for size
def _TransferFinished(idx):
logging.debug("Transfer %s finished", idx)
if self._RemoveSnapshot(feedback_fn, snap_disks, idx):
removed_snaps[idx] = True
transfers = []
for idx, dev in enumerate(snap_disks):
if not dev:
transfers.append(None)
continue
path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
dev.physical_id[1])
finished_fn = compat.partial(_TransferFinished, idx)
# FIXME: pass debug option from opcode to backend
dt = masterd.instance.DiskTransfer("snapshot/%s" % idx,
constants.IEIO_SCRIPT, (dev, idx),
constants.IEIO_FILE, (path, ),
finished_fn)
transfers.append(dt)
# Actually export data
dresults = \
masterd.instance.TransferInstanceData(self, feedback_fn,
src_node, dst_node.name,
dst_node.secondary_ip,
instance, transfers)
assert len(dresults) == len(instance.disks)
# Check for backwards compatibility
assert compat.all(isinstance(i, bool) for i in dresults), \
"Not all results are boolean: %r" % dresults
feedback_fn("Finalizing export on %s" % dst_node.name)
result = self.rpc.call_finalize_export(dst_node.name, instance,
snap_disks)
msg = result.fail_msg
fin_resu = not msg
if msg:
self.LogWarning("Could not finalize export for instance %s"
" on node %s: %s", instance.name, dst_node.name, msg)
(fin_resu, dresults) = helper.LocalExport(self.dst_node)
finally:
# Remove all snapshots
assert len(removed_snaps) == len(instance.disks)
for idx, removed in enumerate(removed_snaps):
if not removed:
self._RemoveSnapshot(feedback_fn, snap_disks, idx)
helper.Cleanup()
# Check for backwards compatibility
assert len(dresults) == len(instance.disks)
assert compat.all(isinstance(i, bool) for i in dresults), \
"Not all results are boolean: %r" % dresults
finally:
if activate_disks:
......
......@@ -29,6 +29,8 @@ import time
from ganeti import constants
from ganeti import errors
from ganeti import compat
from ganeti import utils
from ganeti import objects
class _ImportExportError(Exception):
......@@ -962,3 +964,144 @@ def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
"Not all imports/exports are finalized"
return [bool(dtp.success) for dtp in all_dtp]
class ExportInstanceHelper:
def __init__(self, lu, feedback_fn, instance):
"""Initializes this class.
@param lu: Logical unit instance
@param feedback_fn: Feedback function
@type instance: L{objects.Instance}
@param instance: Instance object
"""
self._lu = lu
self._feedback_fn = feedback_fn
self._instance = instance
self._snap_disks = []
self._removed_snaps = [False] * len(instance.disks)
def CreateSnapshots(self):
"""Creates an LVM snapshot for every disk of the instance.
"""
assert not self._snap_disks
instance = self._instance
src_node = instance.primary_node
vgname = self._lu.cfg.GetVGName()
for idx, disk in enumerate(instance.disks):
self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
(idx, src_node))
# result.payload will be a snapshot of an lvm leaf of the one we
# passed
result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
msg = result.fail_msg
if msg:
self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
idx, src_node, msg)
new_dev = False
else:
disk_id = (vgname, result.payload)
new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
logical_id=disk_id, physical_id=disk_id,
iv_name=disk.iv_name)
self._snap_disks.append(new_dev)
assert len(self._snap_disks) == len(instance.disks)
assert len(self._removed_snaps) == len(instance.disks)
def _RemoveSnapshot(self, disk_index):
"""Removes an LVM snapshot.
@type disk_index: number
@param disk_index: Index of the snapshot to be removed
"""
disk = self._snap_disks[disk_index]
if disk and not self._removed_snaps[disk_index]:
src_node = self._instance.primary_node
self._feedback_fn("Removing snapshot of disk/%s on node %s" %
(disk_index, src_node))
result = self._lu.rpc.call_blockdev_remove(src_node, disk)
if result.fail_msg:
self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
" %s: %s", disk_index, src_node, result.fail_msg)
else:
self._removed_snaps[disk_index] = True
def LocalExport(self, dest_node):
"""Intra-cluster instance export.
@type dest_node: L{objects.Node}
@param dest_node: Destination node
"""
instance = self._instance
src_node = instance.primary_node
assert len(self._snap_disks) == len(instance.disks)
transfers = []
for idx, dev in enumerate(self._snap_disks):
if not dev:
transfers.append(None)
continue
path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
dev.physical_id[1])
finished_fn = compat.partial(self._TransferFinished, idx)
# FIXME: pass debug option from opcode to backend
dt = DiskTransfer("snapshot/%s" % idx,
constants.IEIO_SCRIPT, (dev, idx),
constants.IEIO_FILE, (path, ),
finished_fn)
transfers.append(dt)
# Actually export data
dresults = TransferInstanceData(self._lu, self._feedback_fn,
src_node, dest_node.name,
dest_node.secondary_ip,
instance, transfers)
assert len(dresults) == len(instance.disks)
self._feedback_fn("Finalizing export on %s" % dest_node.name)
result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
self._snap_disks)
msg = result.fail_msg
fin_resu = not msg
if msg:
self._lu.LogWarning("Could not finalize export for instance %s"
" on node %s: %s", instance.name, dest_node.name, msg)
return (fin_resu, dresults)
def _TransferFinished(self, idx):
"""Called once a transfer has finished.
@type idx: number
@param idx: Disk index
"""
logging.debug("Transfer %s finished", idx)
self._RemoveSnapshot(idx)
def Cleanup(self):
"""Remove all snapshots.
"""
assert len(self._removed_snaps) == len(self._instance.disks)
for idx in range(len(self._instance.disks)):
self._RemoveSnapshot(idx)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment