From 4a96f1d1d2dfd8336e28654737e9f99bec3e8e01 Mon Sep 17 00:00:00 2001
From: Michael Hanselmann <hansmi@google.com>
Date: Thu, 13 May 2010 16:57:24 +0200
Subject: [PATCH] Implement opcode changes for remote-export

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
---
 lib/cmdlib.py                            | 128 +++++++++++++++++---
 lib/constants.py                         |  11 ++
 lib/masterd/instance.py                  | 143 +++++++++++++++++++++++
 lib/opcodes.py                           |  21 +++-
 test/ganeti.masterd.instance_unittest.py |  35 +++++-
 5 files changed, 318 insertions(+), 20 deletions(-)

diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 51514d7e0..f3e065c81 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -8903,23 +8903,41 @@ class LUExportInstance(LogicalUnit):
     self.remove_instance = getattr(self.op, "remove_instance", False)
     self.ignore_remove_failures = getattr(self.op, "ignore_remove_failures",
                                           False)
+    self.export_mode = getattr(self.op, "mode", constants.EXPORT_MODE_LOCAL)
+    self.x509_key_name = getattr(self.op, "x509_key_name", None)
+    self.dest_x509_ca_pem = getattr(self.op, "destination_x509_ca", None)
 
     if self.remove_instance and not self.op.shutdown:
       raise errors.OpPrereqError("Can not remove instance without shutting it"
                                  " down before")
 
+    if self.export_mode not in constants.EXPORT_MODES:
+      raise errors.OpPrereqError("Invalid export mode %r" % self.export_mode,
+                                 errors.ECODE_INVAL)
+
+    if self.export_mode == constants.EXPORT_MODE_REMOTE:
+      if not self.x509_key_name:
+        raise errors.OpPrereqError("Missing X509 key name for encryption",
+                                   errors.ECODE_INVAL)
+
+      if not self.dest_x509_ca_pem:
+        raise errors.OpPrereqError("Missing destination X509 CA",
+                                   errors.ECODE_INVAL)
+
   def ExpandNames(self):
     self._ExpandAndLockInstance()
 
-    # FIXME: lock only instance primary and destination node
-    #
-    # Sad but true, for now we have do lock all nodes, as we don't know where
-    # the previous export might be, and and in this LU we search for it and
-    # remove it from its current node. In the future we could fix this by:
-    #  - making a tasklet to search (share-lock all), then create the new one,
-    #    then one to remove, after
-    #  - removing the removal operation altogether
-    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+    # Lock all nodes for local exports
+    if self.export_mode == constants.EXPORT_MODE_LOCAL:
+      # FIXME: lock only instance primary and destination node
+      #
+      # Sad but true, for now we have to lock all nodes, as we don't know where
+      # the previous export might be, and in this LU we search for it and
+      # remove it from its current node. In the future we could fix this by:
+      #  - making a tasklet to search (share-lock all), then create the new one,
+      #    then one to remove, after
+      #  - removing the removal operation altogether
+      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
 
   def DeclareLocks(self, level):
     """Last minute lock declaration."""
@@ -8932,15 +8950,21 @@ class LUExportInstance(LogicalUnit):
 
     """
     env = {
+      "EXPORT_MODE": self.export_mode,
       "EXPORT_NODE": self.op.target_node,
       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
       "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
       # TODO: Generic function for boolean env variables
       "REMOVE_INSTANCE": str(bool(self.remove_instance)),
       }
+
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
-    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
-          self.op.target_node]
+
+    nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
+
+    if self.export_mode == constants.EXPORT_MODE_LOCAL:
+      nl.append(self.op.target_node)
+
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -8950,17 +8974,70 @@ class LUExportInstance(LogicalUnit):
 
     """
     instance_name = self.op.instance_name
+
     self.instance = self.cfg.GetInstanceInfo(instance_name)
     assert self.instance is not None, \
           "Cannot retrieve locked instance %s" % self.op.instance_name
     _CheckNodeOnline(self, self.instance.primary_node)
 
-    self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
-    self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
-    assert self.dst_node is not None
+    if self.export_mode == constants.EXPORT_MODE_LOCAL:
+      self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
+      self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
+      assert self.dst_node is not None
+
+      _CheckNodeOnline(self, self.dst_node.name)
+      _CheckNodeNotDrained(self, self.dst_node.name)
+
+      self._cds = None
+      self.dest_x509_ca = None
+
+    elif self.export_mode == constants.EXPORT_MODE_REMOTE:
+      self.dst_node = None
+
+      if len(self.op.target_node) != len(self.instance.disks):
+        raise errors.OpPrereqError(("Received destination information for %s"
+                                    " disks, but instance %s has %s disks") %
+                                   (len(self.op.target_node), instance_name,
+                                    len(self.instance.disks)),
+                                   errors.ECODE_INVAL)
+
+      cds = _GetClusterDomainSecret()
+
+      # Check X509 key name
+      try:
+        (key_name, hmac_digest, hmac_salt) = self.x509_key_name
+      except (TypeError, ValueError), err:
+        raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err)
 
-    _CheckNodeOnline(self, self.dst_node.name)
-    _CheckNodeNotDrained(self, self.dst_node.name)
+      if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
+        raise errors.OpPrereqError("HMAC for X509 key name is wrong",
+                                   errors.ECODE_INVAL)
+
+      # Load and verify CA
+      try:
+        (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
+      except OpenSSL.crypto.Error, err:
+        raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
+                                   (err, ), errors.ECODE_INVAL)
+
+      (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
+      if errcode is not None:
+        raise errors.OpPrereqError("Invalid destination X509 CA (%s)" % (msg, ),
+                                   errors.ECODE_INVAL)
+
+      self.dest_x509_ca = cert
+
+      # Verify target information
+      for idx, disk_data in enumerate(self.op.target_node):
+        try:
+          masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
+        except errors.GenericError, err:
+          raise errors.OpPrereqError("Target info for disk %s: %s" % (idx, err),
+                                     errors.ECODE_INVAL)
+
+    else:
+      raise errors.ProgrammerError("Unhandled export mode %r" %
+                                   self.export_mode)
 
     # instance disk type verification
     # TODO: Implement export support for file-based disks
@@ -8976,6 +9053,8 @@ class LUExportInstance(LogicalUnit):
     exports will be removed from the nodes A, B and D.
 
     """
+    assert self.export_mode != constants.EXPORT_MODE_REMOTE
+
     nodelist = self.cfg.GetNodeList()
     nodelist.remove(self.dst_node.name)
 
@@ -8999,6 +9078,8 @@ class LUExportInstance(LogicalUnit):
     """Export an instance to an image in the cluster.
 
     """
+    assert self.export_mode in constants.EXPORT_MODES
+
     instance = self.instance
     src_node = instance.primary_node
 
@@ -9029,7 +9110,17 @@ class LUExportInstance(LogicalUnit):
 
       helper.CreateSnapshots()
       try:
-        (fin_resu, dresults) = helper.LocalExport(self.dst_node)
+        if self.export_mode == constants.EXPORT_MODE_LOCAL:
+          (fin_resu, dresults) = helper.LocalExport(self.dst_node)
+        elif self.export_mode == constants.EXPORT_MODE_REMOTE:
+          connect_timeout = constants.RIE_CONNECT_TIMEOUT
+          timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
+
+          (key_name, _, _) = self.x509_key_name
+          (fin_resu, dresults) = helper.RemoteExport(key_name,
+                                                     self.dest_x509_ca,
+                                                     self.op.target_node,
+                                                     timeouts)
       finally:
         helper.Cleanup()
 
@@ -9053,7 +9144,8 @@ class LUExportInstance(LogicalUnit):
         _RemoveInstance(self, feedback_fn, instance,
                         self.ignore_remove_failures)
 
-    self._CleanupExports(feedback_fn)
+    if self.export_mode == constants.EXPORT_MODE_LOCAL:
+      self._CleanupExports(feedback_fn)
 
     return fin_resu, dresults
 
diff --git a/lib/constants.py b/lib/constants.py
index df9a534fa..8d8cb0c48 100644
--- a/lib/constants.py
+++ b/lib/constants.py
@@ -328,6 +328,14 @@ REPLACE_DISK_SEC = "replace_on_secondary"  # replace disks on secondary
 REPLACE_DISK_CHG = "replace_new_secondary" # change secondary node
 REPLACE_DISK_AUTO = "replace_auto"
 
+# Instance export mode
+EXPORT_MODE_LOCAL = "local"
+EXPORT_MODE_REMOTE = "remote"
+EXPORT_MODES = frozenset([
+  EXPORT_MODE_LOCAL,
+  EXPORT_MODE_REMOTE,
+  ])
+
 # lock recalculate mode
 LOCKS_REPLACE = 'replace'
 LOCKS_APPEND = 'append'
@@ -343,6 +351,9 @@ RIE_HANDSHAKE = "Hi, I'm Ganeti"
 # Remote import/export certificate validity in seconds
 RIE_CERT_VALIDITY = 24 * 60 * 60
 
+# Remote import/export connect timeout for socat
+RIE_CONNECT_TIMEOUT = 60
+
 DISK_TEMPLATES = frozenset([DT_DISKLESS, DT_PLAIN,
                             DT_DRBD8, DT_FILE])
 
diff --git a/lib/masterd/instance.py b/lib/masterd/instance.py
index f48e01b13..5e24edf95 100644
--- a/lib/masterd/instance.py
+++ b/lib/masterd/instance.py
@@ -25,6 +25,7 @@
 
 import logging
 import time
+import OpenSSL
 
 from ganeti import constants
 from ganeti import errors
@@ -966,6 +967,48 @@ def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
   return [bool(dtp.success) for dtp in all_dtp]
 
 
+class _RemoteExportCb(ImportExportCbBase):
+  def __init__(self, feedback_fn, disk_count):
+    """Initializes this class.
+
+    """
+    ImportExportCbBase.__init__(self)
+    self._feedback_fn = feedback_fn
+    self._dresults = [None] * disk_count
+
+  @property
+  def disk_results(self):
+    """Returns per-disk results.
+
+    """
+    return self._dresults
+
+  def ReportConnected(self, ie, private):
+    """Called when a connection has been established.
+
+    """
+    (idx, _) = private
+
+    self._feedback_fn("Disk %s is now sending data" % idx)
+
+  def ReportFinished(self, ie, private):
+    """Called when a transfer has finished.
+
+    """
+    (idx, finished_fn) = private
+
+    if ie.success:
+      self._feedback_fn("Disk %s finished sending data" % idx)
+    else:
+      self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
+                        (idx, ie.final_message, ie.recent_output))
+
+    self._dresults[idx] = bool(ie.success)
+
+    if finished_fn:
+      finished_fn()
+
+
 class ExportInstanceHelper:
   def __init__(self, lu, feedback_fn, instance):
     """Initializes this class.
@@ -1088,6 +1131,45 @@ class ExportInstanceHelper:
 
     return (fin_resu, dresults)
 
+  def RemoteExport(self, x509_key_name, dest_x509_ca, disk_info, timeouts):
+    """Inter-cluster instance export.
+
+    @type x509_key_name: string
+    @param x509_key_name: X509 key name for encrypting data
+    @type dest_x509_ca: OpenSSL.crypto.X509
+    @param dest_x509_ca: Remote peer X509 CA object
+    @type disk_info: list
+    @param disk_info: Per-disk destination information
+    @type timeouts: L{ImportExportTimeouts}
+    @param timeouts: Timeouts for this export
+
+    """
+    instance = self._instance
+
+    assert len(disk_info) == len(instance.disks)
+
+    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
+
+    dest_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                                  dest_x509_ca)
+
+    ieloop = ImportExportLoop(self._lu)
+    try:
+      for idx, (dev, (host, port, _, _)) in enumerate(zip(instance.disks,
+                                                          disk_info)):
+        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
+        finished_fn = compat.partial(self._TransferFinished, idx)
+        ieloop.Add(DiskExport(self._lu, instance.primary_node,
+                              x509_key_name, dest_ca_pem, host, port, instance,
+                              constants.IEIO_SCRIPT, (dev, idx),
+                              timeouts, cbs, private=(idx, finished_fn)))
+
+      ieloop.Run()
+    finally:
+      ieloop.FinalizeAll()
+
+    return (True, cbs.disk_results)
+
   def _TransferFinished(self, idx):
     """Called once a transfer has finished.
 
@@ -1152,3 +1234,64 @@ def CheckRemoteExportHandshake(cds, handshake):
             (constants.RIE_VERSION, version))
 
   return None
+
+
+def _GetRieDiskInfoMessage(disk_index, host, port):
+  """Returns the hashed text for import/export disk information.
+
+  @type disk_index: number
+  @param disk_index: Index of disk (included in hash)
+  @type host: string
+  @param host: Hostname
+  @type port: number
+  @param port: Daemon port
+
+  """
+  return "%s:%s:%s" % (disk_index, host, port)
+
+
+def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
+  """Verifies received disk information for an export.
+
+  @type cds: string
+  @param cds: Cluster domain secret
+  @type disk_index: number
+  @param disk_index: Index of disk (included in hash)
+  @type disk_info: sequence
+  @param disk_info: Disk information sent by remote peer
+
+  """
+  try:
+    (host, port, hmac_digest, hmac_salt) = disk_info
+  except (TypeError, ValueError), err:
+    raise errors.GenericError("Invalid data: %s" % err)
+
+  if not (host and port):
+    raise errors.GenericError("Missing destination host or port")
+
+  msg = _GetRieDiskInfoMessage(disk_index, host, port)
+
+  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
+    raise errors.GenericError("HMAC is wrong")
+
+  return (host, port)
+
+
+def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port):
+  """Computes the signed disk information for a remote import.
+
+  @type cds: string
+  @param cds: Cluster domain secret
+  @type salt: string
+  @param salt: HMAC salt
+  @type disk_index: number
+  @param disk_index: Index of disk (included in hash)
+  @type host: string
+  @param host: Hostname
+  @type port: number
+  @param port: Daemon port
+
+  """
+  msg = _GetRieDiskInfoMessage(disk_index, host, port)
+  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
+  return (host, port, hmac_digest, salt)
diff --git a/lib/opcodes.py b/lib/opcodes.py
index 149dbabaa..06ce78df9 100644
--- a/lib/opcodes.py
+++ b/lib/opcodes.py
@@ -668,13 +668,32 @@ class OpPrepareExport(OpCode):
 
 
 class OpExportInstance(OpCode):
-  """Export an instance."""
+  """Export an instance.
+
+  For local exports, the export destination is the node name. For remote
+  exports, the export destination is a list of tuples, each consisting of
+  hostname/IP address, port, HMAC and HMAC salt. The HMAC is calculated using
+  the cluster domain secret over the value "${index}:${hostname}:${port}". The
+  destination X509 CA must be a signed certificate.
+
+  @ivar mode: Export mode (one of L{constants.EXPORT_MODES})
+  @ivar target_node: Export destination
+  @ivar x509_key_name: X509 key to use (remote export only)
+  @ivar destination_x509_ca: Destination X509 CA in PEM format (remote export
+                             only)
+
+  """
   OP_ID = "OP_BACKUP_EXPORT"
   OP_DSC_FIELD = "instance_name"
   __slots__ = [
+    # TODO: Rename target_node as it changes meaning for different export modes
+    # (e.g. "destination")
     "instance_name", "target_node", "shutdown", "shutdown_timeout",
     "remove_instance",
     "ignore_remove_failures",
+    "mode",
+    "x509_key_name",
+    "destination_x509_ca",
     ]
 
 
diff --git a/test/ganeti.masterd.instance_unittest.py b/test/ganeti.masterd.instance_unittest.py
index 0c74defcf..06228b3fe 100755
--- a/test/ganeti.masterd.instance_unittest.py
+++ b/test/ganeti.masterd.instance_unittest.py
@@ -26,12 +26,14 @@ import sys
 import unittest
 
 from ganeti import constants
+from ganeti import errors
 from ganeti import utils
 from ganeti import masterd
 
 from ganeti.masterd.instance import \
   ImportExportTimeouts, _TimeoutExpired, _DiskImportExportBase, \
-  ComputeRemoteExportHandshake, CheckRemoteExportHandshake
+  ComputeRemoteExportHandshake, CheckRemoteExportHandshake, \
+  ComputeRemoteImportDiskInfo, CheckRemoteExportDiskInfo
 
 import testutils
 
@@ -86,5 +88,36 @@ class TestRieHandshake(unittest.TestCase):
     self.assert_(CheckRemoteExportHandshake(cds, hs))
 
 
+class TestRieDiskInfo(unittest.TestCase):
+  def test(self):
+    cds = "bbf46ea9a"
+    salt = "ee5ad9"
+    di = ComputeRemoteImportDiskInfo(cds, salt, 0, "node1", 1234)
+    self.assertEqual(CheckRemoteExportDiskInfo(cds, 0, di),
+                     ("node1", 1234))
+
+    for i in range(1, 100):
+      # Wrong disk index
+      self.assertRaises(errors.GenericError, CheckRemoteExportDiskInfo,
+                        cds, i, di)
+
+  def testCheckErrors(self):
+    cds = "0776450535a"
+    self.assertRaises(errors.GenericError, CheckRemoteExportDiskInfo,
+                      cds, 0, "")
+    self.assertRaises(errors.GenericError, CheckRemoteExportDiskInfo,
+                      cds, 0, ())
+    self.assertRaises(errors.GenericError, CheckRemoteExportDiskInfo,
+                      cds, 0, ("", 1, 2, 3, 4, 5))
+
+    # No host/port
+    self.assertRaises(errors.GenericError, CheckRemoteExportDiskInfo,
+                      cds, 0, ("", 0, "", ""))
+
+    # Wrong hash
+    self.assertRaises(errors.GenericError, CheckRemoteExportDiskInfo,
+                      cds, 0, ("nodeX", 123, "fakehash", "xyz"))
+
+
 if __name__ == "__main__":
   testutils.GanetiTestProgram()
-- 
GitLab