......@@ -93,6 +93,21 @@ def _RequireJobQueueLock(fn):
return wrapper
def _DecodeImportExportIO(ieio, ieioargs):
"""Decodes import/export I/O information.
if ieio == constants.IEIO_RAW_DISK:
assert len(ieioargs) == 1
return (objects.Disk.FromDict(ieioargs[0]), )
if ieio == constants.IEIO_SCRIPT:
assert len(ieioargs) == 2
return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
return ieioargs
class NodeHttpServer(http.server.HttpServer):
"""The server implementation.
......@@ -838,6 +853,50 @@ class NodeHttpServer(http.server.HttpServer):
(name, ) = params
return backend.RemoveX509Certificate(name)
# Import and export
def perspective_start_import_listener(params):
"""Starts an import daemon.
(x509_key_name, source_x509_ca, instance, dest, dest_args) = params
return backend.StartImportExportDaemon(constants.IEM_IMPORT,
x509_key_name, source_x509_ca,
None, None,
def perspective_start_export(params):
"""Starts an export daemon.
(x509_key_name, dest_x509_ca, host, port, instance,
source, source_args) = params
return backend.StartImportExportDaemon(constants.IEM_EXPORT,
x509_key_name, dest_x509_ca,
host, port,
def perspective_get_import_export_status(params):
"""Retrieves the status of an import or export daemon.
return backend.GetImportExportStatus(params[0])
def perspective_cleanup_import_export(params):
"""Cleans up after an import or export.
return backend.CleanupImportExport(params[0])
def CheckNoded(_, args):
"""Initial checks whether to run or exit with a failure.
......@@ -889,6 +948,7 @@ def main():
dirs.append((constants.LOG_OS_DIR, 0750))
dirs.append((constants.LOCK_DIR, 1777))
dirs.append((constants.CRYPTO_KEYS_DIR, constants.CRYPTO_KEYS_DIR_MODE))
dirs.append((constants.IMPORT_EXPORT_DIR, constants.IMPORT_EXPORT_DIR_MODE))
daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
......@@ -56,6 +56,7 @@ from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
......@@ -68,6 +69,9 @@ _ALLOWED_CLEAN_DIRS = frozenset([
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"
class RPCFail(Exception):
......@@ -832,6 +836,7 @@ def _InstanceLogName(kind, os_name, instance):
@param instance: the name of the instance being imported/added/etc.
# TODO: Use tempfile.mkstemp to create unique filename
base = ("%s-%s-%s-%s.log" %
(kind, os_name, instance, utils.TimestampForFilename()))
return utils.PathJoin(constants.LOG_OS_DIR, base)
......@@ -2597,6 +2602,282 @@ def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
cert_dir, err)
def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
"""Returns the command for the requested input/output.
@type instance: L{objects.Instance}
@param instance: The instance object
@param mode: Import/export mode
@param ieio: Input/output type
@param ieargs: Input/output arguments
assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
env = None
prefix = None
suffix = None
if ieio == constants.IEIO_FILE:
(filename, ) = ieargs
if not utils.IsNormAbsPath(filename):
_Fail("Path '%s' is not normalized or absolute", filename)
directory = os.path.normpath(os.path.dirname(filename))
if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
_Fail("File '%s' is not under exports directory '%s'",
filename, constants.EXPORT_DIR)
# Create directory
utils.Makedirs(directory, mode=0750)
quoted_filename = utils.ShellQuote(filename)
if mode == constants.IEM_IMPORT:
suffix = "> %s" % quoted_filename
elif mode == constants.IEM_EXPORT:
suffix = "< %s" % quoted_filename
elif ieio == constants.IEIO_RAW_DISK:
(disk, ) = ieargs
real_disk = _OpenRealBD(disk)
if mode == constants.IEM_IMPORT:
# we set here a smaller block size as, due to transport buffering, more
# than 64-128k will mostly ignored; we use nocreat to fail if the device
# is not already there or we pass a wrong path; we use notrunc to no
# attempt truncate on an LV device; we use oflag=dsync to not buffer too
# much memory; this means that at best, we flush every 64k, which will
# not be very fast
suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
" bs=%s oflag=dsync"),
str(64 * 1024))
elif mode == constants.IEM_EXPORT:
# the block size on the read dd is 1MiB to match our units
prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
str(1024 * 1024), # 1 MB
elif ieio == constants.IEIO_SCRIPT:
(disk, disk_index, ) = ieargs
assert isinstance(disk_index, (int, long))
real_disk = _OpenRealBD(disk)
inst_os = OSFromDisk(instance.os)
env = OSEnvironment(instance, inst_os)
if mode == constants.IEM_IMPORT:
env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
env["IMPORT_INDEX"] = str(disk_index)
script = inst_os.import_script
elif mode == constants.IEM_EXPORT:
env["EXPORT_DEVICE"] = real_disk.dev_path
env["EXPORT_INDEX"] = str(disk_index)
script = inst_os.export_script
# TODO: Pass special environment only to script
script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
if mode == constants.IEM_IMPORT:
suffix = "| %s" % script_cmd
elif mode == constants.IEM_EXPORT:
prefix = "%s |" % script_cmd
_Fail("Invalid %s I/O mode %r", mode, ieio)
return (env, prefix, suffix)
def _CreateImportExportStatusDir(prefix):
"""Creates status directory for import/export.
return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
prefix=("%s-%s-" %
(prefix, utils.TimestampForFilename())))
def StartImportExportDaemon(mode, key_name, ca, host, port, instance,
ieio, ieioargs):
"""Starts an import or export daemon.
@param mode: Import/output mode
@type key_name: string
@param key_name: RSA key name (None to use cluster certificate)
@type ca: string:
@param ca: Remote CA in PEM format (None to use cluster certificate)
@type host: string
@param host: Remote host for export (None for import)
@type port: int
@param port: Remote port for export (None for import)
@type instance: L{objects.Instance}
@param instance: Instance object
@param ieio: Input/output type
@param ieioargs: Input/output arguments
if mode == constants.IEM_IMPORT:
prefix = "import"
if not (host is None and port is None):
_Fail("Can not specify host or port on import")
elif mode == constants.IEM_EXPORT:
prefix = "export"
if host is None or port is None:
_Fail("Host and port must be specified for an export")
_Fail("Invalid mode %r", mode)
if (key_name is None) ^ (ca is None):
_Fail("Cluster certificate can only be used for both key and CA")
(cmd_env, cmd_prefix, cmd_suffix) = \
_GetImportExportIoCommand(instance, mode, ieio, ieioargs)
if key_name is None:
# Use server.pem
key_path = constants.NODED_CERT_FILE
cert_path = constants.NODED_CERT_FILE
assert ca is None
(_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
assert ca is not None
status_dir = _CreateImportExportStatusDir(prefix)
status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
if ca is None:
# Use server.pem
# TODO: If socat runs as a non-root user, this might need to be copied to
# a separate file
ca_path = constants.NODED_CERT_FILE
ca_path = utils.PathJoin(status_dir, _IES_CA_FILE)
utils.WriteFile(ca_path, data=ca, mode=0400)
cmd = [
status_file, mode,
"--key=%s" % key_path,
"--cert=%s" % cert_path,
"--ca=%s" % ca_path,
if host:
cmd.append("--host=%s" % host)
if port:
cmd.append("--port=%s" % port)
if cmd_prefix:
cmd.append("--cmd-prefix=%s" % cmd_prefix)
if cmd_suffix:
cmd.append("--cmd-suffix=%s" % cmd_suffix)
logfile = _InstanceLogName(prefix, instance.os,
# TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
# support for receiving a file descriptor for output
utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
# The import/export name is simply the status directory name
return os.path.basename(status_dir)
except Exception:
shutil.rmtree(status_dir, ignore_errors=True)
def GetImportExportStatus(names):
"""Returns import/export daemon status.
@type names: sequence
@param names: List of names
@rtype: List of dicts
@return: Returns a list of the state of each named import/export or None if a
status couldn't be read
result = []
for name in names:
status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
data = utils.ReadFile(status_file)
except EnvironmentError, err:
if err.errno != errno.ENOENT:
data = None
if not data:
return result
def CleanupImportExport(name):
"""Cleanup after an import or export.
If the import/export daemon is still running it's killed. Afterwards the
whole status directory is removed.
""""Finalizing import/export %s", name)
status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
pid = None
fd =, os.O_RDONLY)
except EnvironmentError, err:
if err.errno != errno.ENOENT:
# PID file doesn't exist
# Try to acquire lock
except errors.LockError:
# Couldn't lock, daemon is running
pid = int(, 100))
if pid:"Import/export %s is still running with PID %s",
name, pid)
utils.KillProcess(pid, waitpid=False)
shutil.rmtree(status_dir, ignore_errors=True)
def _FindDisks(nodes_ip, disks):
"""Sets the physical ID on disks and returns the block devices.
......@@ -93,6 +93,8 @@ SOCKET_DIR = RUN_GANETI_DIR + "/socket"
# keep RUN_GANETI_DIR first here, to make sure all get created when the node
# daemon is started (this takes care of RUN_DIR being tmpfs)
......@@ -199,6 +201,14 @@ IMPORT_EXPORT_DAEMON = _autoconf.PKGLIBDIR + "/import-export"
IEM_IMPORT = "import"
IEM_EXPORT = "export"
# Import/export I/O
# Direct file I/O, equivalent to a shell's I/O redirection using '<' or '>'
IEIO_FILE = "file"
# Raw block device I/O using "dd"
# OS definition import/export script
IEIO_SCRIPT = "script"
VALUE_DEFAULT = "default"
VALUE_AUTO = "auto"
VALUE_GENERATE = "generate"
......@@ -258,6 +258,21 @@ class Client:
return results
def _EncodeImportExportIO(ieio, ieioargs):
"""Encodes import/export I/O information.
if ieio == constants.IEIO_RAW_DISK:
assert len(ieioargs) == 1
return (ieioargs[0].ToDict(), )
if ieio == constants.IEIO_SCRIPT:
assert len(ieioargs) == 2
return (ieioargs[0].ToDict(), ieioargs[1])
return ieioargs
class RpcRunner(object):
"""RPC runner class"""
......@@ -1209,3 +1224,79 @@ class RpcRunner(object):
return self._SingleNodeCall(node, "remove_x509_certificate", [name])
def call_start_import_listener(self, node, x509_key_name, source_x509_ca,
instance, dest, dest_args):
"""Starts a listener for an import.
This is a single-node call.
@type node: string
@param node: Node name
@type instance: C{objects.Instance}
@param instance: Instance object
return self._SingleNodeCall(node, "start_import_listener",
[x509_key_name, source_x509_ca,
self._InstDict(instance), dest,
_EncodeImportExportIO(dest, dest_args)])
def call_start_export(self, node, x509_key_name, dest_x509_ca, host, port,
instance, source, source_args):
"""Starts an export daemon.
This is a single-node call.
@type node: string
@param node: Node name
@type instance: C{objects.Instance}
@param instance: Instance object
return self._SingleNodeCall(node, "start_export",
[x509_key_name, dest_x509_ca, host, port,
self._InstDict(instance), source,
_EncodeImportExportIO(source, source_args)])
def call_get_import_export_status(self, node, names):
"""Gets the status of an import or export.
This is a single-node call.
@type node: string
@param node: Node name
@type names: List of strings
@param names: Import/export names
@rtype: List of L{objects.ImportExportStatus} instances
@return: Returns a list of the state of each named import/export or None if
a status couldn't be retrieved
result = self._SingleNodeCall(node, "get_import_export_status", [names])
if not result.fail_msg:
decoded = []
for i in result.payload:
if i is None:
result.payload = decoded
return result
def call_cleanup_import_export(self, node, name):
"""Cleans up after an import or export.
This is a single-node call.
@type node: string
@param node: Node name
@type name: string
@param name: Import/export name
return self._SingleNodeCall(node, "cleanup_import_export", [name])
