Commit bb44b1ae authored by Michael Hanselmann's avatar Michael Hanselmann

import/export daemon: Move command building into separate module

The import/export daemon code is already large. Moving some code
to a separate module will make it smaller and easier to test.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarGuido Trotter <ultrotter@google.com>
parent 1e915b86
......@@ -22,6 +22,7 @@ httpdir = $(pkgpythondir)/http
masterddir = $(pkgpythondir)/masterd
confddir = $(pkgpythondir)/confd
rapidir = $(pkgpythondir)/rapi
impexpddir = $(pkgpythondir)/impexpd
toolsdir = $(pkglibdir)/tools
docdir = $(datadir)/doc/$(PACKAGE)
......@@ -40,6 +41,7 @@ DIRS = \
lib/confd \
lib/http \
lib/hypervisor \
lib/impexpd \
lib/masterd \
lib/rapi \
man \
......@@ -148,6 +150,9 @@ masterd_PYTHON = \
lib/masterd/__init__.py \
lib/masterd/instance.py
impexpd_PYTHON = \
lib/impexpd/__init__.py
docrst = \
doc/admin.rst \
doc/design-2.0.rst \
......@@ -350,6 +355,7 @@ python_tests = \
test/ganeti.errors_unittest.py \
test/ganeti.hooks_unittest.py \
test/ganeti.http_unittest.py \
test/ganeti.impexpd_unittest.py \
test/ganeti.locking_unittest.py \
test/ganeti.luxi_unittest.py \
test/ganeti.masterd.instance_unittest.py \
......@@ -397,6 +403,7 @@ all_python_code = \
$(http_PYTHON) \
$(confd_PYTHON) \
$(masterd_PYTHON) \
$(impexpd_PYTHON) \
$(noinst_PYTHON)
srclink_files = \
......
......@@ -37,7 +37,6 @@ import socket
import subprocess
import sys
import time
from cStringIO import StringIO
from ganeti import constants
from ganeti import cli
......@@ -45,6 +44,7 @@ from ganeti import utils
from ganeti import serializer
from ganeti import objects
from ganeti import locking
from ganeti import impexpd
#: Used to recognize point at which socat(1) starts to listen on its socket.
......@@ -71,9 +71,6 @@ SOCAT_LOG_IGNORE = frozenset([
SOCAT_LOG_NOTICE,
])
#: Socat buffer size: at most this many bytes are transferred per step
SOCAT_BUFSIZE = 1024 * 1024
#: How many lines to keep in the status file
MAX_RECENT_OUTPUT_LINES = 20
......@@ -86,10 +83,6 @@ CHILD_LINGER_TIMEOUT = 5.0
#: How long to wait for a connection to be established
DEFAULT_CONNECT_TIMEOUT = 60
# Common options for socat
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
# Global variable for options
options = None
......@@ -291,150 +284,6 @@ def ProcessOutput(line, status_file, logger, socat):
status_file.Update(force_update)
class CommandBuilder(object):
def __init__(self, mode, opts, socat_stderr_fd):
"""Initializes this class.
@param mode: Daemon mode (import or export)
@param opts: Options object
@type socat_stderr_fd: int
@param socat_stderr_fd: File descriptor socat should write its stderr to
"""
self._opts = opts
self._mode = mode
self._socat_stderr_fd = socat_stderr_fd
@staticmethod
def GetBashCommand(cmd):
"""Prepares a command to be run in Bash.
"""
return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
def _GetSocatCommand(self):
"""Returns the socat command.
"""
common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
"key=%s" % self._opts.key,
"cert=%s" % self._opts.cert,
"cafile=%s" % self._opts.ca,
]
if self._opts.bind is not None:
common_addr_opts.append("bind=%s" % self._opts.bind)
if self._mode == constants.IEM_IMPORT:
if self._opts.port is None:
port = 0
else:
port = self._opts.port
addr1 = [
"OPENSSL-LISTEN:%s" % port,
"reuseaddr",
# Retry to listen if connection wasn't established successfully, up to
# 100 times a second. Note that this still leaves room for DoS attacks.
"forever",
"intervall=0.01",
] + common_addr_opts
addr2 = ["stdout"]
elif self._mode == constants.IEM_EXPORT:
addr1 = ["stdin"]
addr2 = [
"OPENSSL:%s:%s" % (self._opts.host, self._opts.port),
# How long to wait per connection attempt
"connect-timeout=%s" % self._opts.connect_timeout,
# Retry a few times before giving up to connect (once per second)
"retry=%s" % self._opts.connect_retries,
"intervall=1",
] + common_addr_opts
else:
raise Error("Invalid mode '%s'" % self._mode)
for i in [addr1, addr2]:
for value in i:
if "," in value:
raise Error("Comma not allowed in socat option value: %r" % value)
return [
constants.SOCAT_PATH,
# Log to stderr
"-ls",
# Log level
"-d", "-d",
# Buffer size
"-b%s" % SOCAT_BUFSIZE,
# Unidirectional mode, the first address is only used for reading, and the
# second address is only used for writing
"-u",
",".join(addr1), ",".join(addr2)
]
def _GetTransportCommand(self):
"""Returns the command for the transport part of the daemon.
"""
socat_cmd = ("%s 2>&%d" %
(utils.ShellQuoteArgs(self._GetSocatCommand()),
self._socat_stderr_fd))
compr = self._opts.compress
assert compr in constants.IEC_ALL
if self._mode == constants.IEM_IMPORT:
if compr == constants.IEC_GZIP:
transport_cmd = "%s | gunzip -c" % socat_cmd
else:
transport_cmd = socat_cmd
elif self._mode == constants.IEM_EXPORT:
if compr == constants.IEC_GZIP:
transport_cmd = "gzip -c | %s" % socat_cmd
else:
transport_cmd = socat_cmd
else:
raise Error("Invalid mode '%s'" % self._mode)
# TODO: Use "dd" to measure processed data (allows to give an ETA)
# TODO: Run transport as separate user
# The transport uses its own shell to simplify running it as a separate user
# in the future.
return self.GetBashCommand(transport_cmd)
def GetCommand(self):
"""Returns the complete child process command.
"""
transport_cmd = self._GetTransportCommand()
buf = StringIO()
if self._opts.cmd_prefix:
buf.write(self._opts.cmd_prefix)
buf.write(" ")
buf.write(utils.ShellQuoteArgs(transport_cmd))
if self._opts.cmd_suffix:
buf.write(" ")
buf.write(self._opts.cmd_suffix)
return self.GetBashCommand(buf.getvalue())
def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
signal_notify, signal_handler, mode):
"""Handles the child processes' output.
......@@ -673,7 +522,8 @@ def main():
(socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()
# Get child process command
cmd = CommandBuilder(mode, options, socat_stderr_write_fd).GetCommand()
cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd)
cmd = cmd_builder.GetCommand()
logging.debug("Starting command %r", cmd)
......
#
#
# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Classes and functions for import/export daemon.
"""
from cStringIO import StringIO
from ganeti import constants
from ganeti import errors
from ganeti import utils
#: Buffer size: at most this many bytes are transferred at once
BUFSIZE = 1024 * 1024
# Common options for socat
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
class CommandBuilder(object):
def __init__(self, mode, opts, socat_stderr_fd):
"""Initializes this class.
@param mode: Daemon mode (import or export)
@param opts: Options object
@type socat_stderr_fd: int
@param socat_stderr_fd: File descriptor socat should write its stderr to
"""
self._opts = opts
self._mode = mode
self._socat_stderr_fd = socat_stderr_fd
@staticmethod
def GetBashCommand(cmd):
"""Prepares a command to be run in Bash.
"""
return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
def _GetSocatCommand(self):
"""Returns the socat command.
"""
common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
"key=%s" % self._opts.key,
"cert=%s" % self._opts.cert,
"cafile=%s" % self._opts.ca,
]
if self._opts.bind is not None:
common_addr_opts.append("bind=%s" % self._opts.bind)
if self._mode == constants.IEM_IMPORT:
if self._opts.port is None:
port = 0
else:
port = self._opts.port
addr1 = [
"OPENSSL-LISTEN:%s" % port,
"reuseaddr",
# Retry to listen if connection wasn't established successfully, up to
# 100 times a second. Note that this still leaves room for DoS attacks.
"forever",
"intervall=0.01",
] + common_addr_opts
addr2 = ["stdout"]
elif self._mode == constants.IEM_EXPORT:
addr1 = ["stdin"]
addr2 = [
"OPENSSL:%s:%s" % (self._opts.host, self._opts.port),
# How long to wait per connection attempt
"connect-timeout=%s" % self._opts.connect_timeout,
# Retry a few times before giving up to connect (once per second)
"retry=%s" % self._opts.connect_retries,
"intervall=1",
] + common_addr_opts
else:
raise errors.GenericError("Invalid mode '%s'" % self._mode)
for i in [addr1, addr2]:
for value in i:
if "," in value:
raise errors.GenericError("Comma not allowed in socat option"
" value: %r" % value)
return [
constants.SOCAT_PATH,
# Log to stderr
"-ls",
# Log level
"-d", "-d",
# Buffer size
"-b%s" % BUFSIZE,
# Unidirectional mode, the first address is only used for reading, and the
# second address is only used for writing
"-u",
",".join(addr1), ",".join(addr2)
]
def _GetTransportCommand(self):
"""Returns the command for the transport part of the daemon.
"""
socat_cmd = ("%s 2>&%d" %
(utils.ShellQuoteArgs(self._GetSocatCommand()),
self._socat_stderr_fd))
compr = self._opts.compress
assert compr in constants.IEC_ALL
if self._mode == constants.IEM_IMPORT:
if compr == constants.IEC_GZIP:
transport_cmd = "%s | gunzip -c" % socat_cmd
else:
transport_cmd = socat_cmd
elif self._mode == constants.IEM_EXPORT:
if compr == constants.IEC_GZIP:
transport_cmd = "gzip -c | %s" % socat_cmd
else:
transport_cmd = socat_cmd
else:
raise errors.GenericError("Invalid mode '%s'" % self._mode)
# TODO: Use "dd" to measure processed data (allows to give an ETA)
# TODO: Run transport as separate user
# The transport uses its own shell to simplify running it as a separate user
# in the future.
return self.GetBashCommand(transport_cmd)
def GetCommand(self):
"""Returns the complete child process command.
"""
transport_cmd = self._GetTransportCommand()
buf = StringIO()
if self._opts.cmd_prefix:
buf.write(self._opts.cmd_prefix)
buf.write(" ")
buf.write(utils.ShellQuoteArgs(transport_cmd))
if self._opts.cmd_suffix:
buf.write(" ")
buf.write(self._opts.cmd_suffix)
return self.GetBashCommand(buf.getvalue())
#!/usr/bin/python
#
# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Script for testing ganeti.impexpd"""
import os
import sys
import re
import unittest
from ganeti import constants
from ganeti import objects
from ganeti import compat
from ganeti import utils
from ganeti import errors
from ganeti import impexpd
import testutils
class CmdBuilderConfig(objects.ConfigObject):
__slots__ = [
"bind",
"key",
"cert",
"ca",
"host",
"port",
"compress",
"connect_timeout",
"connect_retries",
"cmd_prefix",
"cmd_suffix",
]
def CheckCmdWord(cmd, word):
wre = re.compile(r"\b%s\b" % re.escape(word))
return compat.any(wre.search(i) for i in cmd)
class TestCommandBuilder(unittest.TestCase):
def test(self):
for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
if mode == constants.IEM_IMPORT:
comprcmd = "gunzip"
elif mode == constants.IEM_EXPORT:
comprcmd = "gzip"
for compress in [constants.IEC_NONE, constants.IEC_GZIP]:
for host in ["localhost", "1.2.3.4", "192.0.2.99"]:
for port in [0, 1, 1234, 7856, 45452]:
for cmd_prefix in [None, "PrefixCommandGoesHere|",
"dd if=/dev/hda bs=1048576 |"]:
for cmd_suffix in [None, "< /some/file/name",
"| dd of=/dev/null"]:
opts = CmdBuilderConfig(host=host, port=port, compress=compress,
cmd_prefix=cmd_prefix,
cmd_suffix=cmd_suffix)
builder = impexpd.CommandBuilder(mode, opts, 1)
# Check complete command
cmd = builder.GetCommand()
self.assert_(isinstance(cmd, list))
if compress == constants.IEC_GZIP:
self.assert_(CheckCmdWord(cmd, comprcmd))
if cmd_prefix is not None:
self.assert_(cmd_prefix in i for i in cmd)
if cmd_suffix is not None:
self.assert_(cmd_suffix in i for i in cmd)
# Check socat command
socat_cmd = builder._GetSocatCommand()
if mode == constants.IEM_IMPORT:
ssl_addr = socat_cmd[-2].split(",")
self.assert_(("OPENSSL-LISTEN:%s" % port) in ssl_addr)
elif mode == constants.IEM_EXPORT:
ssl_addr = socat_cmd[-1].split(",")
self.assert_(("OPENSSL:%s:%s" % (host, port)) in ssl_addr)
self.assert_("verify=1" in ssl_addr)
def testCommaError(self):
opts = CmdBuilderConfig(host="localhost", port=1234,
ca="/some/path/with,a/,comma")
for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
builder = impexpd.CommandBuilder(mode, opts, 1)
self.assertRaises(errors.GenericError, builder.GetCommand)
def testModeError(self):
mode = "foobarbaz"
assert mode not in [constants.IEM_IMPORT, constants.IEM_EXPORT]
opts = CmdBuilderConfig(host="localhost", port=1234)
builder = impexpd.CommandBuilder(mode, opts, 1)
self.assertRaises(errors.GenericError, builder.GetCommand)
if __name__ == "__main__":
testutils.GanetiTestProgram()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment