Commit 5d97d6dd authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

cmdlib: Add utility to transfer instance data within the cluster



This is yet another wrapper around the instance import/export utility
classes, providing an even simpler API for instance imports/exports within
the same cluster.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarRené Nussbaumer <rn@google.com>
parent 033a1d00
......@@ -747,3 +747,218 @@ class ImportExportLoop:
success = diskie.Finalize() and success
return success
class _TransferInstCbBase(ImportExportCbBase):
def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
dest_node, dest_ip):
"""Initializes this class.
"""
ImportExportCbBase.__init__(self)
self.lu = lu
self.feedback_fn = feedback_fn
self.instance = instance
self.timeouts = timeouts
self.src_node = src_node
self.src_cbs = src_cbs
self.dest_node = dest_node
self.dest_ip = dest_ip
class _TransferInstSourceCb(_TransferInstCbBase):
def ReportConnected(self, ie, dtp):
"""Called when a connection has been established.
"""
assert self.src_cbs is None
assert dtp.src_export == ie
assert dtp.dest_import
self.feedback_fn("%s is sending data on %s" %
(dtp.data.name, ie.node_name))
def ReportFinished(self, ie, dtp):
"""Called when a transfer has finished.
"""
assert self.src_cbs is None
assert dtp.src_export == ie
assert dtp.dest_import
if ie.success:
self.feedback_fn("%s finished sending data" % dtp.data.name)
else:
self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
(dtp.data.name, ie.final_message, ie.recent_output))
dtp.RecordResult(ie.success)
cb = dtp.data.finished_fn
if cb:
cb()
# TODO: Check whether sending SIGTERM right away is okay, maybe we should
# give the daemon a moment to sort things out
if dtp.dest_import and not ie.success:
dtp.dest_import.Abort()
class _TransferInstDestCb(_TransferInstCbBase):
def ReportListening(self, ie, dtp):
"""Called when daemon started listening.
"""
assert self.src_cbs
assert dtp.src_export is None
assert dtp.dest_import
self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
# Start export on source node
de = DiskExport(self.lu, self.src_node, None, None, self.dest_ip,
ie.listen_port, self.instance,
dtp.data.src_io, dtp.data.src_ioargs,
self.timeouts, self.src_cbs, private=dtp)
ie.loop.Add(de)
dtp.src_export = de
def ReportConnected(self, ie, dtp):
"""Called when a connection has been established.
"""
self.feedback_fn("%s is receiving data on %s" %
(dtp.data.name, self.dest_node))
def ReportFinished(self, ie, dtp):
"""Called when a transfer has finished.
"""
if ie.success:
self.feedback_fn("%s finished receiving data" % dtp.data.name)
else:
self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
(dtp.data.name, ie.final_message, ie.recent_output))
dtp.RecordResult(ie.success)
# TODO: Check whether sending SIGTERM right away is okay, maybe we should
# give the daemon a moment to sort things out
if dtp.src_export and not ie.success:
dtp.src_export.Abort()
class DiskTransfer(object):
def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
finished_fn):
"""Initializes this class.
@type name: string
@param name: User-visible name for this transfer (e.g. "disk/0")
@param src_io: Source I/O type
@param src_ioargs: Source I/O arguments
@param dest_io: Destination I/O type
@param dest_ioargs: Destination I/O arguments
@type finished_fn: callable
@param finished_fn: Function called once transfer has finished
"""
self.name = name
self.src_io = src_io
self.src_ioargs = src_ioargs
self.dest_io = dest_io
self.dest_ioargs = dest_ioargs
self.finished_fn = finished_fn
class _DiskTransferPrivate(object):
def __init__(self, data, success):
"""Initializes this class.
@type data: L{DiskTransfer}
@type success: bool
"""
self.data = data
self.src_export = None
self.dest_import = None
self.success = success
def RecordResult(self, success):
"""Updates the status.
One failed part will cause the whole transfer to fail.
"""
self.success = self.success and success
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
instance, all_transfers):
"""Transfers an instance's data from one node to another.
@param lu: Logical unit instance
@param feedback_fn: Feedback function
@type src_node: string
@param src_node: Source node name
@type dest_node: string
@param dest_node: Destination node name
@type dest_ip: string
@param dest_ip: IP address of destination node
@type instance: L{objects.Instance}
@param instance: Instance object
@type all_transfers: list of L{DiskTransfer} instances
@param all_transfers: List of all disk transfers to be made
@rtype: list
@return: List with a boolean (True=successful, False=failed) for success for
each transfer
"""
timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
src_node, None, dest_node, dest_ip)
dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
src_node, src_cbs, dest_node, dest_ip)
all_dtp = []
ieloop = ImportExportLoop(lu)
try:
for transfer in all_transfers:
if transfer:
feedback_fn("Exporting %s from %s to %s" %
(transfer.name, src_node, dest_node))
dtp = _DiskTransferPrivate(transfer, True)
di = DiskImport(lu, dest_node, None, None, instance,
transfer.dest_io, transfer.dest_ioargs,
timeouts, dest_cbs, private=dtp)
ieloop.Add(di)
dtp.dest_import = di
else:
dtp = _DiskTransferPrivate(None, False)
all_dtp.append(dtp)
ieloop.Run()
finally:
ieloop.FinalizeAll()
assert len(all_dtp) == len(all_transfers)
assert compat.all([(dtp.src_export is None or
dtp.src_export.success is not None) and
(dtp.dest_import is None or
dtp.dest_import.success is not None)
for dtp in all_dtp]), \
"Not all imports/exports are finalized"
return [bool(dtp.success) for dtp in all_dtp]
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment