Commit 2d76b580 authored by Michael Hanselmann's avatar Michael Hanselmann

Add daemon for instance import and export

This backend daemon for instance import and export will be used to
transfer instance data to other machines. It is implemented in a generic
way to support different ways of data input and output. The third-party
program “socat”, which is already used by the KVM hypervisor abstraction,
is used to connect to remote machines using SSL/TLS. After starting the
child processes in a separate process group, the import/export daemon
monitors their output and updates a status file regularily. This status
file can then be read by ganeti-noded (not in this patch).

Three I/O methods are supported: Raw disk, file and script. Each of these
can be used for import and export.

Similar to daemon-util, an incomplete set of tests written in Bash is
included.

Two future enhancements are planned:
- Run parts of the command chain as a dedicated user (privilege
  separation).
- Currently users of this daemon have to poll the status file while data
  is transferred. This is inefficient and creates unnecessary delays. By
  adding “dd” into the chain and sending it SIGUSR1 regularily, we can get
  some statistics, optimize the polling frequenc and even provide the user
  with an ETA (which isn't available with all current methods to
  import/export instance data).
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 409e8ec2
......@@ -241,8 +241,12 @@ dist_tools_SCRIPTS = \
tools/lvmstrap \
tools/sanitize-config
pkglib_python_scripts = \
daemons/import-export
pkglib_SCRIPTS = \
daemons/daemon-util
daemons/daemon-util \
$(pkglib_python_scripts)
EXTRA_DIST = \
NEWS \
......@@ -256,6 +260,7 @@ EXTRA_DIST = \
$(RUN_IN_TEMPDIR) \
daemons/daemon-util.in \
daemons/ganeti-cleaner.in \
$(pkglib_python_scripts) \
devel/upload.in \
$(docdot) \
$(docpng) \
......@@ -322,7 +327,8 @@ TEST_FILES = \
test/data/cert1.pem \
test/data/proc_drbd8.txt \
test/data/proc_drbd80-emptyline.txt \
test/data/proc_drbd83.txt
test/data/proc_drbd83.txt \
test/import-export_unittest-helper
python_tests = \
test/ganeti.backend_unittest.py \
......@@ -351,6 +357,7 @@ python_tests = \
dist_TESTS = \
test/daemon-util_unittest.bash \
test/import-export_unittest.bash \
$(python_tests)
nodist_TESTS =
......@@ -368,6 +375,7 @@ TESTS_ENVIRONMENT = \
all_python_code = \
$(dist_sbin_SCRIPTS) \
$(dist_tools_SCRIPTS) \
$(pkglib_python_scripts) \
$(python_tests) \
$(pkgpython_PYTHON) \
$(hypervisor_PYTHON) \
......@@ -379,6 +387,7 @@ all_python_code = \
srclink_files = \
man/footer.sgml \
test/daemon-util_unittest.bash \
test/import-export_unittest.bash \
$(all_python_code)
check_python_code = \
......@@ -389,6 +398,7 @@ lint_python_code = \
ganeti \
$(dist_sbin_SCRIPTS) \
$(dist_tools_SCRIPTS) \
$(pkglib_python_scripts) \
$(BUILD_BASH_COMPLETION)
test/daemon-util_unittest.bash: daemons/daemon-util
......
#!/usr/bin/python
#
# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Import/export daemon.
"""
# pylint: disable-msg=C0103
# C0103: Invalid name import-export
import errno
import logging
import optparse
import os
import re
import select
import signal
import socket
import subprocess
import sys
import time
from cStringIO import StringIO
from ganeti import constants
from ganeti import cli
from ganeti import utils
from ganeti import serializer
from ganeti import objects
from ganeti import locking
#: Used to recognize point at which socat(1) starts to listen on its socket.
#: The local address is required for the remote peer to connect (in particular
#: the port number).
LISTENING_RE = re.compile(r"^listening on\s+"
r"AF=(?P<family>\d+)\s+"
r"(?P<address>.+):(?P<port>\d+)$", re.I)
#: Used to recognize point at which socat(1) is sending data over the wire
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
re.I)
SOCAT_LOG_DEBUG = "D"
SOCAT_LOG_INFO = "I"
SOCAT_LOG_NOTICE = "N"
SOCAT_LOG_WARNING = "W"
SOCAT_LOG_ERROR = "E"
SOCAT_LOG_FATAL = "F"
SOCAT_LOG_IGNORE = frozenset([
SOCAT_LOG_DEBUG,
SOCAT_LOG_INFO,
SOCAT_LOG_NOTICE,
])
#: Socat buffer size: at most this many bytes are transferred per step
SOCAT_BUFSIZE = 1024 * 1024
#: How many lines to keep in the status file
MAX_RECENT_OUTPUT_LINES = 20
#: Don't update status file more than once every 5 seconds (unless forced)
MIN_UPDATE_INTERVAL = 5.0
#: Give child process up to 5 seconds to exit after sending a signal
CHILD_LINGER_TIMEOUT = 5.0
# Common options for socat
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
SOCAT_CONNECT_TIMEOUT = 60
# Global variable for options
options = None
class Error(Exception):
"""Generic exception"""
def SetupLogging():
"""Configures the logging module.
"""
formatter = logging.Formatter("%(asctime)s: %(message)s")
stderr_handler = logging.StreamHandler()
stderr_handler.setFormatter(formatter)
stderr_handler.setLevel(logging.NOTSET)
root_logger = logging.getLogger("")
root_logger.addHandler(stderr_handler)
if options.debug:
root_logger.setLevel(logging.NOTSET)
elif options.verbose:
root_logger.setLevel(logging.INFO)
else:
root_logger.setLevel(logging.ERROR)
# Create special logger for child process output
child_logger = logging.Logger("child output")
child_logger.addHandler(stderr_handler)
child_logger.setLevel(logging.NOTSET)
return child_logger
def _VerifyListening(family, address, port):
"""Verify address given as listening address by socat.
"""
# TODO: Implement IPv6 support
if family != socket.AF_INET:
raise Error("Address family %r not supported" % family)
try:
packed_address = socket.inet_pton(family, address)
except socket.error:
raise Error("Invalid address %r for family %s" % (address, family))
return (socket.inet_ntop(family, packed_address), port)
class StatusFile:
"""Status file manager.
"""
def __init__(self, path):
"""Initializes class.
"""
self._path = path
self._data = objects.ImportExportStatus(ctime=time.time(),
mtime=None,
recent_output=[])
def AddRecentOutput(self, line):
"""Adds a new line of recent output.
"""
self._data.recent_output.append(line)
# Remove old lines
del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]
def SetListenPort(self, port):
"""Sets the port the daemon is listening on.
@type port: int
@param port: TCP/UDP port
"""
assert isinstance(port, (int, long)) and 0 < port < 2**16
self._data.listen_port = port
def GetListenPort(self):
"""Returns the port the daemon is listening on.
"""
return self._data.listen_port
def SetConnected(self):
"""Sets the connected flag.
"""
self._data.connected = True
def SetExitStatus(self, exit_status, error_message):
"""Sets the exit status and an error message.
"""
# Require error message when status isn't 0
assert exit_status == 0 or error_message
self._data.exit_status = exit_status
self._data.error_message = error_message
def ExitStatusIsSuccess(self):
"""Returns whether the exit status means "success".
"""
return not bool(self._data.error_message)
def Update(self, force):
"""Updates the status file.
@type force: bool
@param force: Write status file in any case, not only when minimum interval
is expired
"""
if not (force or
self._data.mtime is None or
time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
return
logging.debug("Updating status file %s", self._path)
self._data.mtime = time.time()
utils.WriteFile(self._path,
data=serializer.DumpJson(self._data.ToDict(), indent=True),
mode=0400)
def _ProcessSocatOutput(status_file, level, msg):
"""Interprets socat log output.
"""
if level == SOCAT_LOG_NOTICE:
if status_file.GetListenPort() is None:
# TODO: Maybe implement timeout to not listen forever
m = LISTENING_RE.match(msg)
if m:
(_, port) = _VerifyListening(int(m.group("family")), m.group("address"),
int(m.group("port")))
status_file.SetListenPort(port)
return True
m = TRANSFER_LOOP_RE.match(msg)
if m:
status_file.SetConnected()
return True
return False
def ProcessOutput(line, status_file, logger, socat):
"""Takes care of child process output.
@param status_file: Status file manager
@param logger: Child output logger
@type socat: bool
@param socat: Whether it's a socat output line
@type line: string
@param line: Child output line
"""
force_update = False
forward_line = line
if socat:
level = None
parts = line.split(None, 4)
if len(parts) == 5:
(_, _, _, level, msg) = parts
force_update = _ProcessSocatOutput(status_file, level, msg)
if options.debug or (level and level not in SOCAT_LOG_IGNORE):
forward_line = "socat: %s %s" % (level, msg)
else:
forward_line = None
else:
forward_line = "socat: %s" % line
if forward_line:
logger.info(forward_line)
status_file.AddRecentOutput(forward_line)
status_file.Update(force_update)
def GetBashCommand(cmd):
"""Prepares a command to be run in Bash.
"""
return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
def GetSocatCommand(mode):
"""Returns the socat command.
"""
common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
"key=%s" % options.key,
"cert=%s" % options.cert,
"cafile=%s" % options.ca,
]
if options.bind is not None:
common_addr_opts.append("bind=%s" % options.bind)
if mode == constants.IEM_IMPORT:
if options.port is None:
port = 0
else:
port = options.port
addr1 = [
"OPENSSL-LISTEN:%s" % port,
"reuseaddr",
] + common_addr_opts
addr2 = ["stdout"]
elif mode == constants.IEM_EXPORT:
addr1 = ["stdin"]
addr2 = [
"OPENSSL:%s:%s" % (options.host, options.port),
"connect-timeout=%s" % SOCAT_CONNECT_TIMEOUT,
] + common_addr_opts
else:
raise Error("Invalid mode")
for i in [addr1, addr2]:
for value in i:
if "," in value:
raise Error("Comma not allowed in socat option value: %r" % value)
return [
constants.SOCAT_PATH,
# Log to stderr
"-ls",
# Log level
"-d", "-d",
# Buffer size
"-b%s" % SOCAT_BUFSIZE,
# Unidirectional mode, the first address is only used for reading, and the
# second address is only used for writing
"-u",
",".join(addr1), ",".join(addr2)
]
def GetTransportCommand(mode, socat_stderr_fd):
"""Returns the command for the transport part of the daemon.
@param mode: Daemon mode (import or export)
@type socat_stderr_fd: int
@param socat_stderr_fd: File descriptor socat should write its stderr to
"""
socat_cmd = ("%s 2>&%d" %
(utils.ShellQuoteArgs(GetSocatCommand(mode)),
socat_stderr_fd))
# TODO: Make compression configurable
if mode == constants.IEM_IMPORT:
transport_cmd = "%s | gunzip -c" % socat_cmd
elif mode == constants.IEM_EXPORT:
transport_cmd = "gzip -c | %s" % socat_cmd
else:
raise Error("Invalid mode")
# TODO: Use "dd" to measure processed data (allows to give an ETA)
# TODO: If a connection to socat is dropped (e.g. due to a wrong
# certificate), socat should be restarted
# TODO: Run transport as separate user
# The transport uses its own shell to simplify running it as a separate user
# in the future.
return GetBashCommand(transport_cmd)
def GetCommand(mode, socat_stderr_fd):
"""Returns the complete child process command.
"""
buf = StringIO()
if options.cmd_prefix:
buf.write(options.cmd_prefix)
buf.write(" ")
buf.write(utils.ShellQuoteArgs(GetTransportCommand(mode, socat_stderr_fd)))
if options.cmd_suffix:
buf.write(" ")
buf.write(options.cmd_suffix)
return GetBashCommand(buf.getvalue())
def ProcessChildIO(child, socat_stderr_read, status_file, child_logger,
signal_notify, signal_handler):
"""Handles the child processes' output.
"""
poller = select.poll()
script_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
child_logger, False)
try:
socat_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
child_logger, True)
try:
fdmap = {
child.stderr.fileno(): (child.stderr, script_stderr_lines),
socat_stderr_read.fileno(): (socat_stderr_read, socat_stderr_lines),
signal_notify.fileno(): (signal_notify, None),
}
for fd in fdmap:
utils.SetNonblockFlag(fd, True)
poller.register(fd, select.POLLIN)
timeout_calculator = None
while True:
# Break out of loop if only signal notify FD is left
if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
break
if timeout_calculator:
timeout = timeout_calculator.Remaining() * 1000
if timeout < 0:
logging.info("Child process didn't exit in time")
break
else:
timeout = None
for fd, event in utils.RetryOnSignal(poller.poll, timeout):
if event & (select.POLLIN | event & select.POLLPRI):
(from_, to) = fdmap[fd]
# Read up to 1 KB of data
data = from_.read(1024)
if data:
if to:
to.write(data)
elif fd == signal_notify.fileno():
# Signal handling
if signal_handler.called:
signal_handler.Clear()
if timeout_calculator:
logging.info("Child process still has about %0.2f seconds"
" to exit", timeout_calculator.Remaining())
else:
logging.info("Giving child process %0.2f seconds to exit",
CHILD_LINGER_TIMEOUT)
timeout_calculator = \
locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
else:
poller.unregister(fd)
del fdmap[fd]
elif event & (select.POLLNVAL | select.POLLHUP |
select.POLLERR):
poller.unregister(fd)
del fdmap[fd]
script_stderr_lines.flush()
socat_stderr_lines.flush()
# If there was a timeout calculator, we were waiting for the child to
# finish, e.g. due to a signal
return not bool(timeout_calculator)
finally:
socat_stderr_lines.close()
finally:
script_stderr_lines.close()
def ParseOptions():
"""Parses the options passed to the program.
@return: Arguments to program
"""
global options # pylint: disable-msg=W0603
parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
(constants.IEM_IMPORT,
constants.IEM_EXPORT)))
parser.add_option(cli.DEBUG_OPT)
parser.add_option(cli.VERBOSE_OPT)
parser.add_option("--key", dest="key", action="store", type="string",
help="RSA key file")
parser.add_option("--cert", dest="cert", action="store", type="string",
help="X509 certificate file")
parser.add_option("--ca", dest="ca", action="store", type="string",
help="X509 CA file")
parser.add_option("--bind", dest="bind", action="store", type="string",
help="Bind address")
parser.add_option("--host", dest="host", action="store", type="string",
help="Remote hostname")
parser.add_option("--port", dest="port", action="store", type="int",
help="Remote port")
parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
type="string", help="Command prefix")
parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
type="string", help="Command suffix")
(options, args) = parser.parse_args()
if len(args) != 2:
# Won't return
parser.error("Expected exactly two arguments")
(status_file_path, mode) = args
if mode not in (constants.IEM_IMPORT,
constants.IEM_EXPORT):
# Won't return
parser.error("Invalid mode: %s" % mode)
return (status_file_path, mode)
def main():
"""Main function.
"""
# Option parsing
(status_file_path, mode) = ParseOptions()
# Configure logging
child_logger = SetupLogging()
status_file = StatusFile(status_file_path)
try:
try:
# Pipe to receive socat's stderr output
(socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()
# Pipe to notify on signals
(signal_notify_read_fd, signal_notify_write_fd) = os.pipe()
# Configure signal module's notifier
try:
# This is only supported in Python 2.5 and above (some distributions
# backported it to Python 2.4)
set_wakeup_fd_fn = signal.set_wakeup_fd
except AttributeError:
pass
else:
set_wakeup_fd_fn(signal_notify_write_fd)
# Buffer size 0 is important, otherwise .read() with a specified length
# might buffer data while poll(2) won't mark its file descriptor as
# readable again.
socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
signal_notify_read = os.fdopen(signal_notify_read_fd, "r", 0)
# Get child process command
cmd = GetCommand(mode, socat_stderr_write_fd)
logging.debug("Starting command %r", cmd)
def _ChildPreexec():
# Move child to separate process group. By sending a signal to its
# process group we can kill the child process and all its own
# child-processes.
os.setpgid(0, 0)
# Close almost all file descriptors
utils.CloseFDs(noclose_fds=[socat_stderr_write_fd])
# Not using close_fds because doing so would also close the socat stderr
# pipe, which we still need.
child = subprocess.Popen(cmd, shell=False, close_fds=False,
stderr=subprocess.PIPE, stdout=None, stdin=None,
preexec_fn=_ChildPreexec)
try:
# Avoid race condition by setting child's process group (as good as
# possible in Python) before sending signals to child. For an
# explanation, see preexec function for child.
try:
os.setpgid(child.pid, child.pid)
except EnvironmentError, err:
# If the child process was faster we receive EPERM or EACCES
if err.errno not in (errno.EPERM, errno.EACCES):
raise
# Forward signals to child process
def _ForwardSignal(signum, _):