Commit 9bf56d77 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Implement opcode changes for remote-import


Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 4a96f1d1
......@@ -6103,8 +6103,7 @@ class LUCreateInstance(LogicalUnit):
self.adopt_disks = has_adopt
# verify creation mode
if self.op.mode not in (constants.INSTANCE_CREATE,
constants.INSTANCE_IMPORT):
if self.op.mode not in constants.INSTANCE_CREATE_MODES:
raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
self.op.mode, errors.ECODE_INVAL)
......@@ -6114,6 +6113,9 @@ class LUCreateInstance(LogicalUnit):
self.op.instance_name = self.hostname1.name
# used in CheckPrereq for ip ping check
self.check_ip = self.hostname1.ip
elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
raise errors.OpPrereqError("Remote imports require names to be checked" %
errors.ECODE_INVAL)
else:
self.check_ip = None
......@@ -6133,6 +6135,8 @@ class LUCreateInstance(LogicalUnit):
" node must be given",
errors.ECODE_INVAL)
self._cds = _GetClusterDomainSecret()
if self.op.mode == constants.INSTANCE_IMPORT:
# On import force_variant must be True, because if we forced it at
# initial install, our only chance when importing it back is that it
......@@ -6142,7 +6146,7 @@ class LUCreateInstance(LogicalUnit):
if self.op.no_install:
self.LogInfo("No-installation mode has no effect during import")
else: # INSTANCE_CREATE
elif self.op.mode == constants.INSTANCE_CREATE:
if getattr(self.op, "os_type", None) is None:
raise errors.OpPrereqError("No guest OS specified",
errors.ECODE_INVAL)
......@@ -6151,6 +6155,51 @@ class LUCreateInstance(LogicalUnit):
raise errors.OpPrereqError("No disk template specified",
errors.ECODE_INVAL)
elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
# Check handshake to ensure both clusters have the same domain secret
src_handshake = getattr(self.op, "source_handshake", None)
if not src_handshake:
raise errors.OpPrereqError("Missing source handshake",
errors.ECODE_INVAL)
errmsg = masterd.instance.CheckRemoteExportHandshake(self._cds,
src_handshake)
if errmsg:
raise errors.OpPrereqError("Invalid handshake: %s" % errmsg,
errors.ECODE_INVAL)
# Load and check source CA
self.source_x509_ca_pem = getattr(self.op, "source_x509_ca", None)
if not self.source_x509_ca_pem:
raise errors.OpPrereqError("Missing source X509 CA",
errors.ECODE_INVAL)
try:
(cert, _) = utils.LoadSignedX509Certificate(self.source_x509_ca_pem,
self._cds)
except OpenSSL.crypto.Error, err:
raise errors.OpPrereqError("Unable to load source X509 CA (%s)" %
(err, ), errors.ECODE_INVAL)
(errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
if errcode is not None:
raise errors.OpPrereqError("Invalid source X509 CA (%s)" % (msg, ),
errors.ECODE_INVAL)
self.source_x509_ca = cert
src_instance_name = getattr(self.op, "source_instance_name", None)
if not src_instance_name:
raise errors.OpPrereqError("Missing source instance name",
errors.ECODE_INVAL)
self.source_instance_name = \
utils.GetHostInfo(utils.HostInfo.NormalizeName(src_instance_name)).name
else:
raise errors.OpPrereqError("Invalid instance creation mode %r" %
self.op.mode, errors.ECODE_INVAL)
def ExpandNames(self):
"""ExpandNames for CreateInstance.
......@@ -6829,6 +6878,30 @@ class LUCreateInstance(LogicalUnit):
self.LogWarning("Some disks for instance %s on node %s were not"
" imported successfully" % (instance, pnode_name))
elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
feedback_fn("* preparing remote import...")
connect_timeout = constants.RIE_CONNECT_TIMEOUT
timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
disk_results = masterd.instance.RemoteImport(self, feedback_fn, iobj,
self.source_x509_ca,
self._cds, timeouts)
if not compat.all(disk_results):
# TODO: Should the instance still be started, even if some disks
# failed to import (valid for local imports, too)?
self.LogWarning("Some disks for instance %s on node %s were not"
" imported successfully" % (instance, pnode_name))
# Run rename script on newly imported instance
assert iobj.name == instance
feedback_fn("Running rename script for %s" % instance)
result = self.rpc.call_instance_run_rename(pnode_name, iobj,
self.source_instance_name,
self.op.debug_level)
if result.fail_msg:
self.LogWarning("Failed to run rename script for %s on node"
" %s: %s" % (instance, pnode_name, result.fail_msg))
else:
# also checked in the prereq part
raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
......
......@@ -343,6 +343,12 @@ LOCKS_APPEND = 'append'
# instance creation modes
INSTANCE_CREATE = "create"
INSTANCE_IMPORT = "import"
INSTANCE_REMOTE_IMPORT = "remote-import"
INSTANCE_CREATE_MODES = frozenset([
INSTANCE_CREATE,
INSTANCE_IMPORT,
INSTANCE_REMOTE_IMPORT,
])
# Remote import/export handshake message and version
RIE_VERSION = 0
......@@ -703,6 +709,7 @@ OPS_FINALIZED = frozenset([OP_STATUS_CANCELED,
# Execution log types
ELOG_MESSAGE = "message"
ELOG_PROGRESS = "progress"
ELOG_REMOTE_IMPORT = "remote-import"
# max dynamic devices
MAX_NICS = 8
......
......@@ -1189,6 +1189,159 @@ class ExportInstanceHelper:
self._RemoveSnapshot(idx)
class _RemoteImportCb(ImportExportCbBase):
def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
external_address):
"""Initializes this class.
@type cds: string
@param cds: Cluster domain secret
@type x509_cert_pem: string
@param x509_cert_pem: CA used for signing import key
@type disk_count: number
@param disk_count: Number of disks
@type external_address: string
@param external_address: External address of destination node
"""
ImportExportCbBase.__init__(self)
self._feedback_fn = feedback_fn
self._cds = cds
self._x509_cert_pem = x509_cert_pem
self._disk_count = disk_count
self._external_address = external_address
self._dresults = [None] * disk_count
self._daemon_port = [None] * disk_count
self._salt = utils.GenerateSecret(8)
@property
def disk_results(self):
"""Returns per-disk results.
"""
return self._dresults
def _CheckAllListening(self):
"""Checks whether all daemons are listening.
If all daemons are listening, the information is sent to the client.
"""
if not compat.all(dp is not None for dp in self._daemon_port):
return
host = self._external_address
disks = []
for idx, port in enumerate(self._daemon_port):
disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
idx, host, port))
assert len(disks) == self._disk_count
self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
"disks": disks,
"x509_ca": self._x509_cert_pem,
})
def ReportListening(self, ie, private):
"""Called when daemon started listening.
"""
(idx, ) = private
self._feedback_fn("Disk %s is now listening" % idx)
assert self._daemon_port[idx] is None
self._daemon_port[idx] = ie.listen_port
self._CheckAllListening()
def ReportConnected(self, ie, private):
"""Called when a connection has been established.
"""
(idx, ) = private
self._feedback_fn("Disk %s is now receiving data" % idx)
def ReportFinished(self, ie, private):
"""Called when a transfer has finished.
"""
(idx, ) = private
# Daemon is certainly no longer listening
self._daemon_port[idx] = None
if ie.success:
self._feedback_fn("Disk %s finished receiving data" % idx)
else:
self._feedback_fn(("Disk %s failed to receive data: %s"
" (recent output: %r)") %
(idx, ie.final_message, ie.recent_output))
self._dresults[idx] = bool(ie.success)
def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
"""Imports an instance from another cluster.
@param lu: Logical unit instance
@param feedback_fn: Feedback function
@type instance: L{objects.Instance}
@param instance: Instance object
@type source_x509_ca: OpenSSL.crypto.X509
@param source_x509_ca: Import source's X509 CA
@type cds: string
@param cds: Cluster domain secret
@type timeouts: L{ImportExportTimeouts}
@param timeouts: Timeouts for this import
"""
source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
source_x509_ca)
# Create crypto key
result = lu.rpc.call_x509_cert_create(instance.primary_node,
constants.RIE_CERT_VALIDITY)
result.Raise("Can't create X509 key and certificate on %s" % result.node)
(x509_key_name, x509_cert_pem) = result.payload
try:
# Load certificate
x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
x509_cert_pem)
# Sign certificate
signed_x509_cert_pem = \
utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
len(instance.disks), instance.primary_node)
ieloop = ImportExportLoop(lu)
try:
for idx, dev in enumerate(instance.disks):
ieloop.Add(DiskImport(lu, instance.primary_node,
x509_key_name, source_ca_pem, instance,
constants.IEIO_SCRIPT, (dev, idx),
timeouts, cbs, private=(idx, )))
ieloop.Run()
finally:
ieloop.FinalizeAll()
finally:
# Remove crypto key and certificate
result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
result.Raise("Can't remove X509 key and certificate on %s" % result.node)
return cbs.disk_results
def _GetImportExportHandshakeMessage(version):
"""Returns the handshake message for a RIE protocol version.
......
......@@ -460,7 +460,15 @@ class OpNodeEvacuationStrategy(OpCode):
# instance opcodes
class OpCreateInstance(OpCode):
"""Create an instance."""
"""Create an instance.
@ivar instance_name: Instance name
@ivar mode: Instance creation mode (one of L{constants.INSTANCE_CREATE_MODES})
@ivar source_handshake: Signed handshake from source (remote import only)
@ivar source_x509_ca: Source X509 CA in PEM format (remote import only)
@ivar source_instance_name: Previous name of instance (remote import only)
"""
OP_ID = "OP_INSTANCE_CREATE"
OP_DSC_FIELD = "instance_name"
__slots__ = [
......@@ -473,6 +481,9 @@ class OpCreateInstance(OpCode):
"file_storage_dir", "file_driver",
"iallocator",
"hypervisor", "hvparams", "beparams",
"source_handshake",
"source_x509_ca",
"source_instance_name",
"dry_run",
]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment