Commit c08d76f5 authored by Michael Hanselmann
Browse files

import/export daemon: Record amount of data transferred



This reports the amount of data transferred and the throughput (averaged
over 60 seconds) to the master daemon. While not yet fully implemented,
once the export scripts report the expected data size, we can even provide
an ETA and percentage.
Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Guido Trotter <ultrotter@google.com>
parent 158377f3
......@@ -35,6 +35,7 @@ import signal
import subprocess
import sys
import time
import math
from ganeti import constants
from ganeti import cli
......@@ -57,6 +58,16 @@ CHILD_LINGER_TIMEOUT = 5.0
#: How long to wait for a connection to be established
DEFAULT_CONNECT_TIMEOUT = 60
#: Get dd(1) statistics every few seconds
DD_STATISTICS_INTERVAL = 5.0
#: Seconds for throughput calculation
DD_THROUGHPUT_INTERVAL = 60.0
#: Number of samples for throughput calculation
DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) /
DD_STATISTICS_INTERVAL))
# Global variable for options
options = None
......@@ -140,6 +151,24 @@ class StatusFile:
"""
return self._data.connected
def SetProgress(self, mbytes, throughput, percent, eta):
  """Records the current transfer progress in the status data.

  All four values are stored as given; no validation or conversion is done.

  @type mbytes: number
  @param mbytes: Amount of data transferred so far, in MiB
  @type throughput: float
  @param throughput: Transfer rate in MiB/second
  @type percent: number
  @param percent: Share of the data processed so far, in percent
  @type eta: number
  @param eta: Expected number of seconds until the transfer is done

  """
  status = self._data

  status.progress_mbytes = mbytes
  status.progress_throughput = throughput
  status.progress_percent = percent
  status.progress_eta = eta
def SetExitStatus(self, exit_status, error_message):
"""Sets the exit status and an error message.
......@@ -177,7 +206,8 @@ class StatusFile:
mode=0400)
def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
dd_pid_read_fd, status_file, child_logger,
signal_notify, signal_handler, mode):
"""Handles the child processes' output.
......@@ -189,15 +219,24 @@ def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
# might buffer data while poll(2) won't mark its file descriptor as
# readable again.
socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
tp_samples = DD_THROUGHPUT_SAMPLES
child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
child_logger)
child_logger,
throughput_samples=tp_samples)
try:
fdmap = {
child.stderr.fileno():
(child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
socat_stderr_read.fileno():
(socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
dd_pid_read.fileno():
(dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
dd_stderr_read.fileno():
(dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
signal_notify.fileno(): (signal_notify, None),
}
......@@ -212,6 +251,7 @@ def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
listen_timeout = None
exit_timeout = None
dd_stats_timeout = None
while True:
# Break out of loop if only signal notify FD is left
......@@ -239,6 +279,24 @@ def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger,
logging.info("Child process didn't exit in time")
break
if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0:
notify_status = child_io_proc.NotifyDd()
if notify_status:
# Schedule next notification
dd_stats_timeout = locking.RunningTimeout(DD_STATISTICS_INTERVAL,
True)
else:
# Try again soon (dd isn't ready yet)
dd_stats_timeout = locking.RunningTimeout(1.0, True)
if dd_stats_timeout:
dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)
if timeout is None:
timeout = dd_timeout
else:
timeout = min(timeout, dd_timeout)
for fd, event in utils.RetryOnSignal(poller.poll, timeout):
if event & (select.POLLIN | event & select.POLLPRI):
(from_, to) = fdmap[fd]
......@@ -410,14 +468,22 @@ def main():
# Pipe to receive socat's stderr output
(socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()
# Pipe to receive dd's stderr output
(dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe()
# Pipe to receive dd's PID
(dd_pid_read_fd, dd_pid_write_fd) = os.pipe()
# Get child process command
cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd)
cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
dd_stderr_write_fd, dd_pid_write_fd)
cmd = cmd_builder.GetCommand()
logging.debug("Starting command %r", cmd)
# Start child process
child = ChildProcess(cmd, [socat_stderr_write_fd])
child = ChildProcess(cmd, [socat_stderr_write_fd, dd_stderr_write_fd,
dd_pid_write_fd])
try:
def _ForwardSignal(signum, _):
"""Forwards signals to child process.
......@@ -438,10 +504,12 @@ def main():
try:
# Close child's side
utils.RetryOnSignal(os.close, socat_stderr_write_fd)
utils.RetryOnSignal(os.close, dd_stderr_write_fd)
utils.RetryOnSignal(os.close, dd_pid_write_fd)
if ProcessChildIO(child, socat_stderr_read_fd, status_file,
child_logger, signal_wakeup, signal_handler,
mode):
if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
dd_pid_read_fd, status_file, child_logger,
signal_wakeup, signal_handler, mode):
# The child closed all its file descriptors and there was no
# signal
# TODO: Implement timeout instead of waiting indefinitely
......
......@@ -23,9 +23,13 @@
"""
import os
import re
import socket
import logging
import signal
import errno
import time
from cStringIO import StringIO
from ganeti import constants
......@@ -57,6 +61,17 @@ SOCAT_LOG_IGNORE = frozenset([
SOCAT_LOG_NOTICE,
])
#: Used to parse GNU dd(1) statistics
DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)
#: Used to ignore "N+N records in/out" on dd(1)'s stderr
DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
#: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is
#: unavailable and SIGUSR1 is used instead)
DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)
#: Buffer size: at most this many bytes are transferred at once
BUFSIZE = 1024 * 1024
......@@ -65,26 +80,36 @@ SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
(PROG_OTHER,
PROG_SOCAT) = range(1, 3)
PROG_SOCAT,
PROG_DD,
PROG_DD_PID) = range(1, 5)
PROG_ALL = frozenset([
PROG_OTHER,
PROG_SOCAT,
PROG_DD,
PROG_DD_PID,
])
class CommandBuilder(object):
def __init__(self, mode, opts, socat_stderr_fd):
def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
"""Initializes this class.
@param mode: Daemon mode (import or export)
@param opts: Options object
@type socat_stderr_fd: int
@param socat_stderr_fd: File descriptor socat should write its stderr to
@type dd_stderr_fd: int
@param dd_stderr_fd: File descriptor dd should write its stderr to
@type dd_pid_fd: int
@param dd_pid_fd: File descriptor the child should write dd's PID to
"""
self._opts = opts
self._mode = mode
self._socat_stderr_fd = socat_stderr_fd
self._dd_stderr_fd = dd_stderr_fd
self._dd_pid_fd = dd_pid_fd
@staticmethod
def GetBashCommand(cmd):
......@@ -172,6 +197,18 @@ class CommandBuilder(object):
(utils.ShellQuoteArgs(self._GetSocatCommand()),
self._socat_stderr_fd))
dd_cmd = StringIO()
# Setting LC_ALL since we want to parse the output and explicitly
# redirecting stdin, as the background process (dd) would have /dev/null as
# stdin otherwise
dd_cmd.write("{ LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
(BUFSIZE, self._dd_stderr_fd))
# Send PID to daemon
dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd)
# And wait for dd
dd_cmd.write(" wait $pid;")
dd_cmd.write(" }")
compr = self._opts.compress
assert compr in constants.IEC_ALL
......@@ -181,16 +218,18 @@ class CommandBuilder(object):
transport_cmd = "%s | gunzip -c" % socat_cmd
else:
transport_cmd = socat_cmd
transport_cmd += " | %s" % dd_cmd.getvalue()
elif self._mode == constants.IEM_EXPORT:
if compr == constants.IEC_GZIP:
transport_cmd = "gzip -c | %s" % socat_cmd
else:
transport_cmd = socat_cmd
transport_cmd = "%s | %s" % (dd_cmd.getvalue(), transport_cmd)
else:
raise errors.GenericError("Invalid mode '%s'" % self._mode)
# TODO: Use "dd" to measure processed data (allows to give an ETA)
# TODO: Run transport as separate user
# The transport uses its own shell to simplify running it as a separate user
# in the future.
......@@ -235,7 +274,7 @@ def _VerifyListening(family, address, port):
class ChildIOProcessor(object):
def __init__(self, debug, status_file, logger):
def __init__(self, debug, status_file, logger, throughput_samples):
"""Initializes this class.
"""
......@@ -246,6 +285,14 @@ class ChildIOProcessor(object):
self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog))
for prog in PROG_ALL])
self._dd_pid = None
self._dd_ready = False
self._dd_tp_samples = throughput_samples
self._dd_progress = []
# Expected size of transferred data
self._exp_size = None
def GetLineSplitter(self, prog):
"""Returns the line splitter for a program.
......@@ -267,6 +314,38 @@ class ChildIOProcessor(object):
ls.close()
self._splitter.clear()
def NotifyDd(self):
  """Tells dd(1) to write statistics.

  Sends L{DD_INFO_SIGNAL} to the dd process whose PID was stored in
  C{self._dd_pid}.

  @rtype: bool
  @return: False if dd's PID is not known or dd is not yet handling the
    signal; True once sending the signal has been attempted (also if the
    process turned out to be gone)

  """
  if self._dd_pid is None:
    # Can't notify
    return False

  if not self._dd_ready:
    # There's a race condition between starting the program and sending
    # signals. The signal handler is only registered after some time, so we
    # have to check whether the program is ready. If it isn't, sending a
    # signal will invoke the default handler (and usually abort the program).
    if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL):
      logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
      return False

    # Remember the result so the check is not repeated on later calls
    logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
    self._dd_ready = True

  logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid)
  try:
    os.kill(self._dd_pid, DD_INFO_SIGNAL)
  except EnvironmentError, err:
    # Only "no such process" is tolerated; anything else is re-raised
    if err.errno != errno.ESRCH:
      raise

    # Process no longer exists
    self._dd_pid = None

  return True
def _ProcessOutput(self, line, prog):
"""Takes care of child process output.
......@@ -295,6 +374,21 @@ class ChildIOProcessor(object):
else:
forward_line = "socat: %s" % line
elif prog == PROG_DD:
(should_forward, force_update) = self._ProcessDdOutput(line)
if should_forward or self._debug:
forward_line = "dd: %s" % line
else:
forward_line = None
elif prog == PROG_DD_PID:
if self._dd_pid:
raise RuntimeError("dd PID reported more than once")
logging.debug("Received dd PID %r", line)
self._dd_pid = int(line)
forward_line = None
if forward_line:
self._logger.info(forward_line)
self._status_file.AddRecentOutput(forward_line)
......@@ -326,3 +420,73 @@ class ChildIOProcessor(object):
return True
return False
def _ProcessDdOutput(self, line):
  """Classifies one line of dd(1)'s output.

  A line matching L{DD_INFO_RE} is parsed and used to update the transfer
  progress. A line matching L{DD_STDERR_IGNORE} is dropped. Any other line is
  marked for forwarding.

  @type line: string
  @param line: Single line of output, as written by dd
  @rtype: tuple
  @return: Tuple of two booleans; the first tells whether the line should be
    forwarded, the second whether the line contained statistics

  """
  stats = DD_INFO_RE.match(line)

  if not stats:
    # Not a statistics line: forward it unless it is one of the
    # "N+N records in/out" lines
    ignorable = DD_STDERR_IGNORE.match(line)
    return (not ignorable, False)

  elapsed = float(stats.group("seconds"))
  transferred = utils.BytesToMebibyte(int(stats.group("bytes")))
  self._UpdateDdProgress(elapsed, transferred)

  return (False, True)
def _UpdateDdProgress(self, seconds, mbytes):
  """Updates the internal status variables for dd(1) progress.

  Appends the sample to the list of recent samples, recalculates the
  throughput over that list and passes the result to the status file.

  @type seconds: float
  @param seconds: Timestamp of this update
  @type mbytes: float
  @param mbytes: Total number of MiB transferred so far

  """
  # Add latest sample
  self._dd_progress.append((seconds, mbytes))

  # Remove old samples
  # (keeps only the last C{self._dd_tp_samples} entries)
  # NOTE(review): with a sample count of 0 the slice would be "[:-0]", which
  # is empty, so nothing would ever be removed -- confirm the count is always
  # positive
  del self._dd_progress[:-self._dd_tp_samples]

  # Calculate throughput
  throughput = _CalcThroughput(self._dd_progress)

  # Calculate percent and ETA
  # Both stay None while the expected size is unknown
  percent = None
  eta = None

  if self._exp_size is not None:
    if self._exp_size != 0:
      # Clamp to the range 0..100
      percent = max(0, min(100, (100.0 * mbytes) / self._exp_size))

    if throughput:
      # Never report a negative number of remaining seconds
      eta = max(0, float(self._exp_size - mbytes) / throughput)

  self._status_file.SetProgress(mbytes, throughput, percent, eta)
def _CalcThroughput(samples):
"""Calculates the throughput in MiB/second.
@type samples: sequence
@param samples: List of samples, each consisting of a (timestamp, mbytes)
tuple
@rtype: float or None
@return: Throughput in MiB/second
"""
if len(samples) < 2:
# Can't calculate throughput
return None
(start_time, start_mbytes) = samples[0]
(end_time, end_mbytes) = samples[-1]
return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)
......@@ -1017,6 +1017,10 @@ class ImportExportStatus(ConfigObject):
"recent_output",
"listen_port",
"connected",
"progress_mbytes",
"progress_throughput",
"progress_eta",
"progress_percent",
"exit_status",
"error_message",
] + _TIMESTAMPS
......
......@@ -76,7 +76,7 @@ class TestCommandBuilder(unittest.TestCase):
cmd_prefix=cmd_prefix,
cmd_suffix=cmd_suffix)
builder = impexpd.CommandBuilder(mode, opts, 1)
builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
# Check complete command
cmd = builder.GetCommand()
......@@ -108,7 +108,7 @@ class TestCommandBuilder(unittest.TestCase):
ca="/some/path/with,a/,comma")
for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]:
builder = impexpd.CommandBuilder(mode, opts, 1)
builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
self.assertRaises(errors.GenericError, builder.GetCommand)
def testModeError(self):
......@@ -117,9 +117,28 @@ class TestCommandBuilder(unittest.TestCase):
assert mode not in [constants.IEM_IMPORT, constants.IEM_EXPORT]
opts = CmdBuilderConfig(host="localhost", port=1234)
builder = impexpd.CommandBuilder(mode, opts, 1)
builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3)
self.assertRaises(errors.GenericError, builder.GetCommand)
class TestCalcThroughput(unittest.TestCase):
  """Tests for L{impexpd._CalcThroughput}."""

  def test(self):
    # Fewer than two samples: no throughput can be calculated
    self.assertEqual(impexpd._CalcThroughput([]), None)
    self.assertEqual(impexpd._CalcThroughput([(0, 0)]), None)

    # 100 MiB in 10 seconds
    samples = [
      (0.0, 0.0),
      (10.0, 100.0),
      ]
    self.assertAlmostEqual(impexpd._CalcThroughput(samples), 10.0, 3)

    # Only the first and last sample count: (181 - 7) / (16 - 5) = 15.818...
    samples = [
      (5.0, 7.0),
      (10.0, 100.0),
      (16.0, 181.0),
      ]
    self.assertAlmostEqual(impexpd._CalcThroughput(samples), 15.818, 3)
if __name__ == "__main__":
testutils.GanetiTestProgram()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment