diff --git a/lib/cmdlib.py b/lib/cmdlib.py index 9f8b1fffd2e76600685c09c0ef2a59806f81b2a6..c0755c0763c01079be7fc716010a589be79c823f 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -8757,6 +8757,7 @@ class LUSetInstanceParams(LogicalUnit): (constants.DT_DRBD8, constants.DT_PLAIN): _ConvertDrbdToPlain, } + class LUQueryExports(NoHooksLU): """Query the exports list @@ -8886,68 +8887,6 @@ class LUExportInstance(LogicalUnit): raise errors.OpPrereqError("Export not supported for instances with" " file-based disks", errors.ECODE_INVAL) - def _CreateSnapshots(self, feedback_fn): - """Creates an LVM snapshot for every disk of the instance. - - @return: List of snapshots as L{objects.Disk} instances - - """ - instance = self.instance - src_node = instance.primary_node - - vgname = self.cfg.GetVGName() - - snap_disks = [] - - for idx, disk in enumerate(instance.disks): - feedback_fn("Creating a snapshot of disk/%s on node %s" % - (idx, src_node)) - - # result.payload will be a snapshot of an lvm leaf of the one we - # passed - result = self.rpc.call_blockdev_snapshot(src_node, disk) - msg = result.fail_msg - if msg: - self.LogWarning("Could not snapshot disk/%s on node %s: %s", - idx, src_node, msg) - snap_disks.append(False) - else: - disk_id = (vgname, result.payload) - new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size, - logical_id=disk_id, physical_id=disk_id, - iv_name=disk.iv_name) - snap_disks.append(new_dev) - - return snap_disks - - def _RemoveSnapshot(self, feedback_fn, snap_disks, disk_index): - """Removes an LVM snapshot. - - @type snap_disks: list - @param snap_disks: The list of all snapshots as returned by - L{_CreateSnapshots} - @type disk_index: number - @param disk_index: Index of the snapshot to be removed - @rtype: bool - @return: Whether removal was successful or not - - """ - disk = snap_disks[disk_index] - if disk: - src_node = self.instance.primary_node - - feedback_fn("Removing snapshot of disk/%s on node %s" % - (disk_index, src_node)) - - result = self.rpc.call_blockdev_remove(src_node, disk) - if not result.fail_msg: - return True - - self.LogWarning("Could not remove snapshot for disk/%d from node" - " %s: %s", disk_index, src_node, result.fail_msg) - - return False - def _CleanupExports(self, feedback_fn): """Removes exports of current instance from all other nodes. @@ -8979,7 +8918,6 @@ class LUExportInstance(LogicalUnit): """ instance = self.instance - dst_node = self.dst_node src_node = instance.primary_node if self.op.shutdown: @@ -9004,81 +8942,19 @@ class LUExportInstance(LogicalUnit): _StartInstanceDisks(self, instance, None) try: - # per-disk results - removed_snaps = [False] * len(instance.disks) + helper = masterd.instance.ExportInstanceHelper(self, feedback_fn, + instance) - snap_disks = None + helper.CreateSnapshots() try: - try: - snap_disks = self._CreateSnapshots(feedback_fn) - finally: - if (self.op.shutdown and instance.admin_up and - not self.remove_instance): - feedback_fn("Starting instance %s" % instance.name) - result = self.rpc.call_instance_start(src_node, instance, - None, None) - msg = result.fail_msg - if msg: - _ShutdownInstanceDisks(self, instance) - raise errors.OpExecError("Could not start instance: %s" % msg) - - assert len(snap_disks) == len(instance.disks) - assert len(removed_snaps) == len(instance.disks) - - # TODO: check for size - - def _TransferFinished(idx): - logging.debug("Transfer %s finished", idx) - if self._RemoveSnapshot(feedback_fn, snap_disks, idx): - removed_snaps[idx] = True - - transfers = [] - - for idx, dev in enumerate(snap_disks): - if not dev: - transfers.append(None) - continue - - path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name, - dev.physical_id[1]) - - finished_fn = compat.partial(_TransferFinished, idx) - - # FIXME: pass debug option from opcode to backend - dt = masterd.instance.DiskTransfer("snapshot/%s" % idx, - constants.IEIO_SCRIPT, (dev, idx), - constants.IEIO_FILE, (path, ), - finished_fn) - transfers.append(dt) - - # Actually export data - dresults = \ - masterd.instance.TransferInstanceData(self, feedback_fn, - src_node, dst_node.name, - dst_node.secondary_ip, - instance, transfers) - - assert len(dresults) == len(instance.disks) - - # Check for backwards compatibility - assert compat.all(isinstance(i, bool) for i in dresults), \ - "Not all results are boolean: %r" % dresults - - feedback_fn("Finalizing export on %s" % dst_node.name) - result = self.rpc.call_finalize_export(dst_node.name, instance, - snap_disks) - msg = result.fail_msg - fin_resu = not msg - if msg: - self.LogWarning("Could not finalize export for instance %s" - " on node %s: %s", instance.name, dst_node.name, msg) - + (fin_resu, dresults) = helper.LocalExport(self.dst_node) finally: - # Remove all snapshots - assert len(removed_snaps) == len(instance.disks) - for idx, removed in enumerate(removed_snaps): - if not removed: - self._RemoveSnapshot(feedback_fn, snap_disks, idx) + helper.Cleanup() + + # Check for backwards compatibility + assert len(dresults) == len(instance.disks) + assert compat.all(isinstance(i, bool) for i in dresults), \ + "Not all results are boolean: %r" % dresults finally: if activate_disks: diff --git a/lib/masterd/instance.py b/lib/masterd/instance.py index 9872467b4b03d77fd0ad7eeb3f940f58e637a46c..7e95b0d5464d3fc99872f871075ff00338d9a71d 100644 --- a/lib/masterd/instance.py +++ b/lib/masterd/instance.py @@ -29,6 +29,8 @@ import time from ganeti import constants from ganeti import errors from ganeti import compat +from ganeti import utils +from ganeti import objects class _ImportExportError(Exception): @@ -962,3 +964,144 @@ def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip, "Not all imports/exports are finalized" return [bool(dtp.success) for dtp in all_dtp] + + +class ExportInstanceHelper: + def __init__(self, lu, feedback_fn, instance): + """Initializes this class. + + @param lu: Logical unit instance + @param feedback_fn: Feedback function + @type instance: L{objects.Instance} + @param instance: Instance object + + """ + self._lu = lu + self._feedback_fn = feedback_fn + self._instance = instance + + self._snap_disks = [] + self._removed_snaps = [False] * len(instance.disks) + + def CreateSnapshots(self): + """Creates an LVM snapshot for every disk of the instance. + + """ + assert not self._snap_disks + + instance = self._instance + src_node = instance.primary_node + + vgname = self._lu.cfg.GetVGName() + + for idx, disk in enumerate(instance.disks): + self._feedback_fn("Creating a snapshot of disk/%s on node %s" % + (idx, src_node)) + + # result.payload will be a snapshot of an lvm leaf of the one we + # passed + result = self._lu.rpc.call_blockdev_snapshot(src_node, disk) + msg = result.fail_msg + if msg: + self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s", + idx, src_node, msg) + new_dev = False + else: + disk_id = (vgname, result.payload) + new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size, + logical_id=disk_id, physical_id=disk_id, + iv_name=disk.iv_name) + + self._snap_disks.append(new_dev) + + assert len(self._snap_disks) == len(instance.disks) + assert len(self._removed_snaps) == len(instance.disks) + + def _RemoveSnapshot(self, disk_index): + """Removes an LVM snapshot. + + @type disk_index: number + @param disk_index: Index of the snapshot to be removed + + """ + disk = self._snap_disks[disk_index] + if disk and not self._removed_snaps[disk_index]: + src_node = self._instance.primary_node + + self._feedback_fn("Removing snapshot of disk/%s on node %s" % + (disk_index, src_node)) + + result = self._lu.rpc.call_blockdev_remove(src_node, disk) + if result.fail_msg: + self._lu.LogWarning("Could not remove snapshot for disk/%d from node" + " %s: %s", disk_index, src_node, result.fail_msg) + else: + self._removed_snaps[disk_index] = True + + def LocalExport(self, dest_node): + """Intra-cluster instance export. + + @type dest_node: L{objects.Node} + @param dest_node: Destination node + + """ + instance = self._instance + src_node = instance.primary_node + + assert len(self._snap_disks) == len(instance.disks) + + transfers = [] + + for idx, dev in enumerate(self._snap_disks): + if not dev: + transfers.append(None) + continue + + path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name, + dev.physical_id[1]) + + finished_fn = compat.partial(self._TransferFinished, idx) + + # FIXME: pass debug option from opcode to backend + dt = DiskTransfer("snapshot/%s" % idx, + constants.IEIO_SCRIPT, (dev, idx), + constants.IEIO_FILE, (path, ), + finished_fn) + transfers.append(dt) + + # Actually export data + dresults = TransferInstanceData(self._lu, self._feedback_fn, + src_node, dest_node.name, + dest_node.secondary_ip, + instance, transfers) + + assert len(dresults) == len(instance.disks) + + self._feedback_fn("Finalizing export on %s" % dest_node.name) + result = self._lu.rpc.call_finalize_export(dest_node.name, instance, + self._snap_disks) + msg = result.fail_msg + fin_resu = not msg + if msg: + self._lu.LogWarning("Could not finalize export for instance %s" + " on node %s: %s", instance.name, dest_node.name, msg) + + return (fin_resu, dresults) + + def _TransferFinished(self, idx): + """Called once a transfer has finished. + + @type idx: number + @param idx: Disk index + + """ + logging.debug("Transfer %s finished", idx) + self._RemoveSnapshot(idx) + + def Cleanup(self): + """Remove all snapshots. + + """ + assert len(self._removed_snaps) == len(self._instance.disks) + for idx in range(len(self._instance.disks)): + self._RemoveSnapshot(idx)