Commit 4a96f1d1 authored by Michael Hanselmann
Browse files

Implement opcode changes for remote-export


Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
parent 1410fa8d
......@@ -8903,23 +8903,41 @@ class LUExportInstance(LogicalUnit):
self.remove_instance = getattr(self.op, "remove_instance", False)
self.ignore_remove_failures = getattr(self.op, "ignore_remove_failures",
False)
self.export_mode = getattr(self.op, "mode", constants.EXPORT_MODE_LOCAL)
self.x509_key_name = getattr(self.op, "x509_key_name", None)
self.dest_x509_ca_pem = getattr(self.op, "destination_x509_ca", None)
if self.remove_instance and not self.op.shutdown:
raise errors.OpPrereqError("Can not remove instance without shutting it"
" down before")
if self.export_mode not in constants.EXPORT_MODES:
raise errors.OpPrereqError("Invalid export mode %r" % self.export_mode,
errors.ECODE_INVAL)
if self.export_mode == constants.EXPORT_MODE_REMOTE:
if not self.x509_key_name:
raise errors.OpPrereqError("Missing X509 key name for encryption",
errors.ECODE_INVAL)
if not self.dest_x509_ca_pem:
raise errors.OpPrereqError("Missing destination X509 CA",
errors.ECODE_INVAL)
def ExpandNames(self):
# NOTE(review): this span appears to be rendered from a diff, so the first
# comment block and unconditional assignment below look like the pre-change
# lines and the mode-guarded copy like their replacement -- confirm against
# the real file before relying on either.
self._ExpandAndLockInstance()
# FIXME: lock only instance primary and destination node
#
# Sad but true, for now we have to lock all nodes, as we don't know where
# the previous export might be, and in this LU we search for it and
# remove it from its current node. In the future we could fix this by:
# - making a tasklet to search (share-lock all), then create the new one,
# then one to remove, after
# - removing the removal operation altogether
self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
# Lock all nodes for local exports
if self.export_mode == constants.EXPORT_MODE_LOCAL:
# FIXME: lock only instance primary and destination node
#
# Sad but true, for now we have to lock all nodes, as we don't know where
# the previous export might be, and in this LU we search for it and
# remove it from its current node. In the future we could fix this by:
# - making a tasklet to search (share-lock all), then create the new one,
# then one to remove, after
# - removing the removal operation altogether
self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
def DeclareLocks(self, level):
"""Last minute lock declaration."""
......@@ -8932,15 +8950,21 @@ class LUExportInstance(LogicalUnit):
"""
env = {
"EXPORT_MODE": self.export_mode,
"EXPORT_NODE": self.op.target_node,
"EXPORT_DO_SHUTDOWN": self.op.shutdown,
"SHUTDOWN_TIMEOUT": self.shutdown_timeout,
# TODO: Generic function for boolean env variables
"REMOVE_INSTANCE": str(bool(self.remove_instance)),
}
env.update(_BuildInstanceHookEnvByObject(self, self.instance))
nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
self.op.target_node]
nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
if self.export_mode == constants.EXPORT_MODE_LOCAL:
nl.append(self.op.target_node)
return env, nl, nl
def CheckPrereq(self):
......@@ -8950,17 +8974,70 @@ class LUExportInstance(LogicalUnit):
"""
instance_name = self.op.instance_name
self.instance = self.cfg.GetInstanceInfo(instance_name)
assert self.instance is not None, \
"Cannot retrieve locked instance %s" % self.op.instance_name
_CheckNodeOnline(self, self.instance.primary_node)
self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
assert self.dst_node is not None
if self.export_mode == constants.EXPORT_MODE_LOCAL:
self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
assert self.dst_node is not None
_CheckNodeOnline(self, self.dst_node.name)
_CheckNodeNotDrained(self, self.dst_node.name)
self._cds = None
self.dest_x509_ca = None
elif self.export_mode == constants.EXPORT_MODE_REMOTE:
self.dst_node = None
if len(self.op.target_node) != len(self.instance.disks):
raise errors.OpPrereqError(("Received destination information for %s"
" disks, but instance %s has %s disks") %
(len(self.op.target_node), instance_name,
len(self.instance.disks)),
errors.ECODE_INVAL)
cds = _GetClusterDomainSecret()
# Check X509 key name
try:
(key_name, hmac_digest, hmac_salt) = self.x509_key_name
except (TypeError, ValueError), err:
raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err)
_CheckNodeOnline(self, self.dst_node.name)
_CheckNodeNotDrained(self, self.dst_node.name)
if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
raise errors.OpPrereqError("HMAC for X509 key name is wrong",
errors.ECODE_INVAL)
# Load and verify CA
try:
(cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
except OpenSSL.crypto.Error, err:
raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
(err, ), errors.ECODE_INVAL)
(errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
if errcode is not None:
raise errors.OpPrereqError("Invalid destination X509 CA (%s)" % (msg, ),
errors.ECODE_INVAL)
self.dest_x509_ca = cert
# Verify target information
for idx, disk_data in enumerate(self.op.target_node):
try:
masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
except errors.GenericError, err:
raise errors.OpPrereqError("Target info for disk %s: %s" % (idx, err),
errors.ECODE_INVAL)
else:
raise errors.ProgrammerError("Unhandled export mode %r" %
self.export_mode)
# instance disk type verification
# TODO: Implement export support for file-based disks
......@@ -8976,6 +9053,8 @@ class LUExportInstance(LogicalUnit):
exports will be removed from the nodes A, B and D.
"""
assert self.export_mode != constants.EXPORT_MODE_REMOTE
nodelist = self.cfg.GetNodeList()
nodelist.remove(self.dst_node.name)
......@@ -8999,6 +9078,8 @@ class LUExportInstance(LogicalUnit):
"""Export an instance to an image in the cluster.
"""
assert self.export_mode in constants.EXPORT_MODES
instance = self.instance
src_node = instance.primary_node
......@@ -9029,7 +9110,17 @@ class LUExportInstance(LogicalUnit):
helper.CreateSnapshots()
try:
(fin_resu, dresults) = helper.LocalExport(self.dst_node)
if self.export_mode == constants.EXPORT_MODE_LOCAL:
(fin_resu, dresults) = helper.LocalExport(self.dst_node)
elif self.export_mode == constants.EXPORT_MODE_REMOTE:
connect_timeout = constants.RIE_CONNECT_TIMEOUT
timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
(key_name, _, _) = self.x509_key_name
(fin_resu, dresults) = helper.RemoteExport(key_name,
self.dest_x509_ca,
self.op.target_node,
timeouts)
finally:
helper.Cleanup()
......@@ -9053,7 +9144,8 @@ class LUExportInstance(LogicalUnit):
_RemoveInstance(self, feedback_fn, instance,
self.ignore_remove_failures)
self._CleanupExports(feedback_fn)
if self.export_mode == constants.EXPORT_MODE_LOCAL:
self._CleanupExports(feedback_fn)
return fin_resu, dresults
......
......@@ -328,6 +328,14 @@ REPLACE_DISK_SEC = "replace_on_secondary" # replace disks on secondary
REPLACE_DISK_CHG = "replace_new_secondary" # change secondary node
REPLACE_DISK_AUTO = "replace_auto"
# Instance export modes
EXPORT_MODE_LOCAL = "local"
EXPORT_MODE_REMOTE = "remote"
# Set of every valid export mode, used for membership checks
EXPORT_MODES = frozenset((EXPORT_MODE_LOCAL, EXPORT_MODE_REMOTE))
# lock recalculate mode
LOCKS_REPLACE = 'replace'
LOCKS_APPEND = 'append'
......@@ -343,6 +351,9 @@ RIE_HANDSHAKE = "Hi, I'm Ganeti"
# Remote import/export certificate validity in seconds (24 hours)
RIE_CERT_VALIDITY = 24 * 60 * 60
# Remote import/export connect timeout for socat
# NOTE(review): presumably in seconds, like the validity above -- confirm
RIE_CONNECT_TIMEOUT = 60
DISK_TEMPLATES = frozenset([DT_DISKLESS, DT_PLAIN,
DT_DRBD8, DT_FILE])
......
......@@ -25,6 +25,7 @@
import logging
import time
import OpenSSL
from ganeti import constants
from ganeti import errors
......@@ -966,6 +967,48 @@ def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
return [bool(dtp.success) for dtp in all_dtp]
class _RemoteExportCb(ImportExportCbBase):
  """Import/export callbacks recording per-disk results of a remote export.

  Progress messages go to the feedback function; the outcome of every disk
  transfer is stored by disk index and exposed through L{disk_results}.

  """
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Function called with a progress message
    @type disk_count: number
    @param disk_count: Number of disks; one result slot is kept per disk

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    # A slot stays None until the transfer of that disk reports completion
    self._dresults = [None for _ in range(disk_count)]

  @property
  def disk_results(self):
    """Returns the list of per-disk results, indexed by disk.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Announces that a connection has been established for a disk.

    @param ie: Import/export object (not used here)
    @param private: Two-element sequence whose first item is the disk index

    """
    (disk_idx, _) = private

    self._feedback_fn("Disk %s is now sending data" % disk_idx)

  def ReportFinished(self, ie, private):
    """Records and announces the outcome of a finished transfer.

    @param ie: Import/export object; C{success}, C{final_message} and
      C{recent_output} are read from it
    @param private: Tuple of disk index and an optional callable which is
      invoked after the result has been stored

    """
    (disk_idx, finished_fn) = private

    succeeded = bool(ie.success)

    if succeeded:
      self._feedback_fn("Disk %s finished sending data" % disk_idx)
    else:
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
                        (disk_idx, ie.final_message, ie.recent_output))

    self._dresults[disk_idx] = succeeded

    if finished_fn:
      finished_fn()
class ExportInstanceHelper:
def __init__(self, lu, feedback_fn, instance):
"""Initializes this class.
......@@ -1088,6 +1131,45 @@ class ExportInstanceHelper:
return (fin_resu, dresults)
def RemoteExport(self, x509_key_name, dest_x509_ca, disk_info, timeouts):
  """Inter-cluster instance export.

  @type x509_key_name: string
  @param x509_key_name: X509 key name for encrypting data
  @type dest_x509_ca: OpenSSL.crypto.X509
  @param dest_x509_ca: Remote peer X509 CA object
  @type disk_info: list
  @param disk_info: Per-disk destination information; every entry must
    unpack into four values, of which the first two are host and port
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this export
  @rtype: tuple
  @return: C{(True, results)}, where C{results} is the per-disk result list
    collected by the export callbacks

  """
  instance = self._instance
  disks = instance.disks

  assert len(disk_info) == len(disks)

  cbs = _RemoteExportCb(self._feedback_fn, len(disks))

  # DiskExport is given the peer CA as PEM text, not as an X509 object
  dest_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                dest_x509_ca)

  ieloop = ImportExportLoop(self._lu)
  try:
    for idx, (dev, destination) in enumerate(zip(disks, disk_info)):
      # Only host and port are used here; the other two fields are ignored
      (host, port, _, _) = destination

      self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))

      finished_fn = compat.partial(self._TransferFinished, idx)

      disk_export = DiskExport(self._lu, instance.primary_node,
                               x509_key_name, dest_ca_pem, host, port,
                               instance, constants.IEIO_SCRIPT, (dev, idx),
                               timeouts, cbs, private=(idx, finished_fn))
      ieloop.Add(disk_export)

    ieloop.Run()
  finally:
    # Always finalize, even if queueing or running the transfers failed
    ieloop.FinalizeAll()

  return (True, cbs.disk_results)
def _TransferFinished(self, idx):
"""Called once a transfer has finished.
......@@ -1152,3 +1234,64 @@ def CheckRemoteExportHandshake(cds, handshake):
(constants.RIE_VERSION, version))
return None
def _GetRieDiskInfoMessage(disk_index, host, port):
  """Builds the text that is hashed for import/export disk information.

  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @rtype: string
  @return: The three values joined by colons, i.e. C{index:host:port}

  """
  fields = (disk_index, host, port)

  return "%s:%s:%s" % fields
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
  """Verifies received disk information for an export.

  @type cds: string
  @param cds: Cluster domain secret
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type disk_info: sequence
  @param disk_info: Disk information sent by remote peer; expected to unpack
    into host, port, HMAC digest and HMAC salt
  @rtype: tuple
  @return: C{(host, port)} taken from the verified disk information
  @raise errors.GenericError: If the data can't be unpacked into four values,
    if host or port is empty, or if the HMAC doesn't match

  """
  try:
    (host, port, hmac_digest, hmac_salt) = disk_info
  except (TypeError, ValueError) as err:
    # "as" form works on Python 2.6+ and 3.x; the comma form is 2.x-only
    raise errors.GenericError("Invalid data: %s" % err)

  if not (host and port):
    raise errors.GenericError("Missing destination host or port")

  # The disk index is part of the signed text, hence information signed for
  # one disk is rejected when presented for another index
  msg = _GetRieDiskInfoMessage(disk_index, host, port)

  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
    raise errors.GenericError("HMAC is wrong")

  return (host, port)
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @rtype: tuple
  @return: C{(host, port, hmac_digest, salt)}, the layout unpacked by
    L{CheckRemoteExportDiskInfo}

  """
  signed_text = _GetRieDiskInfoMessage(disk_index, host, port)

  return (host, port, utils.Sha1Hmac(cds, signed_text, salt=salt), salt)
......@@ -668,13 +668,32 @@ class OpPrepareExport(OpCode):
class OpExportInstance(OpCode):
  """Export an instance.

  The meaning of the export destination depends on the export mode. For local
  exports it is the node name. For remote exports it is a list of tuples, each
  consisting of hostname/IP address, port, HMAC and HMAC salt. The HMAC is
  calculated using the cluster domain secret over the value
  "${index}:${hostname}:${port}". The destination X509 CA must be a signed
  certificate.

  @ivar mode: Export mode (one of L{constants.EXPORT_MODES})
  @ivar target_node: Export destination
  @ivar x509_key_name: X509 key to use (remote export only)
  @ivar destination_x509_ca: Destination X509 CA in PEM format (remote export
                             only)

  """
  OP_ID = "OP_BACKUP_EXPORT"
  OP_DSC_FIELD = "instance_name"
  __slots__ = [
    # TODO: Rename target_node as it changes meaning for different export modes
    # (e.g. "destination")
    "instance_name",
    "target_node",
    "shutdown",
    "shutdown_timeout",
    "remove_instance",
    "ignore_remove_failures",
    "mode",
    "x509_key_name",
    "destination_x509_ca",
    ]
......
......@@ -26,12 +26,14 @@ import sys
import unittest
from ganeti import constants
from ganeti import errors
from ganeti import utils
from ganeti import masterd
from ganeti.masterd.instance import \
ImportExportTimeouts, _TimeoutExpired, _DiskImportExportBase, \
ComputeRemoteExportHandshake, CheckRemoteExportHandshake
ComputeRemoteExportHandshake, CheckRemoteExportHandshake, \
ComputeRemoteImportDiskInfo, CheckRemoteExportDiskInfo
import testutils
......@@ -86,5 +88,36 @@ class TestRieHandshake(unittest.TestCase):
self.assert_(CheckRemoteExportHandshake(cds, hs))
class TestRieDiskInfo(unittest.TestCase):
  """Tests signing and verification of remote import/export disk info.

  """
  def test(self):
    """Signed information verifies for its own disk index only.

    """
    cds = "bbf46ea9a"
    salt = "ee5ad9"

    signed = ComputeRemoteImportDiskInfo(cds, salt, 0, "node1", 1234)

    self.assertEqual(CheckRemoteExportDiskInfo(cds, 0, signed),
                     ("node1", 1234))

    # Wrong disk index
    for wrong_index in range(1, 100):
      self.assertRaises(errors.GenericError, CheckRemoteExportDiskInfo,
                        cds, wrong_index, signed)

  def testCheckErrors(self):
    """Malformed, incomplete or wrongly signed information is rejected.

    """
    cds = "0776450535a"

    rejected = [
      # Not unpackable into four values
      "",
      (),
      ("", 1, 2, 3, 4, 5),
      # No host/port
      ("", 0, "", ""),
      # Wrong hash
      ("nodeX", 123, "fakehash", "xyz"),
      ]

    for disk_info in rejected:
      self.assertRaises(errors.GenericError, CheckRemoteExportDiskInfo,
                        cds, 0, disk_info)
if __name__ == "__main__":
testutils.GanetiTestProgram()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment