From c08d76f5f297ead72045c8dc7212bc76195f47e6 Mon Sep 17 00:00:00 2001 From: Michael Hanselmann <hansmi@google.com> Date: Wed, 26 May 2010 20:57:42 +0200 Subject: [PATCH] import/export daemon: Record amount of data transferred This reports the amount of data transferred and the throughput (averaged over 60 seconds) to the master daemon. While not yet fully implemented, once the export scripts report the expected data size, we can even provide an ETA and percentage. Signed-off-by: Michael Hanselmann <hansmi@google.com> Reviewed-by: Guido Trotter <ultrotter@google.com> --- daemons/import-export | 82 +++++++++++++-- lib/impexpd/__init__.py | 174 +++++++++++++++++++++++++++++++- lib/objects.py | 4 + test/ganeti.impexpd_unittest.py | 25 ++++- 4 files changed, 270 insertions(+), 15 deletions(-) diff --git a/daemons/import-export b/daemons/import-export index 0a6f6daa6..53c4e28d8 100755 --- a/daemons/import-export +++ b/daemons/import-export @@ -35,6 +35,7 @@ import signal import subprocess import sys import time +import math from ganeti import constants from ganeti import cli @@ -57,6 +58,16 @@ CHILD_LINGER_TIMEOUT = 5.0 #: How long to wait for a connection to be established DEFAULT_CONNECT_TIMEOUT = 60 +#: Get dd(1) statistics every few seconds +DD_STATISTICS_INTERVAL = 5.0 + +#: Seconds for throughput calculation +DD_THROUGHPUT_INTERVAL = 60.0 + +#: Number of samples for throughput calculation +DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) / + DD_STATISTICS_INTERVAL)) + # Global variable for options options = None @@ -140,6 +151,24 @@ class StatusFile: """ return self._data.connected + def SetProgress(self, mbytes, throughput, percent, eta): + """Sets how much data has been transferred so far. + + @type mbytes: number + @param mbytes: Transferred amount of data in MiB. + @type throughput: float + @param throughput: MiB/second + @type percent: number + @param percent: Percent processed + @type eta: number + @param eta: Expected number of seconds until done + + """ + self._data.progress_mbytes = mbytes + self._data.progress_throughput = throughput + self._data.progress_percent = percent + self._data.progress_eta = eta + def SetExitStatus(self, exit_status, error_message): """Sets the exit status and an error message. @@ -177,7 +206,8 @@ class StatusFile: mode=0400) -def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger, +def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, + dd_pid_read_fd, status_file, child_logger, signal_notify, signal_handler, mode): """Handles the child processes' output. @@ -189,15 +219,24 @@ def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger, # might buffer data while poll(2) won't mark its file descriptor as # readable again. socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0) + dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0) + dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0) + + tp_samples = DD_THROUGHPUT_SAMPLES child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file, - child_logger) + child_logger, + throughput_samples=tp_samples) try: fdmap = { child.stderr.fileno(): (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)), socat_stderr_read.fileno(): (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)), + dd_pid_read.fileno(): + (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)), + dd_stderr_read.fileno(): + (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)), signal_notify.fileno(): (signal_notify, None), } @@ -212,6 +251,7 @@ def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger, listen_timeout = None exit_timeout = None + dd_stats_timeout = None while True: # Break out of loop if only signal notify FD is left @@ -239,6 +279,24 @@ def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger, logging.info("Child process didn't exit in time") break + if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0: + notify_status = child_io_proc.NotifyDd() + if notify_status: + # Schedule next notification + dd_stats_timeout = locking.RunningTimeout(DD_STATISTICS_INTERVAL, + True) + else: + # Try again soon (dd isn't ready yet) + dd_stats_timeout = locking.RunningTimeout(1.0, True) + + if dd_stats_timeout: + dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000) + + if timeout is None: + timeout = dd_timeout + else: + timeout = min(timeout, dd_timeout) + for fd, event in utils.RetryOnSignal(poller.poll, timeout): if event & (select.POLLIN | event & select.POLLPRI): (from_, to) = fdmap[fd] @@ -410,14 +468,22 @@ def main(): # Pipe to receive socat's stderr output (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe() + # Pipe to receive dd's stderr output + (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe() + + # Pipe to receive dd's PID + (dd_pid_read_fd, dd_pid_write_fd) = os.pipe() + # Get child process command - cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd) + cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd, + dd_stderr_write_fd, dd_pid_write_fd) cmd = cmd_builder.GetCommand() logging.debug("Starting command %r", cmd) # Start child process - child = ChildProcess(cmd, [socat_stderr_write_fd]) + child = ChildProcess(cmd, [socat_stderr_write_fd, dd_stderr_write_fd, + dd_pid_write_fd]) try: def _ForwardSignal(signum, _): """Forwards signals to child process. @@ -438,10 +504,12 @@ def main(): try: # Close child's side utils.RetryOnSignal(os.close, socat_stderr_write_fd) + utils.RetryOnSignal(os.close, dd_stderr_write_fd) + utils.RetryOnSignal(os.close, dd_pid_write_fd) - if ProcessChildIO(child, socat_stderr_read_fd, status_file, - child_logger, signal_wakeup, signal_handler, - mode): + if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, + dd_pid_read_fd, status_file, child_logger, + signal_wakeup, signal_handler, mode): # The child closed all its file descriptors and there was no # signal # TODO: Implement timeout instead of waiting indefinitely diff --git a/lib/impexpd/__init__.py b/lib/impexpd/__init__.py index e396dc5fe..4f9efdabe 100644 --- a/lib/impexpd/__init__.py +++ b/lib/impexpd/__init__.py @@ -23,9 +23,13 @@ """ +import os import re import socket import logging +import signal +import errno +import time from cStringIO import StringIO from ganeti import constants @@ -57,6 +61,17 @@ SOCAT_LOG_IGNORE = frozenset([ SOCAT_LOG_NOTICE, ]) +#: Used to parse GNU dd(1) statistics +DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*" + r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I) + +#: Used to ignore "N+N records in/out" on dd(1)'s stderr +DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I) + +#: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is +#: unavailable and SIGUSR1 is used instead) +DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1) + #: Buffer size: at most this many bytes are transferred at once BUFSIZE = 1024 * 1024 @@ -65,26 +80,36 @@ SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"] SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"] (PROG_OTHER, - PROG_SOCAT) = range(1, 3) + PROG_SOCAT, + PROG_DD, + PROG_DD_PID) = range(1, 5) PROG_ALL = frozenset([ PROG_OTHER, PROG_SOCAT, + PROG_DD, + PROG_DD_PID, ]) class CommandBuilder(object): - def __init__(self, mode, opts, socat_stderr_fd): + def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd): """Initializes this class. @param mode: Daemon mode (import or export) @param opts: Options object @type socat_stderr_fd: int @param socat_stderr_fd: File descriptor socat should write its stderr to + @type dd_stderr_fd: int + @param dd_stderr_fd: File descriptor dd should write its stderr to + @type dd_pid_fd: int + @param dd_pid_fd: File descriptor the child should write dd's PID to """ self._opts = opts self._mode = mode self._socat_stderr_fd = socat_stderr_fd + self._dd_stderr_fd = dd_stderr_fd + self._dd_pid_fd = dd_pid_fd @staticmethod def GetBashCommand(cmd): @@ -172,6 +197,18 @@ class CommandBuilder(object): (utils.ShellQuoteArgs(self._GetSocatCommand()), self._socat_stderr_fd)) + dd_cmd = StringIO() + # Setting LC_ALL since we want to parse the output and explicitely + # redirecting stdin, as the background process (dd) would have /dev/null as + # stdin otherwise + dd_cmd.write("{ LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" % + (BUFSIZE, self._dd_stderr_fd)) + # Send PID to daemon + dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd) + # And wait for dd + dd_cmd.write(" wait $pid;") + dd_cmd.write(" }") + compr = self._opts.compress assert compr in constants.IEC_ALL @@ -181,16 +218,18 @@ class CommandBuilder(object): transport_cmd = "%s | gunzip -c" % socat_cmd else: transport_cmd = socat_cmd + + transport_cmd += " | %s" % dd_cmd.getvalue() elif self._mode == constants.IEM_EXPORT: if compr == constants.IEC_GZIP: transport_cmd = "gzip -c | %s" % socat_cmd else: transport_cmd = socat_cmd + + transport_cmd = "%s | %s" % (dd_cmd.getvalue(), transport_cmd) else: raise errors.GenericError("Invalid mode '%s'" % self._mode) - # TODO: Use "dd" to measure processed data (allows to give an ETA) - # TODO: Run transport as separate user # The transport uses its own shell to simplify running it as a separate user # in the future. @@ -235,7 +274,7 @@ def _VerifyListening(family, address, port): class ChildIOProcessor(object): - def __init__(self, debug, status_file, logger): + def __init__(self, debug, status_file, logger, throughput_samples): """Initializes this class. """ @@ -246,6 +285,14 @@ class ChildIOProcessor(object): self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog)) for prog in PROG_ALL]) + self._dd_pid = None + self._dd_ready = False + self._dd_tp_samples = throughput_samples + self._dd_progress = [] + + # Expected size of transferred data + self._exp_size = None + def GetLineSplitter(self, prog): """Returns the line splitter for a program. @@ -267,6 +314,38 @@ class ChildIOProcessor(object): ls.close() self._splitter.clear() + def NotifyDd(self): + """Tells dd(1) to write statistics. + + """ + if self._dd_pid is None: + # Can't notify + return False + + if not self._dd_ready: + # There's a race condition between starting the program and sending + # signals. The signal handler is only registered after some time, so we + # have to check whether the program is ready. If it isn't, sending a + # signal will invoke the default handler (and usually abort the program). + if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL): + logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL) + return False + + logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL) + self._dd_ready = True + + logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid) + try: + os.kill(self._dd_pid, DD_INFO_SIGNAL) + except EnvironmentError, err: + if err.errno != errno.ESRCH: + raise + + # Process no longer exists + self._dd_pid = None + + return True + def _ProcessOutput(self, line, prog): """Takes care of child process output. @@ -295,6 +374,21 @@ class ChildIOProcessor(object): else: forward_line = "socat: %s" % line + elif prog == PROG_DD: + (should_forward, force_update) = self._ProcessDdOutput(line) + + if should_forward or self._debug: + forward_line = "dd: %s" % line + else: + forward_line = None + + elif prog == PROG_DD_PID: + if self._dd_pid: + raise RuntimeError("dd PID reported more than once") + logging.debug("Received dd PID %r", line) + self._dd_pid = int(line) + forward_line = None + if forward_line: self._logger.info(forward_line) self._status_file.AddRecentOutput(forward_line) @@ -326,3 +420,73 @@ class ChildIOProcessor(object): return True return False + + def _ProcessDdOutput(self, line): + """Interprets a line of dd(1)'s output. + + """ + m = DD_INFO_RE.match(line) + if m: + seconds = float(m.group("seconds")) + mbytes = utils.BytesToMebibyte(int(m.group("bytes"))) + self._UpdateDdProgress(seconds, mbytes) + return (False, True) + + m = DD_STDERR_IGNORE.match(line) + if m: + # Ignore + return (False, False) + + # Forward line + return (True, False) + + def _UpdateDdProgress(self, seconds, mbytes): + """Updates the internal status variables for dd(1) progress. + + @type seconds: float + @param seconds: Timestamp of this update + @type mbytes: float + @param mbytes: Total number of MiB transferred so far + + """ + # Add latest sample + self._dd_progress.append((seconds, mbytes)) + + # Remove old samples + del self._dd_progress[:-self._dd_tp_samples] + + # Calculate throughput + throughput = _CalcThroughput(self._dd_progress) + + # Calculate percent and ETA + percent = None + eta = None + + if self._exp_size is not None: + if self._exp_size != 0: + percent = max(0, min(100, (100.0 * mbytes) / self._exp_size)) + + if throughput: + eta = max(0, float(self._exp_size - mbytes) / throughput) + + self._status_file.SetProgress(mbytes, throughput, percent, eta) + + +def _CalcThroughput(samples): + """Calculates the throughput in MiB/second. + + @type samples: sequence + @param samples: List of samples, each consisting of a (timestamp, mbytes) + tuple + @rtype: float or None + @return: Throughput in MiB/second + + """ + if len(samples) < 2: + # Can't calculate throughput + return None + + (start_time, start_mbytes) = samples[0] + (end_time, end_mbytes) = samples[-1] + + return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time) diff --git a/lib/objects.py b/lib/objects.py index 59d94bd3f..f954081fb 100644 --- a/lib/objects.py +++ b/lib/objects.py @@ -1017,6 +1017,10 @@ class ImportExportStatus(ConfigObject): "recent_output", "listen_port", "connected", + "progress_mbytes", + "progress_throughput", + "progress_eta", + "progress_percent", "exit_status", "error_message", ] + _TIMESTAMPS diff --git a/test/ganeti.impexpd_unittest.py b/test/ganeti.impexpd_unittest.py index e4a82b664..78320333f 100755 --- a/test/ganeti.impexpd_unittest.py +++ b/test/ganeti.impexpd_unittest.py @@ -76,7 +76,7 @@ class TestCommandBuilder(unittest.TestCase): cmd_prefix=cmd_prefix, cmd_suffix=cmd_suffix) - builder = impexpd.CommandBuilder(mode, opts, 1) + builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3) # Check complete command cmd = builder.GetCommand() @@ -108,7 +108,7 @@ class TestCommandBuilder(unittest.TestCase): ca="/some/path/with,a/,comma") for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]: - builder = impexpd.CommandBuilder(mode, opts, 1) + builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3) self.assertRaises(errors.GenericError, builder.GetCommand) def testModeError(self): @@ -117,9 +117,28 @@ class TestCommandBuilder(unittest.TestCase): assert mode not in [constants.IEM_IMPORT, constants.IEM_EXPORT] opts = CmdBuilderConfig(host="localhost", port=1234) - builder = impexpd.CommandBuilder(mode, opts, 1) + builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3) self.assertRaises(errors.GenericError, builder.GetCommand) +class TestCalcThroughput(unittest.TestCase): + def test(self): + self.assertEqual(impexpd._CalcThroughput([]), None) + self.assertEqual(impexpd._CalcThroughput([(0, 0)]), None) + + samples = [ + (0.0, 0.0), + (10.0, 100.0), + ] + self.assertAlmostEqual(impexpd._CalcThroughput(samples), 10.0, 3) + + samples = [ + (5.0, 7.0), + (10.0, 100.0), + (16.0, 181.0), + ] + self.assertAlmostEqual(impexpd._CalcThroughput(samples), 15.818, 3) + + if __name__ == "__main__": testutils.GanetiTestProgram() -- GitLab