Commit 6a1434d7 authored by Andrea Spadaccini
Browse files

Make migration RPC non-blocking



To add status reporting for the KVM migration, the instance_migrate RPC
must be non-blocking. Moreover, there must be a way to represent the
migration status and a way to fetch it.

* constants.py:
  - add constants representing the migration statuses

* objects.py:
  - add the MigrationStatus object

* hypervisor/hv_base.py
  - change the FinalizeMigration method name to FinalizeMigrationDst
  - add the FinalizeMigrationSource method
  - add the GetMigrationStatus method

* hypervisor/hv_kvm.py
  - change the implementation of MigrateInstance to be non-blocking
    (i.e. do not poll the status of the migration)
  - implement the new methods defined in BaseHypervisor

* backend.py, server/noded.py, rpc.py
  - add methods to call the new hypervisor methods
  - fix documentation of the existing methods to reflect the changes

* cmdlib.py
  - adapt the logic of TLMigrateInstance._ExecMigration to reflect
    the changes
Signed-off-by: Andrea Spadaccini <spadaccio@google.com>
Reviewed-by: Michael Hanselmann <hansmi@google.com>
parent f8326fca
......@@ -1280,7 +1280,7 @@ def AcceptInstance(instance, info, target):
_Fail("Failed to accept instance: %s", err, exc=True)
def FinalizeMigration(instance, info, success):
def FinalizeMigrationDst(instance, info, success):
"""Finalize any preparation to accept an instance.
@type instance: L{objects.Instance}
......@@ -1293,9 +1293,9 @@ def FinalizeMigration(instance, info, success):
"""
hyper = hypervisor.GetHypervisor(instance.hypervisor)
try:
hyper.FinalizeMigration(instance, info, success)
hyper.FinalizeMigrationDst(instance, info, success)
except errors.HypervisorError, err:
_Fail("Failed to finalize migration: %s", err, exc=True)
_Fail("Failed to finalize migration on the target node: %s", err, exc=True)
def MigrateInstance(instance, target, live):
......@@ -1319,6 +1319,46 @@ def MigrateInstance(instance, target, live):
_Fail("Failed to migrate instance: %s", err, exc=True)
def FinalizeMigrationSource(instance, success, live):
"""Finalize the instance migration on the source node.
@type instance: L{objects.Instance}
@param instance: the instance definition of the migrated instance
@type success: bool
@param success: whether the migration succeeded or not
@type live: bool
@param live: whether the user requested a live migration or not
@raise RPCFail: If the execution fails for some reason
"""
hyper = hypervisor.GetHypervisor(instance.hypervisor)
try:
hyper.FinalizeMigrationSource(instance, success, live)
except Exception, err: # pylint: disable=W0703
_Fail("Failed to finalize the migration on the source node: %s", err,
exc=True)
def GetMigrationStatus(instance):
"""Get the migration status
@type instance: L{objects.Instance}
@param instance: the instance that is being migrated
@rtype: L{objects.MigrationStatus}
@return: the status of the current migration (one of
L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
progress info that can be retrieved from the hypervisor
@raise RPCFail: If the migration status cannot be retrieved
"""
hyper = hypervisor.GetHypervisor(instance.hypervisor)
try:
return hyper.GetMigrationStatus(instance)
except Exception, err: # pylint: disable=W0703
_Fail("Failed to get migration status: %s", err, exc=True)
def BlockdevCreate(disk, size, owner, on_primary, info):
"""Creates a block device for an instance.
......
......@@ -7025,6 +7025,10 @@ class TLMigrateInstance(Tasklet):
@ivar shutdown_timeout: In case of failover timeout of the shutdown
"""
# Constants
_MIGRATION_POLL_INTERVAL = 0.5
def __init__(self, lu, instance_name, cleanup=False,
failover=False, fallback=False,
ignore_consistency=False,
......@@ -7348,12 +7352,13 @@ class TLMigrateInstance(Tasklet):
"""
instance = self.instance
target_node = self.target_node
source_node = self.source_node
migration_info = self.migration_info
abort_result = self.rpc.call_finalize_migration(target_node,
instance,
migration_info,
False)
abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
instance,
migration_info,
False)
abort_msg = abort_result.fail_msg
if abort_msg:
logging.error("Aborting migration failed on target node %s: %s",
......@@ -7361,6 +7366,13 @@ class TLMigrateInstance(Tasklet):
# Don't raise an exception here, as we still have to try to revert the
# disk status, even if this step failed.
abort_result = self.rpc.call_instance_finalize_migration_src(source_node,
instance, False, self.live)
abort_msg = abort_result.fail_msg
if abort_msg:
logging.error("Aborting migration failed on source node %s: %s",
source_node, abort_msg)
def _ExecMigration(self):
"""Migrate an instance.
......@@ -7432,18 +7444,51 @@ class TLMigrateInstance(Tasklet):
raise errors.OpExecError("Could not migrate instance %s: %s" %
(instance.name, msg))
self.feedback_fn("* starting memory transfer")
while True:
result = self.rpc.call_instance_get_migration_status(source_node,
instance)
msg = result.fail_msg
ms = result.payload # MigrationStatus instance
if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
logging.error("Instance migration failed, trying to revert"
" disk status: %s", msg)
self.feedback_fn("Migration failed, aborting")
self._AbortMigration()
self._RevertDiskStatus()
raise errors.OpExecError("Could not migrate instance %s: %s" %
(instance.name, msg))
if result.payload.status != constants.HV_MIGRATION_ACTIVE:
self.feedback_fn("* memory transfer complete")
break
time.sleep(self._MIGRATION_POLL_INTERVAL)
result = self.rpc.call_instance_finalize_migration_src(source_node,
instance,
True,
self.live)
msg = result.fail_msg
if msg:
logging.error("Instance migration succeeded, but finalization failed"
" on the source node: %s", msg)
raise errors.OpExecError("Could not finalize instance migration: %s" %
msg)
instance.primary_node = target_node
# distribute new instance config to the other nodes
self.cfg.Update(instance, self.feedback_fn)
result = self.rpc.call_finalize_migration(target_node,
instance,
migration_info,
True)
result = self.rpc.call_instance_finalize_migration_dst(target_node,
instance,
migration_info,
True)
msg = result.fail_msg
if msg:
logging.error("Instance migration succeeded, but finalization failed:"
" %s", msg)
logging.error("Instance migration succeeded, but finalization failed"
" on the target node: %s", msg)
raise errors.OpExecError("Could not finalize instance migration: %s" %
msg)
......
......@@ -797,6 +797,27 @@ HVS_PARAMETER_TYPES = {
HVS_PARAMETERS = frozenset(HVS_PARAMETER_TYPES.keys())
# Migration statuses
HV_MIGRATION_COMPLETED = "completed"
HV_MIGRATION_ACTIVE = "active"
HV_MIGRATION_FAILED = "failed"
HV_MIGRATION_CANCELLED = "cancelled"

# Statuses that callers treat as a failed migration
HV_MIGRATION_FAILED_STATUSES = frozenset([
  HV_MIGRATION_FAILED,
  HV_MIGRATION_CANCELLED,
  ])

# Every status a hypervisor may report: the failure statuses plus the
# two non-failure ones
HV_MIGRATION_VALID_STATUSES = HV_MIGRATION_FAILED_STATUSES | frozenset([
  HV_MIGRATION_COMPLETED,
  HV_MIGRATION_ACTIVE,
  ])

# KVM-specific statuses (same object as the generic set)
HV_KVM_MIGRATION_VALID_STATUSES = HV_MIGRATION_VALID_STATUSES
# Backend parameter names
BE_MEMORY = "memory"
BE_VCPUS = "vcpus"
......
......@@ -291,8 +291,8 @@ class BaseHypervisor(object):
"""
pass
def FinalizeMigration(self, instance, info, success):
"""Finalized an instance migration.
def FinalizeMigrationDst(self, instance, info, success):
"""Finalize the instance migration on the target node.
Should finalize or revert any preparation done to accept the instance.
Since by default we do no preparation, we also don't have anything to do
......@@ -320,6 +320,32 @@ class BaseHypervisor(object):
"""
raise NotImplementedError
def FinalizeMigrationSource(self, instance, success, live):
  """Hook run on the source node once a migration has ended.

  The default implementation does nothing; hypervisors that need to clean
  up or revert state on the source node override it.

  @type instance: L{objects.Instance}
  @param instance: the instance that was migrated
  @type success: bool
  @param success: whether the migration succeeded or not
  @type live: bool
  @param live: whether the user requested a live migration or not

  """
  pass
def GetMigrationStatus(self, instance):
  """Report the status of an instance migration.

  There is no default implementation: this base version always raises
  C{NotImplementedError}.

  @type instance: L{objects.Instance}
  @param instance: the instance that is being migrated
  @rtype: L{objects.MigrationStatus}
  @return: the status of the current migration (one of
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
           progress info that can be retrieved from the hypervisor

  """
  raise NotImplementedError()
@classmethod
def CheckParameterSyntax(cls, hvparams):
"""Check the given parameters for validity.
......
......@@ -1574,8 +1574,8 @@ class KVMHypervisor(hv_base.BaseHypervisor):
incoming_address = (target, instance.hvparams[constants.HV_MIGRATION_PORT])
self._ExecuteKVMRuntime(instance, kvm_runtime, incoming=incoming_address)
def FinalizeMigration(self, instance, info, success):
"""Finalize an instance migration.
def FinalizeMigrationDst(self, instance, info, success):
"""Finalize the instance migration on the target node.
Stop the incoming mode KVM.
......@@ -1622,7 +1622,7 @@ class KVMHypervisor(hv_base.BaseHypervisor):
"""
instance_name = instance.name
port = instance.hvparams[constants.HV_MIGRATION_PORT]
pidfile, pid, alive = self._InstancePidAlive(instance_name)
_, _, alive = self._InstancePidAlive(instance_name)
if not alive:
raise errors.HypervisorError("Instance not running, cannot migrate")
......@@ -1640,42 +1640,57 @@ class KVMHypervisor(hv_base.BaseHypervisor):
migrate_command = "migrate -d tcp:%s:%s" % (target, port)
self._CallMonitorCommand(instance_name, migrate_command)
def FinalizeMigrationSource(self, instance, success, live):
  """Finalize the instance migration on the source node.

  After a successful migration the KVM process found through
  C{_InstancePidAlive} is killed and the instance's runtime files are
  removed. After a failed (or aborted) migration the monitor command
  C{self._CONT_CMD} is sent to the instance.

  @type instance: L{objects.Instance}
  @param instance: the instance that was migrated
  @type success: bool
  @param success: whether the migration succeeded or not
  @type live: bool
  @param live: whether the user requested a live migration or not; kept
      for interface compatibility, it no longer influences the result

  """
  # pylint: disable=W0613
  if success:
    pidfile, pid, _ = self._InstancePidAlive(instance.name)
    utils.KillProcess(pid)
    self._RemoveInstanceRuntimeFiles(pidfile, instance.name)
    return

  # The continue command used to be sent only when "live" was true, which
  # skipped exactly the non-live case: there the instance is presumably
  # paused before the transfer starts and nothing else resumes it after a
  # failure. Sending the command on every failure covers both cases.
  # NOTE(review): assumes the command is a harmless no-op on an instance
  # that is already running -- confirm against the QEMU monitor behaviour.
  self._CallMonitorCommand(instance.name, self._CONT_CMD)
def GetMigrationStatus(self, instance):
"""Get the migration status
@type instance: L{objects.Instance}
@param instance: the instance that is being migrated
@rtype: L{objects.MigrationStatus}
@return: the status of the current migration (one of
L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
progress info that can be retrieved from the hypervisor
"""
info_command = "info migrate"
done = False
broken_answers = 0
while not done:
result = self._CallMonitorCommand(instance_name, info_command)
for _ in range(self._MIGRATION_INFO_MAX_BAD_ANSWERS):
result = self._CallMonitorCommand(instance.name, info_command)
match = self._MIGRATION_STATUS_RE.search(result.stdout)
if not match:
broken_answers += 1
if not result.stdout:
logging.info("KVM: empty 'info migrate' result")
else:
logging.warning("KVM: unknown 'info migrate' result: %s",
result.stdout)
time.sleep(self._MIGRATION_INFO_RETRY_DELAY)
else:
status = match.group(1)
if status == "completed":
done = True
elif status == "active":
# reset the broken answers count
broken_answers = 0
time.sleep(self._MIGRATION_INFO_RETRY_DELAY)
elif status == "failed" or status == "cancelled":
if not live:
self._CallMonitorCommand(instance_name, self._CONT_CMD)
raise errors.HypervisorError("Migration %s at the kvm level" %
status)
else:
logging.warning("KVM: unknown migration status '%s'", status)
broken_answers += 1
time.sleep(self._MIGRATION_INFO_RETRY_DELAY)
if broken_answers >= self._MIGRATION_INFO_MAX_BAD_ANSWERS:
raise errors.HypervisorError("Too many 'info migrate' broken answers")
if status in constants.HV_KVM_MIGRATION_VALID_STATUSES:
migration_status = objects.MigrationStatus(status=status)
return migration_status
utils.KillProcess(pid)
self._RemoveInstanceRuntimeFiles(pidfile, instance_name)
logging.warning("KVM: unknown migration status '%s'", status)
time.sleep(self._MIGRATION_INFO_RETRY_DELAY)
return objects.MigrationStatus(status=constants.HV_MIGRATION_FAILED,
info="Too many 'info migrate' broken answers")
def GetNodeInfo(self):
"""Return information about the node.
......
......@@ -1503,6 +1503,17 @@ class QueryFieldsResponse(_QueryResponseBase):
]
class MigrationStatus(ConfigObject):
  """Object holding the status of a migration.

  @ivar status: the migration status; the KVM hypervisor only stores
      values from L{constants.HV_KVM_MIGRATION_VALID_STATUSES} here
  @ivar transferred_ram: presumably the amount of RAM already transferred;
      not set by any code visible here -- TODO confirm units
  @ivar total_ram: presumably the total amount of RAM to transfer; not set
      by any code visible here -- TODO confirm units
  @ivar info: optional free-form text giving details about the status,
      e.g. the reason a failure is being reported

  """
  # "info" must be declared here: the KVM hypervisor builds
  # MigrationStatus(status=..., info=...) when it gives up on parsing the
  # monitor output, and an attribute missing from __slots__ cannot be set.
  __slots__ = [
    "status",
    "transferred_ram",
    "total_ram",
    "info",
    ]
class InstanceConsole(ConfigObject):
"""Object describing how to access the console of an instance.
......
......@@ -703,7 +703,7 @@ class RpcRunner(object):
[self._InstDict(instance), info, target])
@_RpcTimeout(_TMO_NORMAL)
def call_finalize_migration(self, node, instance, info, success):
def call_instance_finalize_migration_dst(self, node, instance, info, success):
"""Finalize any target-node migration specific operation.
This is called both in case of a successful migration and in case of error
......@@ -721,7 +721,7 @@ class RpcRunner(object):
@param success: whether the migration was a success or a failure
"""
return self._SingleNodeCall(node, "finalize_migration",
return self._SingleNodeCall(node, "instance_finalize_migration_dst",
[self._InstDict(instance), info, success])
@_RpcTimeout(_TMO_SLOW)
......@@ -744,6 +744,43 @@ class RpcRunner(object):
return self._SingleNodeCall(node, "instance_migrate",
[self._InstDict(instance), target, live])
@_RpcTimeout(_TMO_SLOW)
def call_instance_finalize_migration_src(self, node, instance, success, live):
  """Finalize the instance migration on the source node.

  This is a single-node call.

  @param node: the node the request is sent to
  @type instance: L{objects.Instance}
  @param instance: the instance that was migrated
  @type success: bool
  @param success: whether the migration succeeded or not
  @type live: bool
  @param live: whether the user requested a live migration or not

  """
  # The instance travels as a dictionary built by _InstDict
  rpc_args = [self._InstDict(instance), success, live]
  return self._SingleNodeCall(node, "instance_finalize_migration_src",
                              rpc_args)
@_RpcTimeout(_TMO_SLOW)
def call_instance_get_migration_status(self, node, instance):
  """Report migration status.

  This is a single-node call that must be executed on the source node.

  @param node: the node the request is sent to
  @type instance: L{objects.Instance}
  @param instance: the instance that is being migrated
  @return: the RPC result; when the call succeeded and returned a payload,
      that payload is replaced by an L{objects.MigrationStatus} holding
      the status of the current migration (one of
      L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
      progress info that can be retrieved from the hypervisor

  """
  result = self._SingleNodeCall(node, "instance_get_migration_status",
                                [self._InstDict(instance)])
  if result.fail_msg or result.payload is None:
    # Failed call or empty answer: there is nothing to decode
    return result

  # Turn the dictionary sent over the wire back into an object
  result.payload = objects.MigrationStatus.FromDict(result.payload)
  return result
@_RpcTimeout(_TMO_NORMAL)
def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
"""Reboots an instance.
......
......@@ -579,13 +579,13 @@ class NodeHttpServer(http.server.HttpServer):
return backend.AcceptInstance(instance, info, target)
@staticmethod
def perspective_finalize_migration(params):
"""Finalize the instance migration.
def perspective_instance_finalize_migration_dst(params):
"""Finalize the instance migration on the destination node.
"""
instance, info, success = params
instance = objects.Instance.FromDict(instance)
return backend.FinalizeMigration(instance, info, success)
return backend.FinalizeMigrationDst(instance, info, success)
@staticmethod
def perspective_instance_migrate(params):
......@@ -596,6 +596,23 @@ class NodeHttpServer(http.server.HttpServer):
instance = objects.Instance.FromDict(instance)
return backend.MigrateInstance(instance, target, live)
@staticmethod
def perspective_instance_finalize_migration_src(params):
  """Finalize the instance migration on the source node.

  C{params} must hold exactly three items: the instance in dictionary
  form, the success flag and the live flag.

  """
  (instance_dict, success, live) = params
  migrated = objects.Instance.FromDict(instance_dict)
  return backend.FinalizeMigrationSource(migrated, success, live)
@staticmethod
def perspective_instance_get_migration_status(params):
  """Reports migration status.

  The first item of C{params} is the instance in dictionary form; the
  status object returned by the backend is sent back as a dictionary.

  """
  migrating = objects.Instance.FromDict(params[0])
  status = backend.GetMigrationStatus(migrating)
  return status.ToDict()
@staticmethod
def perspective_instance_reboot(params):
"""Reboot an instance.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment