-
Manuel Franceschini authored
This patch moves network utility functions to a dedicated module. Signed-off-by:
Manuel Franceschini <livewire@google.com> Reviewed-by:
Iustin Pop <iustin@google.com>
a744b676
import-export 19.14 KiB
#!/usr/bin/python
#
# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Import/export daemon.
"""
# pylint: disable-msg=C0103
# C0103: Invalid name import-export
import errno
import logging
import optparse
import os
import select
import signal
import subprocess
import sys
import time
import math
from ganeti import constants
from ganeti import cli
from ganeti import utils
from ganeti import errors
from ganeti import serializer
from ganeti import objects
from ganeti import locking
from ganeti import impexpd
from ganeti import netutils
#: Maximum number of recent output lines kept in the status file
MAX_RECENT_OUTPUT_LINES = 20

#: Minimum number of seconds between two writes of the status file (unless
#: the update is forced)
MIN_UPDATE_INTERVAL = 5.0

#: Seconds the child process is given to exit after a signal was sent to it
CHILD_LINGER_TIMEOUT = 5.0

#: Default number of seconds to wait for a connection to be established
DEFAULT_CONNECT_TIMEOUT = 60

#: Interval, in seconds, at which dd(1) statistics are requested
DD_STATISTICS_INTERVAL = 5.0

#: Length, in seconds, of the window used for the throughput calculation
DD_THROUGHPUT_INTERVAL = 60.0

#: Number of samples for the throughput calculation, i.e. how many statistics
#: intervals fit into one throughput window (rounded up)
DD_THROUGHPUT_SAMPLES = \
  int(math.ceil(DD_THROUGHPUT_INTERVAL / float(DD_STATISTICS_INTERVAL)))

#: Command line options; filled in by L{ParseOptions}
options = None
def SetupLogging():
  """Sets up logging for the daemon and for output of the child process.

  A single stream handler is shared by the root logger and by a dedicated
  logger for child output. The level of the root logger follows the global
  C{options} (debug, verbose or neither), while the child output logger
  passes everything on.

  @rtype: logging.Logger
  @return: Logger to be used for output of the child process

  """
  handler = logging.StreamHandler()
  handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
  handler.setLevel(logging.NOTSET)

  root = logging.getLogger("")
  root.addHandler(handler)

  # Verbosity of the daemon's own messages depends on the command line
  if options.debug:
    root_level = logging.NOTSET
  elif options.verbose:
    root_level = logging.INFO
  else:
    root_level = logging.ERROR
  root.setLevel(root_level)

  # The logger for child output is instantiated directly instead of being
  # requested through logging.getLogger and gets the handler attached itself
  child_output = logging.Logger("child output")
  child_output.addHandler(handler)
  child_output.setLevel(logging.NOTSET)

  return child_output
class StatusFile:
"""Status file manager.
"""
def __init__(self, path):
"""Initializes class.
"""
self._path = path
self._data = objects.ImportExportStatus(ctime=time.time(),
mtime=None,
recent_output=[])
def AddRecentOutput(self, line):
"""Adds a new line of recent output.
"""
self._data.recent_output.append(line)
# Remove old lines
del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]
def SetListenPort(self, port):
"""Sets the port the daemon is listening on.
@type port: int
@param port: TCP/UDP port
"""
assert isinstance(port, (int, long)) and 0 < port < 2**16
self._data.listen_port = port
def GetListenPort(self):
"""Returns the port the daemon is listening on.
"""
return self._data.listen_port
def SetConnected(self):
"""Sets the connected flag.
"""
self._data.connected = True
def GetConnected(self):
"""Determines whether the daemon is connected.
"""
return self._data.connected
def SetProgress(self, mbytes, throughput, percent, eta):
"""Sets how much data has been transferred so far.
@type mbytes: number
@param mbytes: Transferred amount of data in MiB.
@type throughput: float
@param throughput: MiB/second
@type percent: number
@param percent: Percent processed
@type eta: number
@param eta: Expected number of seconds until done
"""
self._data.progress_mbytes = mbytes
self._data.progress_throughput = throughput
self._data.progress_percent = percent
self._data.progress_eta = eta
def SetExitStatus(self, exit_status, error_message):
"""Sets the exit status and an error message.
"""
# Require error message when status isn't 0
assert exit_status == 0 or error_message
self._data.exit_status = exit_status
self._data.error_message = error_message
def ExitStatusIsSuccess(self):
"""Returns whether the exit status means "success".
"""
return not bool(self._data.error_message)
def Update(self, force):
"""Updates the status file.
@type force: bool
@param force: Write status file in any case, not only when minimum interval
is expired
"""
if not (force or
self._data.mtime is None or
time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
return
logging.debug("Updating status file %s", self._path)
self._data.mtime = time.time()
utils.WriteFile(self._path,
data=serializer.DumpJson(self._data.ToDict(), indent=True),
mode=0400)
def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                   dd_pid_read_fd, exp_size_read_fd, status_file, child_logger,
                   signal_notify, signal_handler, mode):
  """Handles the child processes' output.

  Polls the child's stderr, the pipes for socat, dd and the expected size
  as well as the signal notification file descriptor. Data read is handed to
  the line splitters of an L{impexpd.ChildIOProcessor}. The loop is left once
  only the signal notification descriptor remains or once the child didn't
  exit within the time it was given.

  @param child: Child process; its C{stderr} is read and its C{Kill} method
    is used if no connection was established in time
  @type socat_stderr_read_fd: int
  @param socat_stderr_read_fd: File descriptor to read socat's stderr from
  @type dd_stderr_read_fd: int
  @param dd_stderr_read_fd: File descriptor to read dd's stderr from
  @type dd_pid_read_fd: int
  @param dd_pid_read_fd: File descriptor to read dd's PID from
  @type exp_size_read_fd: int
  @param exp_size_read_fd: File descriptor to read the expected size from
  @param status_file: Status file manager; passed to the I/O processor and
    queried for the connected flag
  @param child_logger: Logger for child output; passed to the I/O processor
  @param signal_notify: Object providing C{fileno} and C{read}, readable
    when a signal was received
  @param signal_handler: Signal handler; only SIGTERM and SIGINT may be
    handled by it
  @param mode: Daemon mode; the connection timeout is only used for
    L{constants.IEM_IMPORT}
  @rtype: bool
  @return: False if the child was given a deadline to exit (after a signal or
    because no connection was established in time), True otherwise

  """
  assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
         "Other signals are not handled in this function"

  # Buffer size 0 is important, otherwise .read() with a specified length
  # might buffer data while poll(2) won't mark its file descriptor as
  # readable again.
  socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
  dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
  dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
  exp_size_read = os.fdopen(exp_size_read_fd, "r", 0)

  tp_samples = DD_THROUGHPUT_SAMPLES

  # With a custom size the expected size is not known from the command line
  if options.exp_size == constants.IE_CUSTOM_SIZE:
    exp_size = None
  else:
    exp_size = options.exp_size

  child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
                                           child_logger, tp_samples,
                                           exp_size)
  try:
    # Maps file descriptor numbers to (file object to read from, line
    # splitter to write to); the signal notification FD has no splitter
    fdmap = {
      child.stderr.fileno():
        (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
      socat_stderr_read.fileno():
        (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
      dd_pid_read.fileno():
        (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
      dd_stderr_read.fileno():
        (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
      exp_size_read.fileno():
        (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)),
      signal_notify.fileno(): (signal_notify, None),
      }

    poller = select.poll()
    for fd in fdmap:
      utils.SetNonblockFlag(fd, True)
      poller.register(fd, select.POLLIN)

    if options.connect_timeout and mode == constants.IEM_IMPORT:
      listen_timeout = locking.RunningTimeout(options.connect_timeout, True)
    else:
      listen_timeout = None

    exit_timeout = None
    dd_stats_timeout = None

    while True:
      # Break out of loop if only signal notify FD is left
      if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
        break

      # Timeout for poll in milliseconds; None means no timeout
      timeout = None

      if listen_timeout and not exit_timeout:
        if status_file.GetConnected():
          listen_timeout = None
        elif listen_timeout.Remaining() < 0:
          logging.info("Child process didn't establish connection in time")
          child.Kill(signal.SIGTERM)
          exit_timeout = \
            locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
          # Next block will calculate timeout
        else:
          # Not yet connected, check again in a second
          timeout = 1000

      if exit_timeout:
        timeout = exit_timeout.Remaining() * 1000
        if timeout < 0:
          logging.info("Child process didn't exit in time")
          break

      if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0:
        notify_status = child_io_proc.NotifyDd()
        if notify_status:
          # Schedule next notification
          dd_stats_timeout = locking.RunningTimeout(DD_STATISTICS_INTERVAL,
                                                    True)
        else:
          # Try again soon (dd isn't ready yet)
          dd_stats_timeout = locking.RunningTimeout(1.0, True)

      if dd_stats_timeout:
        dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)

        # Wake up for whichever deadline comes first
        if timeout is None:
          timeout = dd_timeout
        else:
          timeout = min(timeout, dd_timeout)

      for fd, event in utils.RetryOnSignal(poller.poll, timeout):
        # Data (normal or priority) is available for reading
        if event & (select.POLLIN | select.POLLPRI):
          (from_, to) = fdmap[fd]

          # Read up to 1 KB of data
          data = from_.read(1024)
          if data:
            if to:
              to.write(data)
            elif fd == signal_notify.fileno():
              # Signal handling
              if signal_handler.called:
                signal_handler.Clear()
                if exit_timeout:
                  logging.info("Child process still has about %0.2f seconds"
                               " to exit", exit_timeout.Remaining())
                else:
                  logging.info("Giving child process %0.2f seconds to exit",
                               CHILD_LINGER_TIMEOUT)
                  exit_timeout = \
                    locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
          else:
            # Nothing was read, stop watching this file descriptor
            poller.unregister(fd)
            del fdmap[fd]

        elif event & (select.POLLNVAL | select.POLLHUP |
                      select.POLLERR):
          poller.unregister(fd)
          del fdmap[fd]

      child_io_proc.FlushAll()

    # If there was a timeout calculator, we were waiting for the child to
    # finish, e.g. due to a signal
    return not bool(exit_timeout)
  finally:
    child_io_proc.CloseAll()
def ParseOptions():
"""Parses the options passed to the program.
@return: Arguments to program
"""
global options # pylint: disable-msg=W0603
parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
(constants.IEM_IMPORT,
constants.IEM_EXPORT)))
parser.add_option(cli.DEBUG_OPT)
parser.add_option(cli.VERBOSE_OPT)
parser.add_option("--key", dest="key", action="store", type="string",
help="RSA key file")
parser.add_option("--cert", dest="cert", action="store", type="string",
help="X509 certificate file")
parser.add_option("--ca", dest="ca", action="store", type="string",
help="X509 CA file")
parser.add_option("--bind", dest="bind", action="store", type="string",
help="Bind address")
parser.add_option("--host", dest="host", action="store", type="string",
help="Remote hostname")
parser.add_option("--port", dest="port", action="store", type="int",
help="Remote port")
parser.add_option("--connect-retries", dest="connect_retries", action="store",
type="int", default=0,
help=("How many times the connection should be retried"
" (export only)"))
parser.add_option("--connect-timeout", dest="connect_timeout", action="store",
type="int", default=DEFAULT_CONNECT_TIMEOUT,
help="Timeout for connection to be established (seconds)")
parser.add_option("--compress", dest="compress", action="store",
type="choice", help="Compression method",
metavar="[%s]" % "|".join(constants.IEC_ALL),
choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
parser.add_option("--expected-size", dest="exp_size", action="store",
type="string", default=None,
help="Expected import/export size (MiB)")
parser.add_option("--magic", dest="magic", action="store",
type="string", default=None, help="Magic string")
parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
type="string", help="Command prefix")
parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
type="string", help="Command suffix")
(options, args) = parser.parse_args()
if len(args) != 2:
# Won't return
parser.error("Expected exactly two arguments")
(status_file_path, mode) = args
if mode not in (constants.IEM_IMPORT,
constants.IEM_EXPORT):
# Won't return
parser.error("Invalid mode: %s" % mode)
# Normalize and check parameters
if options.host is not None:
try:
options.host = netutils.HostInfo.NormalizeName(options.host)
except errors.OpPrereqError, err:
parser.error("Invalid hostname '%s': %s" % (options.host, err))
if options.port is not None:
options.port = utils.ValidateServiceName(options.port)
if (options.exp_size is not None and
options.exp_size != constants.IE_CUSTOM_SIZE):
try:
options.exp_size = int(options.exp_size)
except (ValueError, TypeError), err:
# Won't return
parser.error("Invalid value for --expected-size: %s (%s)" %
(options.exp_size, err))
if not (options.magic is None or constants.IE_MAGIC_RE.match(options.magic)):
parser.error("Magic must match regular expression %s" %
constants.IE_MAGIC_RE.pattern)
return (status_file_path, mode)
class ChildProcess(subprocess.Popen):
def __init__(self, env, cmd, noclose_fds):
"""Initializes this class.
"""
self._noclose_fds = noclose_fds
# Not using close_fds because doing so would also close the socat stderr
# pipe, which we still need.
subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False,
stderr=subprocess.PIPE, stdout=None, stdin=None,
preexec_fn=self._ChildPreexec)
self._SetProcessGroup()
def _ChildPreexec(self):
"""Called before child executable is execve'd.
"""
# Move to separate process group. By sending a signal to its process group
# we can kill the child process and all grandchildren.
os.setpgid(0, 0)
# Close almost all file descriptors
utils.CloseFDs(noclose_fds=self._noclose_fds)
def _SetProcessGroup(self):
"""Sets the child's process group.
"""
assert self.pid, "Can't be called in child process"
# Avoid race condition by setting child's process group (as good as
# possible in Python) before sending signals to child. For an
# explanation, see preexec function for child.
try:
os.setpgid(self.pid, self.pid)
except EnvironmentError, err:
# If the child process was faster we receive EPERM or EACCES
if err.errno not in (errno.EPERM, errno.EACCES):
raise
def Kill(self, signum):
"""Sends signal to child process.
"""
logging.info("Sending signal %s to child process", signum)
utils.IgnoreProcessNotFound(os.killpg, self.pid, signum)
def ForceQuit(self):
"""Ensure child process is no longer running.
"""
# Final check if child process is still alive
if utils.RetryOnSignal(self.poll) is None:
logging.error("Child process still alive, sending SIGKILL")
self.Kill(signal.SIGKILL)
utils.RetryOnSignal(self.wait)
def main():
  """Main function.

  Parses the command line, starts the child process built by
  L{impexpd.CommandBuilder} and processes its output until it is done or has
  to be terminated. The result is recorded in the status file, which is
  written in any case before the program exits with
  L{constants.EXIT_SUCCESS} or L{constants.EXIT_FAILURE}.

  """
  # Option parsing
  (status_file_path, mode) = ParseOptions()

  # Configure logging
  child_logger = SetupLogging()

  status_file = StatusFile(status_file_path)
  try:
    try:
      # Pipe to receive socat's stderr output
      (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()

      # Pipe to receive dd's stderr output
      (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe()

      # Pipe to receive dd's PID
      (dd_pid_read_fd, dd_pid_write_fd) = os.pipe()

      # Pipe to receive size predicted by export script
      (exp_size_read_fd, exp_size_write_fd) = os.pipe()

      # Get child process command
      cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
                                           dd_stderr_write_fd, dd_pid_write_fd)
      cmd = cmd_builder.GetCommand()

      # Prepare command environment
      cmd_env = os.environ.copy()

      # With a custom size the child is told, through its environment, on
      # which file descriptor to report the expected size
      if options.exp_size == constants.IE_CUSTOM_SIZE:
        cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd)

      logging.debug("Starting command %r", cmd)

      # Start child process; the write ends of the pipes must stay open in it
      child = ChildProcess(cmd_env, cmd,
                           [socat_stderr_write_fd, dd_stderr_write_fd,
                            dd_pid_write_fd, exp_size_write_fd])
      try:
        def _ForwardSignal(signum, _):
          """Forwards signals to child process.

          """
          child.Kill(signum)

        signal_wakeup = utils.SignalWakeupFd()
        try:
          # TODO: There is a race condition between starting the child and
          # handling the signals here. While there might be a way to work around
          # it by registering the handlers before starting the child and
          # deferring sent signals until the child is available, doing so can be
          # complicated.
          signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
                                               handler_fn=_ForwardSignal,
                                               wakeup=signal_wakeup)
          try:
            # Close child's side; the write ends are only used by the child
            utils.RetryOnSignal(os.close, socat_stderr_write_fd)
            utils.RetryOnSignal(os.close, dd_stderr_write_fd)
            utils.RetryOnSignal(os.close, dd_pid_write_fd)
            utils.RetryOnSignal(os.close, exp_size_write_fd)

            if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                              dd_pid_read_fd, exp_size_read_fd,
                              status_file, child_logger,
                              signal_wakeup, signal_handler, mode):
              # The child closed all its file descriptors and there was no
              # signal
              # TODO: Implement timeout instead of waiting indefinitely
              utils.RetryOnSignal(child.wait)
          finally:
            signal_handler.Reset()
        finally:
          signal_wakeup.Reset()
      finally:
        # Whatever happened above, don't leave the child running
        child.ForceQuit()

      # A negative return code is reported as termination by a signal
      if child.returncode == 0:
        errmsg = None
      elif child.returncode < 0:
        errmsg = "Exited due to signal %s" % (-child.returncode, )
      else:
        errmsg = "Exited with status %s" % (child.returncode, )

      status_file.SetExitStatus(child.returncode, errmsg)
    except Exception, err: # pylint: disable-msg=W0703
      # Any error is logged and recorded in the status file
      logging.exception("Unhandled error occurred")
      status_file.SetExitStatus(constants.EXIT_FAILURE,
                                "Unhandled error occurred: %s" % (err, ))

    if status_file.ExitStatusIsSuccess():
      sys.exit(constants.EXIT_SUCCESS)

    sys.exit(constants.EXIT_FAILURE)
  finally:
    # Write the status file in any case, also when exiting through sys.exit
    status_file.Update(True)


# Run the daemon only when executed as a program
if __name__ == "__main__":
  main()