From 1651d116f5596638959a148d5d0f00d2ff750939 Mon Sep 17 00:00:00 2001 From: Michael Hanselmann <hansmi@google.com> Date: Fri, 16 Apr 2010 14:47:47 +0200 Subject: [PATCH] Add RPC calls to import and export instance data These RPC calls can be used to start, monitor and stop the instance data import/export daemon. Signed-off-by: Michael Hanselmann <hansmi@google.com> Reviewed-by: Iustin Pop <iustin@google.com> --- daemons/ganeti-noded | 60 +++++++++ lib/backend.py | 281 +++++++++++++++++++++++++++++++++++++++++++ lib/constants.py | 10 ++ lib/rpc.py | 91 ++++++++++++++ 4 files changed, 442 insertions(+) diff --git a/daemons/ganeti-noded b/daemons/ganeti-noded index 922d31580..7a5ab7a5d 100755 --- a/daemons/ganeti-noded +++ b/daemons/ganeti-noded @@ -93,6 +93,21 @@ def _RequireJobQueueLock(fn): return wrapper +def _DecodeImportExportIO(ieio, ieioargs): + """Decodes import/export I/O information. + + """ + if ieio == constants.IEIO_RAW_DISK: + assert len(ieioargs) == 1 + return (objects.Disk.FromDict(ieioargs[0]), ) + + if ieio == constants.IEIO_SCRIPT: + assert len(ieioargs) == 2 + return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1]) + + return ieioargs + + class NodeHttpServer(http.server.HttpServer): """The server implementation. @@ -838,6 +853,50 @@ class NodeHttpServer(http.server.HttpServer): (name, ) = params return backend.RemoveX509Certificate(name) + # Import and export + + @staticmethod + def perspective_start_import_listener(params): + """Starts an import daemon. + + """ + (x509_key_name, source_x509_ca, instance, dest, dest_args) = params + return backend.StartImportExportDaemon(constants.IEM_IMPORT, + x509_key_name, source_x509_ca, + None, None, + objects.Instance.FromDict(instance), + dest, + _DecodeImportExportIO(dest, + dest_args)) + @staticmethod + def perspective_start_export(params): + """Starts an export daemon. + + """ + (x509_key_name, dest_x509_ca, host, port, instance, + source, source_args) = params + return backend.StartImportExportDaemon(constants.IEM_EXPORT, + x509_key_name, dest_x509_ca, + host, port, + objects.Instance.FromDict(instance), + source, + _DecodeImportExportIO(source, + source_args)) + + @staticmethod + def perspective_get_import_export_status(params): + """Retrieves the status of an import or export daemon. + + """ + return backend.GetImportExportStatus(params[0]) + + @staticmethod + def perspective_cleanup_import_export(params): + """Cleans up after an import or export. + + """ + return backend.CleanupImportExport(params[0]) + def CheckNoded(_, args): """Initial checks whether to run or exit with a failure. @@ -889,6 +948,7 @@ def main(): dirs.append((constants.LOG_OS_DIR, 0750)) dirs.append((constants.LOCK_DIR, 1777)) dirs.append((constants.CRYPTO_KEYS_DIR, constants.CRYPTO_KEYS_DIR_MODE)) + dirs.append((constants.IMPORT_EXPORT_DIR, constants.IMPORT_EXPORT_DIR_MODE)) daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded, default_ssl_cert=constants.NODED_CERT_FILE, default_ssl_key=constants.NODED_CERT_FILE) diff --git a/lib/backend.py b/lib/backend.py index 1271681aa..29a2c5b05 100644 --- a/lib/backend.py +++ b/lib/backend.py @@ -56,6 +56,7 @@ from ganeti import constants from ganeti import bdev from ganeti import objects from ganeti import ssconf +from ganeti import serializer _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id" @@ -68,6 +69,9 @@ _ALLOWED_CLEAN_DIRS = frozenset([ _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60 _X509_KEY_FILE = "key" _X509_CERT_FILE = "cert" +_IES_STATUS_FILE = "status" +_IES_PID_FILE = "pid" +_IES_CA_FILE = "ca" class RPCFail(Exception): @@ -832,6 +836,7 @@ def _InstanceLogName(kind, os_name, instance): @param instance: the name of the instance being imported/added/etc. """ + # TODO: Use tempfile.mkstemp to create unique filename base = ("%s-%s-%s-%s.log" % (kind, os_name, instance, utils.TimestampForFilename())) return utils.PathJoin(constants.LOG_OS_DIR, base) @@ -2597,6 +2602,282 @@ def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR): cert_dir, err) +def _GetImportExportIoCommand(instance, mode, ieio, ieargs): + """Returns the command for the requested input/output. + + @type instance: L{objects.Instance} + @param instance: The instance object + @param mode: Import/export mode + @param ieio: Input/output type + @param ieargs: Input/output arguments + + """ + assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT) + + env = None + prefix = None + suffix = None + + if ieio == constants.IEIO_FILE: + (filename, ) = ieargs + + if not utils.IsNormAbsPath(filename): + _Fail("Path '%s' is not normalized or absolute", filename) + + directory = os.path.normpath(os.path.dirname(filename)) + + if (os.path.commonprefix([constants.EXPORT_DIR, directory]) != + constants.EXPORT_DIR): + _Fail("File '%s' is not under exports directory '%s'", + filename, constants.EXPORT_DIR) + + # Create directory + utils.Makedirs(directory, mode=0750) + + quoted_filename = utils.ShellQuote(filename) + + if mode == constants.IEM_IMPORT: + suffix = "> %s" % quoted_filename + elif mode == constants.IEM_EXPORT: + suffix = "< %s" % quoted_filename + + elif ieio == constants.IEIO_RAW_DISK: + (disk, ) = ieargs + + real_disk = _OpenRealBD(disk) + + if mode == constants.IEM_IMPORT: + # we set here a smaller block size as, due to transport buffering, more + # than 64-128k will mostly ignored; we use nocreat to fail if the device + # is not already there or we pass a wrong path; we use notrunc to no + # attempt truncate on an LV device; we use oflag=dsync to not buffer too + # much memory; this means that at best, we flush every 64k, which will + # not be very fast + suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc" + " bs=%s oflag=dsync"), + real_disk.dev_path, + str(64 * 1024)) + + elif mode == constants.IEM_EXPORT: + # the block size on the read dd is 1MiB to match our units + prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |", + real_disk.dev_path, + str(1024 * 1024), # 1 MB + str(disk.size)) + + elif ieio == constants.IEIO_SCRIPT: + (disk, disk_index, ) = ieargs + + assert isinstance(disk_index, (int, long)) + + real_disk = _OpenRealBD(disk) + + inst_os = OSFromDisk(instance.os) + env = OSEnvironment(instance, inst_os) + + if mode == constants.IEM_IMPORT: + env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index] + env["IMPORT_INDEX"] = str(disk_index) + script = inst_os.import_script + + elif mode == constants.IEM_EXPORT: + env["EXPORT_DEVICE"] = real_disk.dev_path + env["EXPORT_INDEX"] = str(disk_index) + script = inst_os.export_script + + # TODO: Pass special environment only to script + script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script) + + if mode == constants.IEM_IMPORT: + suffix = "| %s" % script_cmd + + elif mode == constants.IEM_EXPORT: + prefix = "%s |" % script_cmd + + else: + _Fail("Invalid %s I/O mode %r", mode, ieio) + + return (env, prefix, suffix) + + +def _CreateImportExportStatusDir(prefix): + """Creates status directory for import/export. + + """ + return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR, + prefix=("%s-%s-" % + (prefix, utils.TimestampForFilename()))) + + +def StartImportExportDaemon(mode, key_name, ca, host, port, instance, + ieio, ieioargs): + """Starts an import or export daemon. + + @param mode: Import/output mode + @type key_name: string + @param key_name: RSA key name (None to use cluster certificate) + @type ca: string: + @param ca: Remote CA in PEM format (None to use cluster certificate) + @type host: string + @param host: Remote host for export (None for import) + @type port: int + @param port: Remote port for export (None for import) + @type instance: L{objects.Instance} + @param instance: Instance object + @param ieio: Input/output type + @param ieioargs: Input/output arguments + + """ + if mode == constants.IEM_IMPORT: + prefix = "import" + + if not (host is None and port is None): + _Fail("Can not specify host or port on import") + + elif mode == constants.IEM_EXPORT: + prefix = "export" + + if host is None or port is None: + _Fail("Host and port must be specified for an export") + + else: + _Fail("Invalid mode %r", mode) + + if (key_name is None) ^ (ca is None): + _Fail("Cluster certificate can only be used for both key and CA") + + (cmd_env, cmd_prefix, cmd_suffix) = \ + _GetImportExportIoCommand(instance, mode, ieio, ieioargs) + + if key_name is None: + # Use server.pem + key_path = constants.NODED_CERT_FILE + cert_path = constants.NODED_CERT_FILE + assert ca is None + else: + (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR, + key_name) + assert ca is not None + + status_dir = _CreateImportExportStatusDir(prefix) + try: + status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE) + pid_file = utils.PathJoin(status_dir, _IES_PID_FILE) + + if ca is None: + # Use server.pem + # TODO: If socat runs as a non-root user, this might need to be copied to + # a separate file + ca_path = constants.NODED_CERT_FILE + else: + ca_path = utils.PathJoin(status_dir, _IES_CA_FILE) + utils.WriteFile(ca_path, data=ca, mode=0400) + + cmd = [ + constants.IMPORT_EXPORT_DAEMON, + status_file, mode, + "--key=%s" % key_path, + "--cert=%s" % cert_path, + "--ca=%s" % ca_path, + ] + + if host: + cmd.append("--host=%s" % host) + + if port: + cmd.append("--port=%s" % port) + + if cmd_prefix: + cmd.append("--cmd-prefix=%s" % cmd_prefix) + + if cmd_suffix: + cmd.append("--cmd-suffix=%s" % cmd_suffix) + + logfile = _InstanceLogName(prefix, instance.os, instance.name) + + # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has + # support for receiving a file descriptor for output + utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file, + output=logfile) + + # The import/export name is simply the status directory name + return os.path.basename(status_dir) + + except Exception: + shutil.rmtree(status_dir, ignore_errors=True) + raise + + +def GetImportExportStatus(names): + """Returns import/export daemon status. + + @type names: sequence + @param names: List of names + @rtype: List of dicts + @return: Returns a list of the state of each named import/export or None if a + status couldn't be read + + """ + result = [] + + for name in names: + status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name, + _IES_STATUS_FILE) + + try: + data = utils.ReadFile(status_file) + except EnvironmentError, err: + if err.errno != errno.ENOENT: + raise + data = None + + if not data: + result.append(None) + continue + + result.append(serializer.LoadJson(data)) + + return result + + +def CleanupImportExport(name): + """Cleanup after an import or export. + + If the import/export daemon is still running it's killed. Afterwards the + whole status directory is removed. + + """ + logging.info("Finalizing import/export %s", name) + + status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name) + pid_file = utils.PathJoin(status_dir, _IES_PID_FILE) + + pid = None + try: + fd = os.open(pid_file, os.O_RDONLY) + except EnvironmentError, err: + if err.errno != errno.ENOENT: + raise + # PID file doesn't exist + else: + try: + try: + # Try to acquire lock + utils.LockFile(fd) + except errors.LockError: + # Couldn't lock, daemon is running + pid = int(os.read(fd, 100)) + finally: + os.close(fd) + + if pid: + logging.info("Import/export %s is still running with PID %s", + name, pid) + utils.KillProcess(pid, waitpid=False) + + shutil.rmtree(status_dir, ignore_errors=True) + + def _FindDisks(nodes_ip, disks): """Sets the physical ID on disks and returns the block devices. diff --git a/lib/constants.py b/lib/constants.py index b38a5bdda..6a35a42cc 100644 --- a/lib/constants.py +++ b/lib/constants.py @@ -93,6 +93,8 @@ SOCKET_DIR = RUN_GANETI_DIR + "/socket" SOCKET_DIR_MODE = 0700 CRYPTO_KEYS_DIR = RUN_GANETI_DIR + "/crypto" CRYPTO_KEYS_DIR_MODE = 0700 +IMPORT_EXPORT_DIR = RUN_GANETI_DIR + "/import-export" +IMPORT_EXPORT_DIR_MODE = 0755 # keep RUN_GANETI_DIR first here, to make sure all get created when the node # daemon is started (this takes care of RUN_DIR being tmpfs) SUB_RUN_DIRS = [ RUN_GANETI_DIR, BDEV_CACHE_DIR, DISK_LINKS_DIR ] @@ -199,6 +201,14 @@ IMPORT_EXPORT_DAEMON = _autoconf.PKGLIBDIR + "/import-export" IEM_IMPORT = "import" IEM_EXPORT = "export" +# Import/export I/O +# Direct file I/O, equivalent to a shell's I/O redirection using '<' or '>' +IEIO_FILE = "file" +# Raw block device I/O using "dd" +IEIO_RAW_DISK = "raw" +# OS definition import/export script +IEIO_SCRIPT = "script" + VALUE_DEFAULT = "default" VALUE_AUTO = "auto" VALUE_GENERATE = "generate" diff --git a/lib/rpc.py b/lib/rpc.py index 3f1fffb9d..b99b49e48 100644 --- a/lib/rpc.py +++ b/lib/rpc.py @@ -258,6 +258,21 @@ class Client: return results +def _EncodeImportExportIO(ieio, ieioargs): + """Encodes import/export I/O information. + + """ + if ieio == constants.IEIO_RAW_DISK: + assert len(ieioargs) == 1 + return (ieioargs[0].ToDict(), ) + + if ieio == constants.IEIO_SCRIPT: + assert len(ieioargs) == 2 + return (ieioargs[0].ToDict(), ieioargs[1]) + + return ieioargs + + class RpcRunner(object): """RPC runner class""" @@ -1209,3 +1224,79 @@ class RpcRunner(object): """ return self._SingleNodeCall(node, "remove_x509_certificate", [name]) + + def call_start_import_listener(self, node, x509_key_name, source_x509_ca, + instance, dest, dest_args): + """Starts a listener for an import. + + This is a single-node call. + + @type node: string + @param node: Node name + @type instance: C{objects.Instance} + @param instance: Instance object + + """ + return self._SingleNodeCall(node, "start_import_listener", + [x509_key_name, source_x509_ca, + self._InstDict(instance), dest, + _EncodeImportExportIO(dest, dest_args)]) + + def call_start_export(self, node, x509_key_name, dest_x509_ca, host, port, + instance, source, source_args): + """Starts an export daemon. + + This is a single-node call. + + @type node: string + @param node: Node name + @type instance: C{objects.Instance} + @param instance: Instance object + + """ + return self._SingleNodeCall(node, "start_export", + [x509_key_name, dest_x509_ca, host, port, + self._InstDict(instance), source, + _EncodeImportExportIO(source, source_args)]) + + def call_get_import_export_status(self, node, names): + """Gets the status of an import or export. + + This is a single-node call. + + @type node: string + @param node: Node name + @type names: List of strings + @param names: Import/export names + @rtype: List of L{objects.ImportExportStatus} instances + @return: Returns a list of the state of each named import/export or None if + a status couldn't be retrieved + + """ + result = self._SingleNodeCall(node, "get_import_export_status", [names]) + + if not result.fail_msg: + decoded = [] + + for i in result.payload: + if i is None: + decoded.append(None) + continue + decoded.append(objects.ImportExportStatus.FromDict(i)) + + result.payload = decoded + + return result + + def call_cleanup_import_export(self, node, name): + """Cleans up after an import or export. + + This is a single-node call. + + @type node: string + @param node: Node name + @type name: string + @param name: Import/export name + + """ + return self._SingleNodeCall(node, "cleanup_import_export", [name]) -- GitLab