Commit eb630f50 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Put common import/export daemon options into object



The X509 key name and CA are passed from cmdlib all the way to
the backend import/export daemon. With the addition of an option
to choose the compression method, another parameter would have
to be passed all the way. By moving these options to a separate
object, adding new ones will become much easier.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 7e3c1da6
......@@ -847,23 +847,27 @@ class NodeHttpServer(http.server.HttpServer):
"""Starts an import daemon.
"""
(x509_key_name, source_x509_ca, instance, dest, dest_args) = params
return backend.StartImportExportDaemon(constants.IEM_IMPORT,
x509_key_name, source_x509_ca,
(opts_s, instance, dest, dest_args) = params
opts = objects.ImportExportOptions.FromDict(opts_s)
return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
None, None,
objects.Instance.FromDict(instance),
dest,
_DecodeImportExportIO(dest,
dest_args))
@staticmethod
def perspective_export_start(params):
"""Starts an export daemon.
"""
(x509_key_name, dest_x509_ca, host, port, instance,
source, source_args) = params
return backend.StartImportExportDaemon(constants.IEM_EXPORT,
x509_key_name, dest_x509_ca,
(opts_s, host, port, instance, source, source_args) = params
opts = objects.ImportExportOptions.FromDict(opts_s)
return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
host, port,
objects.Instance.FromDict(instance),
source,
......
......@@ -2617,15 +2617,12 @@ def _CreateImportExportStatusDir(prefix):
(prefix, utils.TimestampForFilename())))
def StartImportExportDaemon(mode, key_name, ca, host, port, instance,
ieio, ieioargs):
def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
"""Starts an import or export daemon.
@param mode: Import/output mode
@type key_name: string
@param key_name: RSA key name (None to use cluster certificate)
@type ca: string:
@param ca: Remote CA in PEM format (None to use cluster certificate)
@type opts: L{objects.ImportExportOptions}
@param opts: Daemon options
@type host: string
@param host: Remote host for export (None for import)
@type port: int
......@@ -2651,21 +2648,21 @@ def StartImportExportDaemon(mode, key_name, ca, host, port, instance,
else:
_Fail("Invalid mode %r", mode)
if (key_name is None) ^ (ca is None):
if (opts.key_name is None) ^ (opts.ca_pem is None):
_Fail("Cluster certificate can only be used for both key and CA")
(cmd_env, cmd_prefix, cmd_suffix) = \
_GetImportExportIoCommand(instance, mode, ieio, ieioargs)
if key_name is None:
if opts.key_name is None:
# Use server.pem
key_path = constants.NODED_CERT_FILE
cert_path = constants.NODED_CERT_FILE
assert ca is None
assert opts.ca_pem is None
else:
(_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
key_name)
assert ca is not None
opts.key_name)
assert opts.ca_pem is not None
for i in [key_path, cert_path]:
if not os.path.exists(i):
......@@ -2677,10 +2674,13 @@ def StartImportExportDaemon(mode, key_name, ca, host, port, instance,
pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
if ca is None:
if opts.ca_pem is None:
# Use server.pem
ca = utils.ReadFile(constants.NODED_CERT_FILE)
else:
ca = opts.ca_pem
# Write CA file
utils.WriteFile(ca_file, data=ca, mode=0400)
cmd = [
......
......@@ -9190,9 +9190,15 @@ class LUExportInstance(LogicalUnit):
timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
(key_name, _, _) = self.x509_key_name
(fin_resu, dresults) = helper.RemoteExport(key_name,
self.dest_x509_ca,
self.op.target_node,
dest_ca_pem = \
OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
self.dest_x509_ca)
opts = objects.ImportExportOptions(key_name=key_name,
ca_pem=dest_ca_pem)
(fin_resu, dresults) = helper.RemoteExport(opts, self.op.target_node,
timeouts)
finally:
helper.Cleanup()
......
......@@ -121,17 +121,15 @@ def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
class _DiskImportExportBase(object):
MODE_TEXT = None
def __init__(self, lu, node_name, x509_key_name, remote_x509_ca,
def __init__(self, lu, node_name, opts,
instance, timeouts, cbs, private=None):
"""Initializes this class.
@param lu: Logical unit instance
@type node_name: string
@param node_name: Node name for import
@type x509_key_name: string
@param x509_key_name: Name of X509 key (None for node daemon key)
@type remote_x509_ca: string
@param remote_x509_ca: Remote peer's CA (None for node daemon certificate)
@type opts: L{objects.ImportExportOptions}
@param opts: Import/export daemon options
@type instance: L{objects.Instance}
@param instance: Instance object
@type timeouts: L{ImportExportTimeouts}
......@@ -145,8 +143,7 @@ class _DiskImportExportBase(object):
self._lu = lu
self.node_name = node_name
self._x509_key_name = x509_key_name
self._remote_x509_ca = remote_x509_ca
self._opts = opts
self._instance = instance
self._timeouts = timeouts
self._cbs = cbs
......@@ -433,17 +430,15 @@ class _DiskImportExportBase(object):
class DiskImport(_DiskImportExportBase):
MODE_TEXT = "import"
def __init__(self, lu, node_name, x509_key_name, source_x509_ca, instance,
def __init__(self, lu, node_name, opts, instance,
dest, dest_args, timeouts, cbs, private=None):
"""Initializes this class.
@param lu: Logical unit instance
@type node_name: string
@param node_name: Node name for import
@type x509_key_name: string
@param x509_key_name: Name of X509 key (None for node daemon key)
@type source_x509_ca: string
@param source_x509_ca: Remote peer's CA (None for node daemon certificate)
@type opts: L{objects.ImportExportOptions}
@param opts: Import/export daemon options
@type instance: L{objects.Instance}
@param instance: Instance object
@param dest: I/O destination
......@@ -455,8 +450,7 @@ class DiskImport(_DiskImportExportBase):
@param private: Private data for callback functions
"""
_DiskImportExportBase.__init__(self, lu, node_name,
x509_key_name, source_x509_ca,
_DiskImportExportBase.__init__(self, lu, node_name, opts,
instance, timeouts, cbs, private)
self._dest = dest
self._dest_args = dest_args
......@@ -478,9 +472,8 @@ class DiskImport(_DiskImportExportBase):
"""Starts the import daemon.
"""
return self._lu.rpc.call_import_start(self.node_name,
self._x509_key_name,
self._remote_x509_ca, self._instance,
return self._lu.rpc.call_import_start(self.node_name, self._opts,
self._instance,
self._dest, self._dest_args)
def CheckListening(self):
......@@ -526,7 +519,7 @@ class DiskImport(_DiskImportExportBase):
class DiskExport(_DiskImportExportBase):
MODE_TEXT = "export"
def __init__(self, lu, node_name, x509_key_name, dest_x509_ca,
def __init__(self, lu, node_name, opts,
dest_host, dest_port, instance, source, source_args,
timeouts, cbs, private=None):
"""Initializes this class.
......@@ -534,10 +527,8 @@ class DiskExport(_DiskImportExportBase):
@param lu: Logical unit instance
@type node_name: string
@param node_name: Node name for import
@type x509_key_name: string
@param x509_key_name: Name of X509 key (None for node daemon key)
@type dest_x509_ca: string
@param dest_x509_ca: Remote peer's CA (None for node daemon certificate)
@type opts: L{objects.ImportExportOptions}
@param opts: Import/export daemon options
@type dest_host: string
@param dest_host: Destination host name or IP address
@type dest_port: number
......@@ -553,8 +544,7 @@ class DiskExport(_DiskImportExportBase):
@param private: Private data for callback functions
"""
_DiskImportExportBase.__init__(self, lu, node_name,
x509_key_name, dest_x509_ca,
_DiskImportExportBase.__init__(self, lu, node_name, opts,
instance, timeouts, cbs, private)
self._dest_host = dest_host
self._dest_port = dest_port
......@@ -565,8 +555,7 @@ class DiskExport(_DiskImportExportBase):
"""Starts the export daemon.
"""
return self._lu.rpc.call_export_start(self.node_name, self._x509_key_name,
self._remote_x509_ca,
return self._lu.rpc.call_export_start(self.node_name, self._opts,
self._dest_host, self._dest_port,
self._instance, self._source,
self._source_args)
......@@ -819,10 +808,11 @@ class _TransferInstDestCb(_TransferInstCbBase):
self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
opts = objects.ImportExportOptions(key_name=None, ca_pem=None)
# Start export on source node
de = DiskExport(self.lu, self.src_node, None, None, self.dest_ip,
ie.listen_port, self.instance,
dtp.data.src_io, dtp.data.src_ioargs,
de = DiskExport(self.lu, self.src_node, opts, self.dest_ip, ie.listen_port,
self.instance, dtp.data.src_io, dtp.data.src_ioargs,
self.timeouts, self.src_cbs, private=dtp)
ie.loop.Add(de)
......@@ -924,6 +914,7 @@ def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
each transfer
"""
opts = objects.ImportExportOptions(key_name=None, ca_pem=None)
timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
src_node, None, dest_node, dest_ip)
......@@ -941,7 +932,7 @@ def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
dtp = _DiskTransferPrivate(transfer, True)
di = DiskImport(lu, dest_node, None, None, instance,
di = DiskImport(lu, dest_node, opts, instance,
transfer.dest_io, transfer.dest_ioargs,
timeouts, dest_cbs, private=dtp)
ieloop.Add(di)
......@@ -1131,13 +1122,11 @@ class ExportInstanceHelper:
return (fin_resu, dresults)
def RemoteExport(self, x509_key_name, dest_x509_ca, disk_info, timeouts):
def RemoteExport(self, opts, disk_info, timeouts):
"""Inter-cluster instance export.
@type x509_key_name: string
@param x509_key_name: X509 key name for encrypting data
@type dest_x509_ca: OpenSSL.crypto.X509
@param dest_x509_ca: Remote peer X509 CA object
@type opts: L{objects.ImportExportOptions}
@param opts: Import/export daemon options
@type disk_info: list
@param disk_info: Per-disk destination information
@type timeouts: L{ImportExportTimeouts}
......@@ -1150,9 +1139,6 @@ class ExportInstanceHelper:
cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
dest_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
dest_x509_ca)
ieloop = ImportExportLoop(self._lu)
try:
for idx, (dev, (host, port, _, _)) in enumerate(zip(instance.disks,
......@@ -1160,7 +1146,7 @@ class ExportInstanceHelper:
self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
finished_fn = compat.partial(self._TransferFinished, idx)
ieloop.Add(DiskExport(self._lu, instance.primary_node,
x509_key_name, dest_ca_pem, host, port, instance,
opts, host, port, instance,
constants.IEIO_SCRIPT, (dev, idx),
timeouts, cbs, private=(idx, finished_fn)))
......@@ -1316,6 +1302,10 @@ def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
x509_cert_pem)
# Import daemon options
opts = objects.ImportExportOptions(key_name=x509_key_name,
ca_pem=source_ca_pem)
# Sign certificate
signed_x509_cert_pem = \
utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
......@@ -1326,8 +1316,7 @@ def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
ieloop = ImportExportLoop(lu)
try:
for idx, dev in enumerate(instance.disks):
ieloop.Add(DiskImport(lu, instance.primary_node,
x509_key_name, source_ca_pem, instance,
ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
constants.IEIO_SCRIPT, (dev, idx),
timeouts, cbs, private=(idx, )))
......
......@@ -1022,6 +1022,19 @@ class ImportExportStatus(ConfigObject):
] + _TIMESTAMPS
class ImportExportOptions(ConfigObject):
"""Options for import/export daemon
@ivar key_name: X509 key name (None for cluster certificate)
@ivar ca_pem: Remote peer CA in PEM format (None for cluster certificate)
"""
__slots__ = [
"key_name",
"ca_pem",
]
class ConfdRequest(ConfigObject):
"""Object holding a confd request.
......
......@@ -1202,8 +1202,7 @@ class RpcRunner(object):
"""
return self._SingleNodeCall(node, "x509_cert_remove", [name])
def call_import_start(self, node, x509_key_name, source_x509_ca,
instance, dest, dest_args):
def call_import_start(self, node, opts, instance, dest, dest_args):
"""Starts a listener for an import.
This is a single-node call.
......@@ -1215,11 +1214,11 @@ class RpcRunner(object):
"""
return self._SingleNodeCall(node, "import_start",
[x509_key_name, source_x509_ca,
[opts.ToDict(),
self._InstDict(instance), dest,
_EncodeImportExportIO(dest, dest_args)])
def call_export_start(self, node, x509_key_name, dest_x509_ca, host, port,
def call_export_start(self, node, opts, host, port,
instance, source, source_args):
"""Starts an export daemon.
......@@ -1232,7 +1231,7 @@ class RpcRunner(object):
"""
return self._SingleNodeCall(node, "export_start",
[x509_key_name, dest_x509_ca, host, port,
[opts.ToDict(), host, port,
self._InstDict(instance), source,
_EncodeImportExportIO(source, source_args)])
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment