Commit f9323011 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

import/export: Allow script to predict size



Once we have a size for an export (in the context of the
import/export daemon), we can provide the user with a
percentage and ETA.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarGuido Trotter <ultrotter@google.com>
parent 1a2e7fe9
......@@ -207,7 +207,7 @@ class StatusFile:
def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
dd_pid_read_fd, status_file, child_logger,
dd_pid_read_fd, exp_size_read_fd, status_file, child_logger,
signal_notify, signal_handler, mode):
"""Handles the child processes' output.
......@@ -221,12 +221,18 @@ def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
exp_size_read = os.fdopen(exp_size_read_fd, "r", 0)
tp_samples = DD_THROUGHPUT_SAMPLES
if options.exp_size == constants.IE_CUSTOM_SIZE:
exp_size = None
else:
exp_size = options.exp_size
child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
child_logger,
throughput_samples=tp_samples)
child_logger, tp_samples,
exp_size)
try:
fdmap = {
child.stderr.fileno():
......@@ -237,6 +243,8 @@ def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
(dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
dd_stderr_read.fileno():
(dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
exp_size_read.fileno():
(exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)),
signal_notify.fileno(): (signal_notify, None),
}
......@@ -372,6 +380,9 @@ def ParseOptions():
type="choice", help="Compression method",
metavar="[%s]" % "|".join(constants.IEC_ALL),
choices=list(constants.IEC_ALL), default=constants.IEC_GZIP)
parser.add_option("--expected-size", dest="exp_size", action="store",
type="string", default=None,
help="Expected import/export size (MiB)")
parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
type="string", help="Command prefix")
parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
......@@ -390,11 +401,20 @@ def ParseOptions():
# Won't return
parser.error("Invalid mode: %s" % mode)
if (options.exp_size is not None and
options.exp_size != constants.IE_CUSTOM_SIZE):
try:
options.exp_size = int(options.exp_size)
except (ValueError, TypeError), err:
# Won't return
parser.error("Invalid value for --expected-size: %s (%s)" %
(options.exp_size, err))
return (status_file_path, mode)
class ChildProcess(subprocess.Popen):
def __init__(self, cmd, noclose_fds):
def __init__(self, env, cmd, noclose_fds):
"""Initializes this class.
"""
......@@ -402,7 +422,7 @@ class ChildProcess(subprocess.Popen):
# Not using close_fds because doing so would also close the socat stderr
# pipe, which we still need.
subprocess.Popen.__init__(self, cmd, shell=False, close_fds=False,
subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False,
stderr=subprocess.PIPE, stdout=None, stdin=None,
preexec_fn=self._ChildPreexec)
self._SetProcessGroup()
......@@ -474,16 +494,26 @@ def main():
# Pipe to receive dd's PID
(dd_pid_read_fd, dd_pid_write_fd) = os.pipe()
# Pipe to receive size predicted by export script
(exp_size_read_fd, exp_size_write_fd) = os.pipe()
# Get child process command
cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd,
dd_stderr_write_fd, dd_pid_write_fd)
cmd = cmd_builder.GetCommand()
# Prepare command environment
cmd_env = os.environ.copy()
if options.exp_size == constants.IE_CUSTOM_SIZE:
cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd)
logging.debug("Starting command %r", cmd)
# Start child process
child = ChildProcess(cmd, [socat_stderr_write_fd, dd_stderr_write_fd,
dd_pid_write_fd])
child = ChildProcess(cmd_env, cmd,
[socat_stderr_write_fd, dd_stderr_write_fd,
dd_pid_write_fd, exp_size_write_fd])
try:
def _ForwardSignal(signum, _):
"""Forwards signals to child process.
......@@ -506,9 +536,11 @@ def main():
utils.RetryOnSignal(os.close, socat_stderr_write_fd)
utils.RetryOnSignal(os.close, dd_stderr_write_fd)
utils.RetryOnSignal(os.close, dd_pid_write_fd)
utils.RetryOnSignal(os.close, exp_size_write_fd)
if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
dd_pid_read_fd, status_file, child_logger,
dd_pid_read_fd, exp_size_read_fd,
status_file, child_logger,
signal_wakeup, signal_handler, mode):
# The child closed all its file descriptors and there was no
# signal
......
......@@ -223,6 +223,8 @@ IEC_ALL = frozenset([
IEC_GZIP,
])
IE_CUSTOM_SIZE = "fd"
# Import/export I/O
# Direct file I/O, equivalent to a shell's I/O redirection using '<' or '>'
IEIO_FILE = "file"
......
......@@ -82,12 +82,14 @@ SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
(PROG_OTHER,
PROG_SOCAT,
PROG_DD,
PROG_DD_PID) = range(1, 5)
PROG_DD_PID,
PROG_EXP_SIZE) = range(1, 6)
PROG_ALL = frozenset([
PROG_OTHER,
PROG_SOCAT,
PROG_DD,
PROG_DD_PID,
PROG_EXP_SIZE,
])
......@@ -274,7 +276,7 @@ def _VerifyListening(family, address, port):
class ChildIOProcessor(object):
def __init__(self, debug, status_file, logger, throughput_samples):
def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
"""Initializes this class.
"""
......@@ -291,7 +293,7 @@ class ChildIOProcessor(object):
self._dd_progress = []
# Expected size of transferred data
self._exp_size = None
self._exp_size = exp_size
def GetLineSplitter(self, prog):
"""Returns the line splitter for a program.
......@@ -389,6 +391,22 @@ class ChildIOProcessor(object):
self._dd_pid = int(line)
forward_line = None
elif prog == PROG_EXP_SIZE:
logging.debug("Received predicted size %r", line)
forward_line = None
if line:
try:
exp_size = utils.BytesToMebibyte(int(line))
except (ValueError, TypeError), err:
logging.error("Failed to convert predicted size %r to number: %s",
line, err)
exp_size = None
else:
exp_size = None
self._exp_size = exp_size
if forward_line:
self._logger.info(forward_line)
self._status_file.AddRecentOutput(forward_line)
......
......@@ -259,6 +259,19 @@
DISK_... variables).
</para>
<para>
To provide the user with an estimate on how long the export will take,
a predicted size can be written to the file descriptor passed in the
variable <envar>EXP_SIZE_FD</envar>. The value is in bytes and must be
terminated by a newline character (\n). Older versions of Ganeti don't
support this feature, hence the variable should be checked before use.
Example: <screen>
if test -n "$EXP_SIZE_FD"; then
blockdev --getsize64 $blockdev >&amp;$EXP_SIZE_FD
fi
</screen>
</para>
</refsect2>
<refsect2>
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment