diff --git a/daemons/import-export b/daemons/import-export index 0a6f6daa63468653d5b5605b373fa62787301af0..53c4e28d845d0577704f999a6ede147b8bc01ef7 100755 --- a/daemons/import-export +++ b/daemons/import-export @@ -35,6 +35,7 @@ import signal import subprocess import sys import time +import math from ganeti import constants from ganeti import cli @@ -57,6 +58,16 @@ CHILD_LINGER_TIMEOUT = 5.0 #: How long to wait for a connection to be established DEFAULT_CONNECT_TIMEOUT = 60 +#: Get dd(1) statistics every few seconds +DD_STATISTICS_INTERVAL = 5.0 + +#: Seconds for throughput calculation +DD_THROUGHPUT_INTERVAL = 60.0 + +#: Number of samples for throughput calculation +DD_THROUGHPUT_SAMPLES = int(math.ceil(float(DD_THROUGHPUT_INTERVAL) / + DD_STATISTICS_INTERVAL)) + # Global variable for options options = None @@ -140,6 +151,24 @@ class StatusFile: """ return self._data.connected + def SetProgress(self, mbytes, throughput, percent, eta): + """Sets how much data has been transferred so far. + + @type mbytes: number + @param mbytes: Transferred amount of data in MiB. + @type throughput: float + @param throughput: MiB/second + @type percent: number + @param percent: Percent processed + @type eta: number + @param eta: Expected number of seconds until done + + """ + self._data.progress_mbytes = mbytes + self._data.progress_throughput = throughput + self._data.progress_percent = percent + self._data.progress_eta = eta + def SetExitStatus(self, exit_status, error_message): """Sets the exit status and an error message. @@ -177,7 +206,8 @@ class StatusFile: mode=0400) -def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger, +def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, + dd_pid_read_fd, status_file, child_logger, signal_notify, signal_handler, mode): """Handles the child processes' output. 
@@ -189,15 +219,24 @@ def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger, # might buffer data while poll(2) won't mark its file descriptor as # readable again. socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0) + dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0) + dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0) + + tp_samples = DD_THROUGHPUT_SAMPLES child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file, - child_logger) + child_logger, + throughput_samples=tp_samples) try: fdmap = { child.stderr.fileno(): (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)), socat_stderr_read.fileno(): (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)), + dd_pid_read.fileno(): + (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)), + dd_stderr_read.fileno(): + (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)), signal_notify.fileno(): (signal_notify, None), } @@ -212,6 +251,7 @@ def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger, listen_timeout = None exit_timeout = None + dd_stats_timeout = None while True: # Break out of loop if only signal notify FD is left @@ -239,6 +279,24 @@ def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger, logging.info("Child process didn't exit in time") break + if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0: + notify_status = child_io_proc.NotifyDd() + if notify_status: + # Schedule next notification + dd_stats_timeout = locking.RunningTimeout(DD_STATISTICS_INTERVAL, + True) + else: + # Try again soon (dd isn't ready yet) + dd_stats_timeout = locking.RunningTimeout(1.0, True) + + if dd_stats_timeout: + dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000) + + if timeout is None: + timeout = dd_timeout + else: + timeout = min(timeout, dd_timeout) + for fd, event in utils.RetryOnSignal(poller.poll, timeout): if event & (select.POLLIN | event & select.POLLPRI): (from_, to) = fdmap[fd] @@ 
-410,14 +468,22 @@ def main(): # Pipe to receive socat's stderr output (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe() + # Pipe to receive dd's stderr output + (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe() + + # Pipe to receive dd's PID + (dd_pid_read_fd, dd_pid_write_fd) = os.pipe() + # Get child process command - cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd) + cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd, + dd_stderr_write_fd, dd_pid_write_fd) cmd = cmd_builder.GetCommand() logging.debug("Starting command %r", cmd) # Start child process - child = ChildProcess(cmd, [socat_stderr_write_fd]) + child = ChildProcess(cmd, [socat_stderr_write_fd, dd_stderr_write_fd, + dd_pid_write_fd]) try: def _ForwardSignal(signum, _): """Forwards signals to child process. @@ -438,10 +504,12 @@ def main(): try: # Close child's side utils.RetryOnSignal(os.close, socat_stderr_write_fd) + utils.RetryOnSignal(os.close, dd_stderr_write_fd) + utils.RetryOnSignal(os.close, dd_pid_write_fd) - if ProcessChildIO(child, socat_stderr_read_fd, status_file, - child_logger, signal_wakeup, signal_handler, - mode): + if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, + dd_pid_read_fd, status_file, child_logger, + signal_wakeup, signal_handler, mode): # The child closed all its file descriptors and there was no # signal # TODO: Implement timeout instead of waiting indefinitely diff --git a/lib/impexpd/__init__.py b/lib/impexpd/__init__.py index e396dc5fefcdc4c8ff4836e5099e6288857d26f9..4f9efdabef0a4a86fe08ae9c7315e16778dbd305 100644 --- a/lib/impexpd/__init__.py +++ b/lib/impexpd/__init__.py @@ -23,9 +23,13 @@ """ +import os import re import socket import logging +import signal +import errno +import time from cStringIO import StringIO from ganeti import constants @@ -57,6 +61,17 @@ SOCAT_LOG_IGNORE = frozenset([ SOCAT_LOG_NOTICE, ]) +#: Used to parse GNU dd(1) statistics +DD_INFO_RE = 
re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*" + r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I) + +#: Used to ignore "N+N records in/out" on dd(1)'s stderr +DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I) + +#: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is +#: unavailable and SIGUSR1 is used instead) +DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1) + #: Buffer size: at most this many bytes are transferred at once BUFSIZE = 1024 * 1024 @@ -65,26 +80,36 @@ SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"] SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"] (PROG_OTHER, - PROG_SOCAT) = range(1, 3) + PROG_SOCAT, + PROG_DD, + PROG_DD_PID) = range(1, 5) PROG_ALL = frozenset([ PROG_OTHER, PROG_SOCAT, + PROG_DD, + PROG_DD_PID, ]) class CommandBuilder(object): - def __init__(self, mode, opts, socat_stderr_fd): + def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd): """Initializes this class. 
@param mode: Daemon mode (import or export) @param opts: Options object @type socat_stderr_fd: int @param socat_stderr_fd: File descriptor socat should write its stderr to + @type dd_stderr_fd: int + @param dd_stderr_fd: File descriptor dd should write its stderr to + @type dd_pid_fd: int + @param dd_pid_fd: File descriptor the child should write dd's PID to """ self._opts = opts self._mode = mode self._socat_stderr_fd = socat_stderr_fd + self._dd_stderr_fd = dd_stderr_fd + self._dd_pid_fd = dd_pid_fd @staticmethod def GetBashCommand(cmd): @@ -172,6 +197,18 @@ class CommandBuilder(object): (utils.ShellQuoteArgs(self._GetSocatCommand()), self._socat_stderr_fd)) + dd_cmd = StringIO() + # Setting LC_ALL since we want to parse the output and explicitly + # redirecting stdin, as the background process (dd) would have /dev/null as + # stdin otherwise + dd_cmd.write("{ LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" % + (BUFSIZE, self._dd_stderr_fd)) + # Send PID to daemon + dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd) + # And wait for dd + dd_cmd.write(" wait $pid;") + dd_cmd.write(" }") + compr = self._opts.compress assert compr in constants.IEC_ALL @@ -181,16 +218,18 @@ class CommandBuilder(object): transport_cmd = "%s | gunzip -c" % socat_cmd else: transport_cmd = socat_cmd + + transport_cmd += " | %s" % dd_cmd.getvalue() elif self._mode == constants.IEM_EXPORT: if compr == constants.IEC_GZIP: transport_cmd = "gzip -c | %s" % socat_cmd else: transport_cmd = socat_cmd + + transport_cmd = "%s | %s" % (dd_cmd.getvalue(), transport_cmd) else: raise errors.GenericError("Invalid mode '%s'" % self._mode) - # TODO: Use "dd" to measure processed data (allows to give an ETA) - # TODO: Run transport as separate user # The transport uses its own shell to simplify running it as a separate user # in the future. 
@@ -235,7 +274,7 @@ def _VerifyListening(family, address, port): class ChildIOProcessor(object): - def __init__(self, debug, status_file, logger): + def __init__(self, debug, status_file, logger, throughput_samples): """Initializes this class. """ @@ -246,6 +285,14 @@ class ChildIOProcessor(object): self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog)) for prog in PROG_ALL]) + self._dd_pid = None + self._dd_ready = False + self._dd_tp_samples = throughput_samples + self._dd_progress = [] + + # Expected size of transferred data + self._exp_size = None + def GetLineSplitter(self, prog): """Returns the line splitter for a program. @@ -267,6 +314,38 @@ class ChildIOProcessor(object): ls.close() self._splitter.clear() + def NotifyDd(self): + """Tells dd(1) to write statistics. + + """ + if self._dd_pid is None: + # Can't notify + return False + + if not self._dd_ready: + # There's a race condition between starting the program and sending + # signals. The signal handler is only registered after some time, so we + # have to check whether the program is ready. If it isn't, sending a + # signal will invoke the default handler (and usually abort the program). + if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL): + logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL) + return False + + logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL) + self._dd_ready = True + + logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid) + try: + os.kill(self._dd_pid, DD_INFO_SIGNAL) + except EnvironmentError, err: + if err.errno != errno.ESRCH: + raise + + # Process no longer exists + self._dd_pid = None + + return True + def _ProcessOutput(self, line, prog): """Takes care of child process output. 
@@ -295,6 +374,21 @@ class ChildIOProcessor(object): else: forward_line = "socat: %s" % line + elif prog == PROG_DD: + (should_forward, force_update) = self._ProcessDdOutput(line) + + if should_forward or self._debug: + forward_line = "dd: %s" % line + else: + forward_line = None + + elif prog == PROG_DD_PID: + if self._dd_pid: + raise RuntimeError("dd PID reported more than once") + logging.debug("Received dd PID %r", line) + self._dd_pid = int(line) + forward_line = None + if forward_line: self._logger.info(forward_line) self._status_file.AddRecentOutput(forward_line) @@ -326,3 +420,73 @@ class ChildIOProcessor(object): return True return False + + def _ProcessDdOutput(self, line): + """Interprets a line of dd(1)'s output. + + """ + m = DD_INFO_RE.match(line) + if m: + seconds = float(m.group("seconds")) + mbytes = utils.BytesToMebibyte(int(m.group("bytes"))) + self._UpdateDdProgress(seconds, mbytes) + return (False, True) + + m = DD_STDERR_IGNORE.match(line) + if m: + # Ignore + return (False, False) + + # Forward line + return (True, False) + + def _UpdateDdProgress(self, seconds, mbytes): + """Updates the internal status variables for dd(1) progress. + + @type seconds: float + @param seconds: Timestamp of this update + @type mbytes: float + @param mbytes: Total number of MiB transferred so far + + """ + # Add latest sample + self._dd_progress.append((seconds, mbytes)) + + # Remove old samples + del self._dd_progress[:-self._dd_tp_samples] + + # Calculate throughput + throughput = _CalcThroughput(self._dd_progress) + + # Calculate percent and ETA + percent = None + eta = None + + if self._exp_size is not None: + if self._exp_size != 0: + percent = max(0, min(100, (100.0 * mbytes) / self._exp_size)) + + if throughput: + eta = max(0, float(self._exp_size - mbytes) / throughput) + + self._status_file.SetProgress(mbytes, throughput, percent, eta) + + +def _CalcThroughput(samples): + """Calculates the throughput in MiB/second. 
+ + @type samples: sequence + @param samples: List of samples, each consisting of a (timestamp, mbytes) + tuple + @rtype: float or None + @return: Throughput in MiB/second + + """ + if len(samples) < 2: + # Can't calculate throughput + return None + + (start_time, start_mbytes) = samples[0] + (end_time, end_mbytes) = samples[-1] + + return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time) diff --git a/lib/objects.py b/lib/objects.py index 59d94bd3fb88f2e38fbf9ad55284a645cd59dd3d..f954081fb39cd4e15cb162fb79c718935aefc4d1 100644 --- a/lib/objects.py +++ b/lib/objects.py @@ -1017,6 +1017,10 @@ class ImportExportStatus(ConfigObject): "recent_output", "listen_port", "connected", + "progress_mbytes", + "progress_throughput", + "progress_eta", + "progress_percent", "exit_status", "error_message", ] + _TIMESTAMPS diff --git a/test/ganeti.impexpd_unittest.py b/test/ganeti.impexpd_unittest.py index e4a82b6644e3a246ca32c4c89e415b764a8579e8..78320333f1d1c3ec92929b2baa8f6a1e4cf28585 100755 --- a/test/ganeti.impexpd_unittest.py +++ b/test/ganeti.impexpd_unittest.py @@ -76,7 +76,7 @@ class TestCommandBuilder(unittest.TestCase): cmd_prefix=cmd_prefix, cmd_suffix=cmd_suffix) - builder = impexpd.CommandBuilder(mode, opts, 1) + builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3) # Check complete command cmd = builder.GetCommand() @@ -108,7 +108,7 @@ class TestCommandBuilder(unittest.TestCase): ca="/some/path/with,a/,comma") for mode in [constants.IEM_IMPORT, constants.IEM_EXPORT]: - builder = impexpd.CommandBuilder(mode, opts, 1) + builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3) self.assertRaises(errors.GenericError, builder.GetCommand) def testModeError(self): @@ -117,9 +117,28 @@ class TestCommandBuilder(unittest.TestCase): assert mode not in [constants.IEM_IMPORT, constants.IEM_EXPORT] opts = CmdBuilderConfig(host="localhost", port=1234) - builder = impexpd.CommandBuilder(mode, opts, 1) + builder = impexpd.CommandBuilder(mode, opts, 1, 2, 3) 
self.assertRaises(errors.GenericError, builder.GetCommand) +class TestCalcThroughput(unittest.TestCase): + def test(self): + self.assertEqual(impexpd._CalcThroughput([]), None) + self.assertEqual(impexpd._CalcThroughput([(0, 0)]), None) + + samples = [ + (0.0, 0.0), + (10.0, 100.0), + ] + self.assertAlmostEqual(impexpd._CalcThroughput(samples), 10.0, 3) + + samples = [ + (5.0, 7.0), + (10.0, 100.0), + (16.0, 181.0), + ] + self.assertAlmostEqual(impexpd._CalcThroughput(samples), 15.818, 3) + + if __name__ == "__main__": testutils.GanetiTestProgram()