diff --git a/Makefile.am b/Makefile.am index dcdf17e1c964a2e9537366794f3a9cb520f19ac6..1eaba4b1b20bf225456f31d0253a44f556e54a5a 100644 --- a/Makefile.am +++ b/Makefile.am @@ -22,6 +22,7 @@ httpdir = $(pkgpythondir)/http masterddir = $(pkgpythondir)/masterd confddir = $(pkgpythondir)/confd rapidir = $(pkgpythondir)/rapi +impexpddir = $(pkgpythondir)/impexpd toolsdir = $(pkglibdir)/tools docdir = $(datadir)/doc/$(PACKAGE) @@ -40,6 +41,7 @@ DIRS = \ lib/confd \ lib/http \ lib/hypervisor \ + lib/impexpd \ lib/masterd \ lib/rapi \ man \ @@ -148,6 +150,9 @@ masterd_PYTHON = \ lib/masterd/__init__.py \ lib/masterd/instance.py +impexpd_PYTHON = \ + lib/impexpd/__init__.py + docrst = \ doc/admin.rst \ doc/design-2.0.rst \ @@ -350,6 +355,7 @@ python_tests = \ test/ganeti.errors_unittest.py \ test/ganeti.hooks_unittest.py \ test/ganeti.http_unittest.py \ + test/ganeti.impexpd_unittest.py \ test/ganeti.locking_unittest.py \ test/ganeti.luxi_unittest.py \ test/ganeti.masterd.instance_unittest.py \ @@ -397,6 +403,7 @@ all_python_code = \ $(http_PYTHON) \ $(confd_PYTHON) \ $(masterd_PYTHON) \ + $(impexpd_PYTHON) \ $(noinst_PYTHON) srclink_files = \ diff --git a/daemons/import-export b/daemons/import-export index 7e9b265a3dfc9db6a0b3829020d2847b23823dbd..29386d1fe6deee95cd0035fa45fcd6d00b536d9e 100755 --- a/daemons/import-export +++ b/daemons/import-export @@ -37,7 +37,6 @@ import socket import subprocess import sys import time -from cStringIO import StringIO from ganeti import constants from ganeti import cli @@ -45,6 +44,7 @@ from ganeti import utils from ganeti import serializer from ganeti import objects from ganeti import locking +from ganeti import impexpd #: Used to recognize point at which socat(1) starts to listen on its socket. @@ -71,9 +71,6 @@ SOCAT_LOG_IGNORE = frozenset([ SOCAT_LOG_NOTICE, ]) -#: Socat buffer size: at most this many bytes are transferred per step -SOCAT_BUFSIZE = 1024 * 1024 - #: How many lines to keep in the status file MAX_RECENT_OUTPUT_LINES = 20 @@ -86,10 +83,6 @@ CHILD_LINGER_TIMEOUT = 5.0 #: How long to wait for a connection to be established DEFAULT_CONNECT_TIMEOUT = 60 -# Common options for socat -SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"] -SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"] - # Global variable for options options = None @@ -291,150 +284,6 @@ def ProcessOutput(line, status_file, logger, socat): status_file.Update(force_update) -class CommandBuilder(object): - def __init__(self, mode, opts, socat_stderr_fd): - """Initializes this class. - - @param mode: Daemon mode (import or export) - @param opts: Options object - @type socat_stderr_fd: int - @param socat_stderr_fd: File descriptor socat should write its stderr to - - """ - self._opts = opts - self._mode = mode - self._socat_stderr_fd = socat_stderr_fd - - @staticmethod - def GetBashCommand(cmd): - """Prepares a command to be run in Bash. - - """ - return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd] - - def _GetSocatCommand(self): - """Returns the socat command. - - """ - common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [ - "key=%s" % self._opts.key, - "cert=%s" % self._opts.cert, - "cafile=%s" % self._opts.ca, - ] - - if self._opts.bind is not None: - common_addr_opts.append("bind=%s" % self._opts.bind) - - if self._mode == constants.IEM_IMPORT: - if self._opts.port is None: - port = 0 - else: - port = self._opts.port - - addr1 = [ - "OPENSSL-LISTEN:%s" % port, - "reuseaddr", - - # Retry to listen if connection wasn't established successfully, up to - # 100 times a second. Note that this still leaves room for DoS attacks. - "forever", - "intervall=0.01", - ] + common_addr_opts - addr2 = ["stdout"] - - elif self._mode == constants.IEM_EXPORT: - addr1 = ["stdin"] - addr2 = [ - "OPENSSL:%s:%s" % (self._opts.host, self._opts.port), - - # How long to wait per connection attempt - "connect-timeout=%s" % self._opts.connect_timeout, - - # Retry a few times before giving up to connect (once per second) - "retry=%s" % self._opts.connect_retries, - "intervall=1", - ] + common_addr_opts - - else: - raise Error("Invalid mode '%s'" % self._mode) - - for i in [addr1, addr2]: - for value in i: - if "," in value: - raise Error("Comma not allowed in socat option value: %r" % value) - - return [ - constants.SOCAT_PATH, - - # Log to stderr - "-ls", - - # Log level - "-d", "-d", - - # Buffer size - "-b%s" % SOCAT_BUFSIZE, - - # Unidirectional mode, the first address is only used for reading, and the - # second address is only used for writing - "-u", - - ",".join(addr1), ",".join(addr2) - ] - - def _GetTransportCommand(self): - """Returns the command for the transport part of the daemon. - - """ - socat_cmd = ("%s 2>&%d" % - (utils.ShellQuoteArgs(self._GetSocatCommand()), - self._socat_stderr_fd)) - - compr = self._opts.compress - - assert compr in constants.IEC_ALL - - if self._mode == constants.IEM_IMPORT: - if compr == constants.IEC_GZIP: - transport_cmd = "%s | gunzip -c" % socat_cmd - else: - transport_cmd = socat_cmd - elif self._mode == constants.IEM_EXPORT: - if compr == constants.IEC_GZIP: - transport_cmd = "gzip -c | %s" % socat_cmd - else: - transport_cmd = socat_cmd - else: - raise Error("Invalid mode '%s'" % self._mode) - - # TODO: Use "dd" to measure processed data (allows to give an ETA) - - # TODO: Run transport as separate user - # The transport uses its own shell to simplify running it as a separate user - # in the future. - return self.GetBashCommand(transport_cmd) - - def GetCommand(self): - """Returns the complete child process command. - - """ - transport_cmd = self._GetTransportCommand() - - buf = StringIO() - - if self._opts.cmd_prefix: - buf.write(self._opts.cmd_prefix) - buf.write(" ") - - buf.write(utils.ShellQuoteArgs(transport_cmd)) - - if self._opts.cmd_suffix: - buf.write(" ") - buf.write(self._opts.cmd_suffix) - - return self.GetBashCommand(buf.getvalue()) - - def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger, signal_notify, signal_handler, mode): """Handles the child processes' output. @@ -673,7 +522,8 @@ def main(): (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe() # Get child process command - cmd = CommandBuilder(mode, options, socat_stderr_write_fd).GetCommand() + cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd) + cmd = cmd_builder.GetCommand() logging.debug("Starting command %r", cmd) diff --git a/lib/impexpd/__init__.py b/lib/impexpd/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..8a57c4f4448c85f67cf0c31b11384423f2b5623c --- /dev/null +++ b/lib/impexpd/__init__.py @@ -0,0 +1,183 @@ +# +# + +# Copyright (C) 2010 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + + +"""Classes and functions for import/export daemon. + +""" + +from cStringIO import StringIO + +from ganeti import constants +from ganeti import errors +from ganeti import utils + + +#: Buffer size: at most this many bytes are transferred at once +BUFSIZE = 1024 * 1024 + +# Common options for socat +SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"] +SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"] + + +class CommandBuilder(object): + def __init__(self, mode, opts, socat_stderr_fd): + """Initializes this class. + + @param mode: Daemon mode (import or export) + @param opts: Options object + @type socat_stderr_fd: int + @param socat_stderr_fd: File descriptor socat should write its stderr to + + """ + self._opts = opts + self._mode = mode + self._socat_stderr_fd = socat_stderr_fd + + @staticmethod + def GetBashCommand(cmd): + """Prepares a command to be run in Bash. + + """ + return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd] + + def _GetSocatCommand(self): + """Returns the socat command. + + """ + common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [ + "key=%s" % self._opts.key, + "cert=%s" % self._opts.cert, + "cafile=%s" % self._opts.ca, + ] + + if self._opts.bind is not None: + common_addr_opts.append("bind=%s" % self._opts.bind) + + if self._mode == constants.IEM_IMPORT: + if self._opts.port is None: + port = 0 + else: + port = self._opts.port + + addr1 = [ + "OPENSSL-LISTEN:%s" % port, + "reuseaddr", + + # Retry to listen if connection wasn't established successfully, up to + # 100 times a second. Note that this still leaves room for DoS attacks. + "forever", + "intervall=0.01", + ] + common_addr_opts + addr2 = ["stdout"] + + elif self._mode == constants.IEM_EXPORT: + addr1 = ["stdin"] + addr2 = [ + "OPENSSL:%s:%s" % (self._opts.host, self._opts.port), + + # How long to wait per connection attempt + "connect-timeout=%s" % self._opts.connect_timeout, + + # Retry a few times before giving up to connect (once per second) + "retry=%s" % self._opts.connect_retries, + "intervall=1", + ] + common_addr_opts + + else: + raise errors.GenericError("Invalid mode '%s'" % self._mode) + + for i in [addr1, addr2]: + for value in i: + if "," in value: + raise errors.GenericError("Comma not allowed in socat option" + " value: %r" % value) + + return [ + constants.SOCAT_PATH, + + # Log to stderr + "-ls", + + # Log level + "-d", "-d", + + # Buffer size + "-b%s" % BUFSIZE, + + # Unidirectional mode, the first address is only used for reading, and the + # second address is only used for writing + "-u", + + ",".join(addr1), ",".join(addr2) + ] + + def _GetTransportCommand(self): + """Returns the command for the transport part of the daemon. + + """ + socat_cmd = ("%s 2>&%d" % + (utils.ShellQuoteArgs(self._GetSocatCommand()), + self._socat_stderr_fd)) + + compr = self._opts.compress + + assert compr in constants.IEC_ALL + + if self._mode == constants.IEM_IMPORT: + if compr == constants.IEC_GZIP: + transport_cmd = "%s | gunzip -c" % socat_cmd + else: + transport_cmd = socat_cmd + elif self._mode == constants.IEM_EXPORT: + if compr == constants.IEC_GZIP: + transport_cmd = "gzip -c | %s" % socat_cmd + else: + transport_cmd = socat_cmd + else: + raise errors.GenericError("Invalid mode '%s'" % self._mode) + + # TODO: Use "dd" to measure processed data (allows to give an ETA) + + # TODO: Run transport as separate user + # The transport uses its own shell to simplify running it as a separate user + # in the future. + return self.GetBashCommand(transport_cmd) + + def GetCommand(self): + """Returns the complete child process command. + + """ + transport_cmd = self._GetTransportCommand() + + buf = StringIO() + + if self._opts.cmd_prefix: + buf.write(self._opts.cmd_prefix) + buf.write(" ") + + buf.write(utils.ShellQuoteArgs(transport_cmd)) + + if self._opts.cmd_suffix: + buf.write(" ") + buf.write(self._opts.cmd_suffix) + + return self.GetBashCommand(buf.getvalue()) diff --git a/test/ganeti.impexpd_unittest.py b/test/ganeti.impexpd_unittest.py new file mode 100755 index 0000000000000000000000000000000000000000..e4a82b6644e3a246ca32c4c89e415b764a8579e8 --- /dev/null +++ b/test/ganeti.impexpd_unittest.py @@ -0,0 +1,125 @@ +#!/usr/bin/python +# + +# Copyright (C) 2010 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + + +"""Script for testing ganeti.impexpd""" + +import os +import sys +import re +import unittest + +from ganeti import constants +from ganeti import objects +from ganeti import compat +from ganeti import utils +from ganeti import errors +from ganeti import impexpd + +import testutils + + +class CmdBuilderConfig(objects.ConfigObject): + __slots__ = [ + "bind", + "key", + "cert", + "ca", + "host", + "port", + "compress", + "connect_timeout", + "connect_retries", + "cmd_prefix", + "cmd_suffix", + ] + + +def CheckCmdWord(cmd, word): + wre = re.compile(r"\b%s\b" % re.escape(word)) + return compat.any(wre.search(i) for i in cmd) + + +class TestCommandBuilder(unittest.TestCase): + def test(self): + for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]: + if mode == constants.IEM_IMPORT: + comprcmd = "gunzip" + elif mode == constants.IEM_EXPORT: + comprcmd = "gzip" + + for compress in [constants.IEC_NONE, constants.IEC_GZIP]: + for host in ["localhost", "1.2.3.4", "192.0.2.99"]: + for port in [0, 1, 1234, 7856, 45452]: + for cmd_prefix in [None, "PrefixCommandGoesHere|", + "dd if=/dev/hda bs=1048576 |"]: + for cmd_suffix in [None, "< /some/file/name", + "| dd of=/dev/null"]: + opts = CmdBuilderConfig(host=host, port=port, compress=compress, + cmd_prefix=cmd_prefix, + cmd_suffix=cmd_suffix) + + builder = impexpd.CommandBuilder(mode, opts, 1) + + # Check complete command + cmd = builder.GetCommand() + self.assert_(isinstance(cmd, list)) + + if compress == constants.IEC_GZIP: + self.assert_(CheckCmdWord(cmd, comprcmd)) + + if cmd_prefix is not None: + self.assert_(cmd_prefix in i for i in cmd) + + if cmd_suffix is not None: + self.assert_(cmd_suffix in i for i in cmd) + + # Check socat command + socat_cmd = builder._GetSocatCommand() + + if mode == constants.IEM_IMPORT: + ssl_addr = socat_cmd[-2].split(",") + self.assert_(("OPENSSL-LISTEN:%s" % port) in ssl_addr) + elif mode == constants.IEM_EXPORT: + ssl_addr = socat_cmd[-1].split(",") + self.assert_(("OPENSSL:%s:%s" % (host, port)) in ssl_addr) + + self.assert_("verify=1" in ssl_addr) + + def testCommaError(self): + opts = CmdBuilderConfig(host="localhost", port=1234, + ca="/some/path/with,a/,comma") + + for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]: + builder = impexpd.CommandBuilder(mode, opts, 1) + self.assertRaises(errors.GenericError, builder.GetCommand) + + def testModeError(self): + mode = "foobarbaz" + + assert mode not in [constants.IEM_IMPORT, constants.IEM_EXPORT] + + opts = CmdBuilderConfig(host="localhost", port=1234) + builder = impexpd.CommandBuilder(mode, opts, 1) + self.assertRaises(errors.GenericError, builder.GetCommand) + + +if __name__ == "__main__": + testutils.GanetiTestProgram()