diff --git a/daemons/import-export b/daemons/import-export index 29386d1fe6deee95cd0035fa45fcd6d00b536d9e..0a6f6daa63468653d5b5605b373fa62787301af0 100755 --- a/daemons/import-export +++ b/daemons/import-export @@ -30,10 +30,8 @@ import errno import logging import optparse import os -import re import select import signal -import socket import subprocess import sys import time @@ -47,30 +45,6 @@ from ganeti import locking from ganeti import impexpd -#: Used to recognize point at which socat(1) starts to listen on its socket. -#: The local address is required for the remote peer to connect (in particular -#: the port number). -LISTENING_RE = re.compile(r"^listening on\s+" - r"AF=(?P<family>\d+)\s+" - r"(?P<address>.+):(?P<port>\d+)$", re.I) - -#: Used to recognize point at which socat(1) is sending data over the wire -TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$", - re.I) - -SOCAT_LOG_DEBUG = "D" -SOCAT_LOG_INFO = "I" -SOCAT_LOG_NOTICE = "N" -SOCAT_LOG_WARNING = "W" -SOCAT_LOG_ERROR = "E" -SOCAT_LOG_FATAL = "F" - -SOCAT_LOG_IGNORE = frozenset([ - SOCAT_LOG_DEBUG, - SOCAT_LOG_INFO, - SOCAT_LOG_NOTICE, - ]) - #: How many lines to keep in the status file MAX_RECENT_OUTPUT_LINES = 20 @@ -88,10 +62,6 @@ DEFAULT_CONNECT_TIMEOUT = 60 options = None -class Error(Exception): - """Generic exception""" - - def SetupLogging(): """Configures the logging module. @@ -120,22 +90,6 @@ def SetupLogging(): return child_logger -def _VerifyListening(family, address, port): - """Verify address given as listening address by socat. - - """ - # TODO: Implement IPv6 support - if family != socket.AF_INET: - raise Error("Address family %r not supported" % family) - - try: - packed_address = socket.inet_pton(family, address) - except socket.error: - raise Error("Invalid address %r for family %s" % (address, family)) - - return (socket.inet_ntop(family, packed_address), port) - - class StatusFile: """Status file manager. @@ -223,67 +177,6 @@ class StatusFile: mode=0400) -def _ProcessSocatOutput(status_file, level, msg): - """Interprets socat log output. - - """ - if level == SOCAT_LOG_NOTICE: - if status_file.GetListenPort() is None: - # TODO: Maybe implement timeout to not listen forever - m = LISTENING_RE.match(msg) - if m: - (_, port) = _VerifyListening(int(m.group("family")), m.group("address"), - int(m.group("port"))) - - status_file.SetListenPort(port) - return True - - if not status_file.GetConnected(): - m = TRANSFER_LOOP_RE.match(msg) - if m: - status_file.SetConnected() - return True - - return False - - -def ProcessOutput(line, status_file, logger, socat): - """Takes care of child process output. - - @param status_file: Status file manager - @param logger: Child output logger - @type socat: bool - @param socat: Whether it's a socat output line - @type line: string - @param line: Child output line - - """ - force_update = False - forward_line = line - - if socat: - level = None - parts = line.split(None, 4) - - if len(parts) == 5: - (_, _, _, level, msg) = parts - - force_update = _ProcessSocatOutput(status_file, level, msg) - - if options.debug or (level and level not in SOCAT_LOG_IGNORE): - forward_line = "socat: %s %s" % (level, msg) - else: - forward_line = None - else: - forward_line = "socat: %s" % line - - if forward_line: - logger.info(forward_line) - status_file.AddRecentOutput(forward_line) - - status_file.Update(force_update) - - def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger, signal_notify, signal_handler, mode): """Handles the child processes' output. @@ -297,96 +190,92 @@ def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger, # readable again. socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0) - script_stderr_lines = utils.LineSplitter(ProcessOutput, status_file, - child_logger, False) + child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file, + child_logger) try: - socat_stderr_lines = utils.LineSplitter(ProcessOutput, status_file, - child_logger, True) - try: - fdmap = { - child.stderr.fileno(): (child.stderr, script_stderr_lines), - socat_stderr_read.fileno(): (socat_stderr_read, socat_stderr_lines), - signal_notify.fileno(): (signal_notify, None), - } - - poller = select.poll() - for fd in fdmap: - utils.SetNonblockFlag(fd, True) - poller.register(fd, select.POLLIN) - - if options.connect_timeout and mode == constants.IEM_IMPORT: - listen_timeout = locking.RunningTimeout(options.connect_timeout, True) - else: - listen_timeout = None - - exit_timeout = None - - while True: - # Break out of loop if only signal notify FD is left - if len(fdmap) == 1 and signal_notify.fileno() in fdmap: + fdmap = { + child.stderr.fileno(): + (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)), + socat_stderr_read.fileno(): + (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)), + signal_notify.fileno(): (signal_notify, None), + } + + poller = select.poll() + for fd in fdmap: + utils.SetNonblockFlag(fd, True) + poller.register(fd, select.POLLIN) + + if options.connect_timeout and mode == constants.IEM_IMPORT: + listen_timeout = locking.RunningTimeout(options.connect_timeout, True) + else: + listen_timeout = None + + exit_timeout = None + + while True: + # Break out of loop if only signal notify FD is left + if len(fdmap) == 1 and signal_notify.fileno() in fdmap: + break + + timeout = None + + if listen_timeout and not exit_timeout: + if status_file.GetConnected(): + listen_timeout = None + elif listen_timeout.Remaining() < 0: + logging.info("Child process didn't establish connection in time") + child.Kill(signal.SIGTERM) + exit_timeout = \ + locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True) + # Next block will calculate timeout + else: + # Not yet connected, check again in a second + timeout = 1000 + + if exit_timeout: + timeout = exit_timeout.Remaining() * 1000 + if timeout < 0: + logging.info("Child process didn't exit in time") break - timeout = None - - if listen_timeout and not exit_timeout: - if status_file.GetConnected(): - listen_timeout = None - elif listen_timeout.Remaining() < 0: - logging.info("Child process didn't establish connection in time") - child.Kill(signal.SIGTERM) - exit_timeout = \ - locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True) - # Next block will calculate timeout + for fd, event in utils.RetryOnSignal(poller.poll, timeout): + if event & (select.POLLIN | event & select.POLLPRI): + (from_, to) = fdmap[fd] + + # Read up to 1 KB of data + data = from_.read(1024) + if data: + if to: + to.write(data) + elif fd == signal_notify.fileno(): + # Signal handling + if signal_handler.called: + signal_handler.Clear() + if exit_timeout: + logging.info("Child process still has about %0.2f seconds" + " to exit", exit_timeout.Remaining()) + else: + logging.info("Giving child process %0.2f seconds to exit", + CHILD_LINGER_TIMEOUT) + exit_timeout = \ + locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True) else: - # Not yet connected, check again in a second - timeout = 1000 - - if exit_timeout: - timeout = exit_timeout.Remaining() * 1000 - if timeout < 0: - logging.info("Child process didn't exit in time") - break - - for fd, event in utils.RetryOnSignal(poller.poll, timeout): - if event & (select.POLLIN | event & select.POLLPRI): - (from_, to) = fdmap[fd] - - # Read up to 1 KB of data - data = from_.read(1024) - if data: - if to: - to.write(data) - elif fd == signal_notify.fileno(): - # Signal handling - if signal_handler.called: - signal_handler.Clear() - if exit_timeout: - logging.info("Child process still has about %0.2f seconds" - " to exit", exit_timeout.Remaining()) - else: - logging.info("Giving child process %0.2f seconds to exit", - CHILD_LINGER_TIMEOUT) - exit_timeout = \ - locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True) - else: - poller.unregister(fd) - del fdmap[fd] - - elif event & (select.POLLNVAL | select.POLLHUP | - select.POLLERR): poller.unregister(fd) del fdmap[fd] - script_stderr_lines.flush() - socat_stderr_lines.flush() + elif event & (select.POLLNVAL | select.POLLHUP | + select.POLLERR): + poller.unregister(fd) + del fdmap[fd] + + child_io_proc.FlushAll() - # If there was a timeout calculator, we were waiting for the child to - # finish, e.g. due to a signal - return not bool(exit_timeout) - finally: - socat_stderr_lines.close() + # If there was a timeout calculator, we were waiting for the child to + # finish, e.g. due to a signal + return not bool(exit_timeout) finally: - script_stderr_lines.close() + child_io_proc.CloseAll() def ParseOptions(): diff --git a/lib/impexpd/__init__.py b/lib/impexpd/__init__.py index 8a57c4f4448c85f67cf0c31b11384423f2b5623c..b75bbf3dc1745536cd44da5bdce6b0182caa78af 100644 --- a/lib/impexpd/__init__.py +++ b/lib/impexpd/__init__.py @@ -23,6 +23,8 @@ """ +import re +import socket from cStringIO import StringIO from ganeti import constants @@ -30,6 +32,30 @@ from ganeti import errors from ganeti import utils +#: Used to recognize point at which socat(1) starts to listen on its socket. +#: The local address is required for the remote peer to connect (in particular +#: the port number). +LISTENING_RE = re.compile(r"^listening on\s+" + r"AF=(?P<family>\d+)\s+" + r"(?P<address>.+):(?P<port>\d+)$", re.I) + +#: Used to recognize point at which socat(1) is sending data over the wire +TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$", + re.I) + +SOCAT_LOG_DEBUG = "D" +SOCAT_LOG_INFO = "I" +SOCAT_LOG_NOTICE = "N" +SOCAT_LOG_WARNING = "W" +SOCAT_LOG_ERROR = "E" +SOCAT_LOG_FATAL = "F" + +SOCAT_LOG_IGNORE = frozenset([ + SOCAT_LOG_DEBUG, + SOCAT_LOG_INFO, + SOCAT_LOG_NOTICE, + ]) + #: Buffer size: at most this many bytes are transferred at once BUFSIZE = 1024 * 1024 @@ -37,6 +63,13 @@ BUFSIZE = 1024 * 1024 SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"] SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"] +(PROG_OTHER, + PROG_SOCAT) = range(1, 3) +PROG_ALL = frozenset([ + PROG_OTHER, + PROG_SOCAT, + ]) + class CommandBuilder(object): def __init__(self, mode, opts, socat_stderr_fd): @@ -181,3 +214,113 @@ class CommandBuilder(object): buf.write(self._opts.cmd_suffix) return self.GetBashCommand(buf.getvalue()) + + +def _VerifyListening(family, address, port): + """Verify address given as listening address by socat. + + """ + # TODO: Implement IPv6 support + if family != socket.AF_INET: + raise errors.GenericError("Address family %r not supported" % family) + + try: + packed_address = socket.inet_pton(family, address) + except socket.error: + raise errors.GenericError("Invalid address %r for family %s" % + (address, family)) + + return (socket.inet_ntop(family, packed_address), port) + + +class ChildIOProcessor(object): + def __init__(self, debug, status_file, logger): + """Initializes this class. + + """ + self._debug = debug + self._status_file = status_file + self._logger = logger + + self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog)) + for prog in PROG_ALL]) + + def GetLineSplitter(self, prog): + """Returns the line splitter for a program. + + """ + return self._splitter[prog] + + def FlushAll(self): + """Flushes all line splitters. + + """ + for ls in self._splitter.itervalues(): + ls.flush() + + def CloseAll(self): + """Closes all line splitters. + + """ + for ls in self._splitter.itervalues(): + ls.close() + self._splitter.clear() + + def _ProcessOutput(self, line, prog): + """Takes care of child process output. + + @type line: string + @param line: Child output line + @type prog: number + @param prog: Program from which the line originates + + """ + force_update = False + forward_line = line + + if prog == PROG_SOCAT: + level = None + parts = line.split(None, 4) + + if len(parts) == 5: + (_, _, _, level, msg) = parts + + force_update = self._ProcessSocatOutput(self._status_file, level, msg) + + if self._debug or (level and level not in SOCAT_LOG_IGNORE): + forward_line = "socat: %s %s" % (level, msg) + else: + forward_line = None + else: + forward_line = "socat: %s" % line + + if forward_line: + self._logger.info(forward_line) + self._status_file.AddRecentOutput(forward_line) + + self._status_file.Update(force_update) + + @staticmethod + def _ProcessSocatOutput(status_file, level, msg): + """Interprets socat log output. + + """ + if level == SOCAT_LOG_NOTICE: + if status_file.GetListenPort() is None: + # TODO: Maybe implement timeout to not listen forever + m = LISTENING_RE.match(msg) + if m: + (_, port) = _VerifyListening(int(m.group("family")), + m.group("address"), + int(m.group("port"))) + + status_file.SetListenPort(port) + return True + + if not status_file.GetConnected(): + m = TRANSFER_LOOP_RE.match(msg) + if m: + status_file.SetConnected() + return True + + return False