diff --git a/daemons/import-export b/daemons/import-export index 53c4e28d845d0577704f999a6ede147b8bc01ef7..870f9a0660afc0a065607ae412ec2bb6de7d3348 100755 --- a/daemons/import-export +++ b/daemons/import-export @@ -207,7 +207,7 @@ class StatusFile: def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, - dd_pid_read_fd, status_file, child_logger, + dd_pid_read_fd, exp_size_read_fd, status_file, child_logger, signal_notify, signal_handler, mode): """Handles the child processes' output. @@ -221,12 +221,18 @@ def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0) dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0) dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0) + exp_size_read = os.fdopen(exp_size_read_fd, "r", 0) tp_samples = DD_THROUGHPUT_SAMPLES + if options.exp_size == constants.IE_CUSTOM_SIZE: + exp_size = None + else: + exp_size = options.exp_size + child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file, - child_logger, - throughput_samples=tp_samples) + child_logger, tp_samples, + exp_size) try: fdmap = { child.stderr.fileno(): @@ -237,6 +243,8 @@ def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)), dd_stderr_read.fileno(): (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)), + exp_size_read.fileno(): + (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)), signal_notify.fileno(): (signal_notify, None), } @@ -372,6 +380,9 @@ def ParseOptions(): type="choice", help="Compression method", metavar="[%s]" % "|".join(constants.IEC_ALL), choices=list(constants.IEC_ALL), default=constants.IEC_GZIP) + parser.add_option("--expected-size", dest="exp_size", action="store", + type="string", default=None, + help="Expected import/export size (MiB)") parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store", type="string", help="Command prefix") parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store", @@ -390,11 +401,20 @@ def ParseOptions(): # Won't return parser.error("Invalid mode: %s" % mode) + if (options.exp_size is not None and + options.exp_size != constants.IE_CUSTOM_SIZE): + try: + options.exp_size = int(options.exp_size) + except (ValueError, TypeError), err: + # Won't return + parser.error("Invalid value for --expected-size: %s (%s)" % + (options.exp_size, err)) + return (status_file_path, mode) class ChildProcess(subprocess.Popen): - def __init__(self, cmd, noclose_fds): + def __init__(self, env, cmd, noclose_fds): """Initializes this class. """ @@ -402,7 +422,7 @@ class ChildProcess(subprocess.Popen): # Not using close_fds because doing so would also close the socat stderr # pipe, which we still need. - subprocess.Popen.__init__(self, cmd, shell=False, close_fds=False, + subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False, stderr=subprocess.PIPE, stdout=None, stdin=None, preexec_fn=self._ChildPreexec) self._SetProcessGroup() @@ -474,16 +494,26 @@ def main(): # Pipe to receive dd's PID (dd_pid_read_fd, dd_pid_write_fd) = os.pipe() + # Pipe to receive size predicted by export script + (exp_size_read_fd, exp_size_write_fd) = os.pipe() + # Get child process command cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd, dd_stderr_write_fd, dd_pid_write_fd) cmd = cmd_builder.GetCommand() + # Prepare command environment + cmd_env = os.environ.copy() + + if options.exp_size == constants.IE_CUSTOM_SIZE: + cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd) + logging.debug("Starting command %r", cmd) # Start child process - child = ChildProcess(cmd, [socat_stderr_write_fd, dd_stderr_write_fd, - dd_pid_write_fd]) + child = ChildProcess(cmd_env, cmd, + [socat_stderr_write_fd, dd_stderr_write_fd, + dd_pid_write_fd, exp_size_write_fd]) try: def _ForwardSignal(signum, _): """Forwards signals to child process. @@ -506,9 +536,11 @@ def main(): utils.RetryOnSignal(os.close, socat_stderr_write_fd) utils.RetryOnSignal(os.close, dd_stderr_write_fd) utils.RetryOnSignal(os.close, dd_pid_write_fd) + utils.RetryOnSignal(os.close, exp_size_write_fd) if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, - dd_pid_read_fd, status_file, child_logger, + dd_pid_read_fd, exp_size_read_fd, + status_file, child_logger, signal_wakeup, signal_handler, mode): # The child closed all its file descriptors and there was no # signal diff --git a/lib/constants.py b/lib/constants.py index a70ede25b4b4863fd636d06015e58e0917863cd4..94eac9dd6848e03b872508d8869ced79bd6cfbe9 100644 --- a/lib/constants.py +++ b/lib/constants.py @@ -223,6 +223,8 @@ IEC_ALL = frozenset([ IEC_GZIP, ]) +IE_CUSTOM_SIZE = "fd" + # Import/export I/O # Direct file I/O, equivalent to a shell's I/O redirection using '<' or '>' IEIO_FILE = "file" diff --git a/lib/impexpd/__init__.py b/lib/impexpd/__init__.py index 4f9efdabef0a4a86fe08ae9c7315e16778dbd305..bcd4dab1cffcb507cff11860f38daf14ac869333 100644 --- a/lib/impexpd/__init__.py +++ b/lib/impexpd/__init__.py @@ -82,12 +82,14 @@ SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"] (PROG_OTHER, PROG_SOCAT, PROG_DD, - PROG_DD_PID) = range(1, 5) + PROG_DD_PID, + PROG_EXP_SIZE) = range(1, 6) PROG_ALL = frozenset([ PROG_OTHER, PROG_SOCAT, PROG_DD, PROG_DD_PID, + PROG_EXP_SIZE, ]) @@ -274,7 +276,7 @@ def _VerifyListening(family, address, port): class ChildIOProcessor(object): - def __init__(self, debug, status_file, logger, throughput_samples): + def __init__(self, debug, status_file, logger, throughput_samples, exp_size): """Initializes this class. """ @@ -291,7 +293,7 @@ class ChildIOProcessor(object): self._dd_progress = [] # Expected size of transferred data - self._exp_size = None + self._exp_size = exp_size def GetLineSplitter(self, prog): """Returns the line splitter for a program. @@ -389,6 +391,22 @@ class ChildIOProcessor(object): self._dd_pid = int(line) forward_line = None + elif prog == PROG_EXP_SIZE: + logging.debug("Received predicted size %r", line) + forward_line = None + + if line: + try: + exp_size = utils.BytesToMebibyte(int(line)) + except (ValueError, TypeError), err: + logging.error("Failed to convert predicted size %r to number: %s", + line, err) + exp_size = None + else: + exp_size = None + + self._exp_size = exp_size + if forward_line: self._logger.info(forward_line) self._status_file.AddRecentOutput(forward_line) diff --git a/man/ganeti-os-interface.sgml b/man/ganeti-os-interface.sgml index 7e2e8d144fe39676350a94adf743d84ca4daa1cf..64d34eb11dda28730db4c94e1672964a8c0a2555 100644 --- a/man/ganeti-os-interface.sgml +++ b/man/ganeti-os-interface.sgml @@ -259,6 +259,19 @@ DISK_... variables). </para> + <para> + To provide the user with an estimate on how long the export will take, + a predicted size can be written to the file descriptor passed in the + variable <envar>EXP_SIZE_FD</envar>. The value is in bytes and must be + terminated by a newline character (\n). Older versions of Ganeti don't + support this feature, hence the variable should be checked before use. + Example: <screen> +if test -n "$EXP_SIZE_FD"; then + blockdev --getsize64 $blockdev >&$EXP_SIZE_FD +fi +</screen> + </para> + </refsect2> <refsect2>