Commit a4ccecf6 authored by Michael Hanselmann

utils: Move process-related code into separate file

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
parent 44c9b4fe
......@@ -220,6 +220,7 @@ utils_PYTHON = \
lib/utils/log.py \
lib/utils/mlock.py \
lib/utils/nodesetup.py \
lib/utils/process.py \
lib/utils/retry.py \
lib/utils/text.py \
lib/utils/wrapper.py \
......@@ -495,6 +496,7 @@ python_tests = \
test/ganeti.utils.io_unittest.py \
test/ganeti.utils.mlock_unittest.py \
test/ganeti.utils.nodesetup_unittest.py \
test/ganeti.utils.process_unittest.py \
test/ganeti.utils.retry_unittest.py \
test/ganeti.utils.text_unittest.py \
test/ganeti.utils.wrapper_unittest.py \
......
......@@ -106,7 +106,7 @@ class SshRunner:
@param quiet: whether to enable -q to ssh
@rtype: list
@return: the list of options ready to use in L{utils.RunCmd}
@return: the list of options ready to use in L{utils.process.RunCmd}
"""
options = [
......@@ -194,8 +194,8 @@ class SshRunner:
Args: see SshRunner.BuildCmd.
@rtype: L{utils.RunResult}
@return: the result as from L{utils.RunCmd()}
@rtype: L{utils.process.RunResult}
@return: the result as from L{utils.process.RunCmd()}
"""
return utils.RunCmd(self.BuildCmd(*args, **kwargs))
......
......@@ -63,11 +63,9 @@ from ganeti.utils.filelock import * # pylint: disable-msg=W0401
from ganeti.utils.io import * # pylint: disable-msg=W0401
from ganeti.utils.x509 import * # pylint: disable-msg=W0401
from ganeti.utils.nodesetup import * # pylint: disable-msg=W0401
from ganeti.utils.process import * # pylint: disable-msg=W0401
#: when set to True, L{RunCmd} and L{StartDaemon} refuse to run and raise
#: L{errors.ProgrammerError}; the flag is set by L{DisableFork}
_no_fork = False
_RANDOM_UUID_FILE = "/proc/sys/kernel/random/uuid"
_VALID_SERVICE_NAME_RE = re.compile("^[-_.a-zA-Z0-9]{1,128}$")
......@@ -75,620 +73,10 @@ _VALID_SERVICE_NAME_RE = re.compile("^[-_.a-zA-Z0-9]{1,128}$")
UUID_RE = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
'[a-f0-9]{4}-[a-f0-9]{12}$')
#: Values of the C{timeout_action} reported by L{_RunCmdPipe}: no timeout
#: handling took place, the child was sent SIGTERM after the timeout expired,
#: or the child was sent SIGKILL after also exceeding the linger timeout
(_TIMEOUT_NONE,
 _TIMEOUT_TERM,
 _TIMEOUT_KILL) = range(3)
#: Shell param checker regexp
_SHELLPARAM_REGEX = re.compile(r"^[-a-zA-Z0-9._+/:%@]+$")
def DisableFork():
  """Disables the use of fork(2).

  Sets the module-level C{_no_fork} flag. L{RunCmd} and L{StartDaemon} check
  this flag and raise L{errors.ProgrammerError} once it is set.

  """
  global _no_fork # pylint: disable-msg=W0603

  _no_fork = True
class RunResult(object):
  """Holds the result of running external programs.

  @type exit_code: int
  @ivar exit_code: the exit code of the program, or None (if the program
      didn't exit())
  @type signal: int or None
  @ivar signal: the signal that caused the program to finish, or None
      (if the program wasn't terminated by a signal)
  @type stdout: str
  @ivar stdout: the standard output of the program
  @type stderr: str
  @ivar stderr: the standard error of the program
  @type failed: boolean
  @ivar failed: True in case the program was
      terminated by a signal or exited with a non-zero exit code
  @type fail_reason: str or None
  @ivar fail_reason: a string detailing the termination reason, or None if
      the program did not fail
  @ivar cmd: the command as passed to the constructor; included in the debug
      message logged for failed commands
  @ivar output: read-only property returning C{stdout} followed by C{stderr}

  """
  __slots__ = ["exit_code", "signal", "stdout", "stderr",
               "failed", "fail_reason", "cmd"]

  def __init__(self, exit_code, signal_, stdout, stderr, cmd, timeout_action,
               timeout):
    """Initializes the result and computes the failure details.

    @param exit_code: exit code of the program or None
    @param signal_: number of the signal which terminated the program or None
    @param stdout: captured standard output
    @param stderr: captured standard error
    @param cmd: the command which was run
    @param timeout_action: one of the C{_TIMEOUT_*} constants
    @param timeout: the timeout in seconds; only used for the failure message
        if C{timeout_action} indicates that the timeout was hit

    """
    self.cmd = cmd
    self.exit_code = exit_code
    self.signal = signal_
    self.stdout = stdout
    self.stderr = stderr
    self.failed = (signal_ is not None or exit_code != 0)

    # The attribute lives in __slots__, hence it must always be assigned;
    # otherwise reading it on a successful result raises AttributeError
    self.fail_reason = None

    if not self.failed:
      return

    fail_msgs = []
    if self.signal is not None:
      fail_msgs.append("terminated by signal %s" % self.signal)
    elif self.exit_code is not None:
      fail_msgs.append("exited with exit code %s" % self.exit_code)
    else:
      fail_msgs.append("unable to determine termination reason")

    if timeout_action == _TIMEOUT_TERM:
      fail_msgs.append("terminated after timeout of %.2f seconds" % timeout)
    elif timeout_action == _TIMEOUT_KILL:
      fail_msgs.append(("force termination after timeout of %.2f seconds"
                        " and linger for another %.2f seconds") %
                       (timeout, constants.CHILD_LINGER_TIMEOUT))

    self.fail_reason = CommaJoin(fail_msgs)

    logging.debug("Command '%s' failed (%s); output: %s",
                  self.cmd, self.fail_reason, self.output)

  def _GetOutput(self):
    """Returns the combined stdout and stderr for easier usage.

    """
    return self.stdout + self.stderr

  output = property(_GetOutput, None, None, "Return full output")
def _BuildCmdEnvironment(env, reset):
  """Builds the environment for an external program.

  @type env: dict or None
  @param env: additional variables, applied last and hence overriding
      anything set up before
  @type reset: boolean
  @param reset: if true, start from an empty environment; otherwise start
      from a copy of the current process environment with C{LC_ALL} forced
      to C{"C"}
  @rtype: dict
  @return: the environment to pass to the program

  """
  if not reset:
    result = os.environ.copy()
    result["LC_ALL"] = "C"
  else:
    result = {}

  if env is not None:
    result.update(env)

  return result
def RunCmd(cmd, env=None, output=None, cwd="/", reset_env=False,
interactive=False, timeout=None):
"""Execute a (shell) command.
The command should not read from its standard input, as it will be
closed.
@type cmd: string or list
@param cmd: Command to run
@type env: dict
@param env: Additional environment variables
@type output: str
@param output: if desired, the output of the command can be
saved in a file instead of the RunResult instance; this
parameter denotes the file name (if not None)
@type cwd: string
@param cwd: if specified, will be used as the working
directory for the command; the default will be /
@type reset_env: boolean
@param reset_env: whether to reset or keep the default os environment
@type interactive: boolean
@param interactive: weather we pipe stdin, stdout and stderr
(default behaviour) or run the command interactive
@type timeout: int
@param timeout: If not None, timeout in seconds until child process gets
killed
@rtype: L{RunResult}
@return: RunResult instance
@raise errors.ProgrammerError: if we call this when forks are disabled
"""
if _no_fork:
raise errors.ProgrammerError("utils.RunCmd() called with fork() disabled")
if output and interactive:
raise errors.ProgrammerError("Parameters 'output' and 'interactive' can"
" not be provided at the same time")
if isinstance(cmd, basestring):
strcmd = cmd
shell = True
else:
cmd = [str(val) for val in cmd]
strcmd = ShellQuoteArgs(cmd)
shell = False
if output:
logging.debug("RunCmd %s, output file '%s'", strcmd, output)
else:
logging.debug("RunCmd %s", strcmd)
cmd_env = _BuildCmdEnvironment(env, reset_env)
try:
if output is None:
out, err, status, timeout_action = _RunCmdPipe(cmd, cmd_env, shell, cwd,
interactive, timeout)
else:
timeout_action = _TIMEOUT_NONE
status = _RunCmdFile(cmd, cmd_env, shell, output, cwd)
out = err = ""
except OSError, err:
if err.errno == errno.ENOENT:
raise errors.OpExecError("Can't execute '%s': not found (%s)" %
(strcmd, err))
else:
raise
if status >= 0:
exitcode = status
signal_ = None
else:
exitcode = None
signal_ = -status
return RunResult(exitcode, signal_, out, err, strcmd, timeout_action, timeout)
def SetupDaemonEnv(cwd="/", umask=077):
  """Setup a daemon's environment.

  Changes the working directory, sets the umask and finally calls
  C{setsid}. This should be called between the first and second fork, due
  to setsid usage.

  @param cwd: the directory to which to chdir
  @param umask: the umask to setup

  """
  os.chdir(cwd)
  os.umask(umask)
  os.setsid()
def SetupDaemonFDs(output_file, output_fd):
"""Setups up a daemon's file descriptors.
@param output_file: if not None, the file to which to redirect
stdout/stderr
@param output_fd: if not None, the file descriptor for stdout/stderr
"""
# check that at most one is defined
assert [output_file, output_fd].count(None) >= 1
# Open /dev/null (read-only, only for stdin)
devnull_fd = os.open(os.devnull, os.O_RDONLY)
if output_fd is not None:
pass
elif output_file is not None:
# Open output file
try:
output_fd = os.open(output_file,
os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0600)
except EnvironmentError, err:
raise Exception("Opening output file failed: %s" % err)
else:
output_fd = os.open(os.devnull, os.O_WRONLY)
# Redirect standard I/O
os.dup2(devnull_fd, 0)
os.dup2(output_fd, 1)
os.dup2(output_fd, 2)
def StartDaemon(cmd, env=None, cwd="/", output=None, output_fd=None,
                pidfile=None):
  """Start a daemon process after forking twice.

  Two pipes connect the calling process with its children: one carries an
  error message in case starting the daemon failed, the other one carries
  the process ID of the daemon.

  @type cmd: string or list
  @param cmd: Command to run; a string is run via C{/bin/sh -c}
  @type env: dict
  @param env: Additional environment variables
  @type cwd: string
  @param cwd: Working directory for the program
  @type output: string
  @param output: Path to file in which to save the output
  @type output_fd: int
  @param output_fd: File descriptor for output
  @type pidfile: string
  @param pidfile: Process ID file
  @rtype: int
  @return: Daemon process ID
  @raise errors.ProgrammerError: if we call this when forks are disabled or
      if both C{output} and C{output_fd} are given
  @raise errors.OpExecError: if an error message was received from the child
      process or the received process ID can not be parsed

  """
  if _no_fork:
    raise errors.ProgrammerError("utils.StartDaemon() called with fork()"
                                 " disabled")

  # With a non-empty "output" the XOR is only false if "output_fd" was given
  # as well, i.e. this rejects passing both of them
  if output and not (bool(output) ^ (output_fd is not None)):
    raise errors.ProgrammerError("Only one of 'output' and 'output_fd' can be"
                                 " specified")

  if isinstance(cmd, basestring):
    cmd = ["/bin/sh", "-c", cmd]

  strcmd = ShellQuoteArgs(cmd)

  if output:
    logging.debug("StartDaemon %s, output file '%s'", strcmd, output)
  else:
    logging.debug("StartDaemon %s", strcmd)

  cmd_env = _BuildCmdEnvironment(env, False)

  # Create pipe for sending PID back
  (pidpipe_read, pidpipe_write) = os.pipe()
  try:
    try:
      # Create pipe for sending error messages
      (errpipe_read, errpipe_write) = os.pipe()
      try:
        try:
          # First fork
          pid = os.fork()
          if pid == 0:
            try:
              # Child process, won't return
              _StartDaemonChild(errpipe_read, errpipe_write,
                                pidpipe_read, pidpipe_write,
                                cmd, cmd_env, cwd,
                                output, output_fd, pidfile)
            finally:
              # Well, maybe child process failed
              os._exit(1) # pylint: disable-msg=W0212
        finally:
          # The parent only reads from the error pipe
          CloseFdNoError(errpipe_write)

        # Wait for daemon to be started (or an error message to
        # arrive) and read up to 100 KB as an error message
        errormsg = RetryOnSignal(os.read, errpipe_read, 100 * 1024)
      finally:
        CloseFdNoError(errpipe_read)
    finally:
      # The parent only reads from the PID pipe
      CloseFdNoError(pidpipe_write)

    # Read up to 128 bytes for PID
    pidtext = RetryOnSignal(os.read, pidpipe_read, 128)
  finally:
    CloseFdNoError(pidpipe_read)

  # Try to avoid zombies by waiting for child process
  try:
    os.waitpid(pid, 0)
  except OSError:
    pass

  if errormsg:
    raise errors.OpExecError("Error when starting daemon process: %r" %
                             errormsg)

  try:
    return int(pidtext)
  except (ValueError, TypeError), err:
    raise errors.OpExecError("Error while trying to parse PID %r: %s" %
                             (pidtext, err))
def _StartDaemonChild(errpipe_read, errpipe_write,
                      pidpipe_read, pidpipe_write,
                      args, env, cwd,
                      output, fd_output, pidfile):
  """Child process for starting daemon.

  Runs in the process created by the first fork of L{StartDaemon}. This
  function never returns: the process is either replaced by the daemon
  program or terminated via C{os._exit}.

  @param errpipe_read: read side of the error message pipe (closed here)
  @param errpipe_write: write side of the error message pipe
  @param pidpipe_read: read side of the PID pipe (closed here)
  @param pidpipe_write: write side of the PID pipe
  @param args: program and arguments to execute
  @param env: environment for the program, or None to use C{os.execvp}
      without an explicit environment
  @param cwd: working directory for the program
  @param output: passed to L{SetupDaemonFDs} as output file
  @param fd_output: passed to L{SetupDaemonFDs} as output file descriptor
  @param pidfile: if set, name of the PID file to write

  """
  try:
    # Close parent's side
    CloseFdNoError(errpipe_read)
    CloseFdNoError(pidpipe_read)

    # First child process
    SetupDaemonEnv()

    # And fork for the second time
    pid = os.fork()
    if pid != 0:
      # Exit first child process
      os._exit(0) # pylint: disable-msg=W0212

    # Make sure pipe is closed on execv* (and thereby notifies
    # original process)
    SetCloseOnExecFlag(errpipe_write, True)

    # List of file descriptors to be left open
    noclose_fds = [errpipe_write]

    # Open PID file
    if pidfile:
      fd_pidfile = WritePidFile(pidfile)

      # Keeping the file open to hold the lock
      noclose_fds.append(fd_pidfile)

      SetCloseOnExecFlag(fd_pidfile, False)
    else:
      fd_pidfile = None

    SetupDaemonFDs(output, fd_output)

    # Send daemon PID to parent
    RetryOnSignal(os.write, pidpipe_write, str(os.getpid()))

    # Close all file descriptors except stdio and error message pipe
    CloseFDs(noclose_fds=noclose_fds)

    # Change working directory
    os.chdir(cwd)

    if env is None:
      os.execvp(args[0], args)
    else:
      os.execvpe(args[0], args, env)
  except: # pylint: disable-msg=W0702
    try:
      # Report errors to original process
      WriteErrorToFD(errpipe_write, str(sys.exc_info()[1]))
    except: # pylint: disable-msg=W0702
      # Ignore errors in error handling
      pass

  # Only reached if the program could not be executed
  os._exit(1) # pylint: disable-msg=W0212
def WriteErrorToFD(fd, err):
  """Writes an error message to a file descriptor, if there is one.

  @type fd: None or int (file descriptor)
  @param fd: if not None, the error will be written to this fd
  @param err: string, the error message; an empty message is replaced by
      a generic placeholder text

  """
  if fd is not None:
    RetryOnSignal(os.write, fd, err or "<unknown error>")
def _CheckIfAlive(child):
  """Raises L{RetryAgain} if child is still alive.

  @param child: object whose C{poll} method returns None while the process
      is running
  @raise RetryAgain: If child is still alive

  """
  still_running = child.poll() is None
  if still_running:
    raise RetryAgain()
def _WaitForProcess(child, timeout):
  """Waits for the child to terminate or until we reach timeout.

  @param child: the child process, checked through L{_CheckIfAlive}
  @param timeout: maximum time to wait; negative values are treated as zero

  """
  wait_limit = max(0, timeout)

  try:
    Retry(_CheckIfAlive, (1.0, 1.2, 5.0), wait_limit, args=[child])
  except RetryTimeout:
    # Running out of time is not an error here
    pass
def _RunCmdPipe(cmd, env, via_shell, cwd, interactive, timeout,
                _linger_timeout=constants.CHILD_LINGER_TIMEOUT):
  """Run a command and return its output.

  @type cmd: string or list
  @param cmd: Command to run
  @type env: dict
  @param env: The environment to use
  @type via_shell: bool
  @param via_shell: if we should run via the shell
  @type cwd: string
  @param cwd: the working directory for the program
  @type interactive: boolean
  @param interactive: Run command interactive (without piping)
  @type timeout: int
  @param timeout: Timeout after which the program gets terminated, or None
      for no timeout
  @param _linger_timeout: time granted to the program after it was sent
      SIGTERM before it is sent SIGKILL
  @rtype: tuple
  @return: (out, err, status, timeout_action); C{out} and C{err} are empty
      when running interactively, C{timeout_action} is one of the
      C{_TIMEOUT_*} constants

  """
  poller = select.poll()

  stderr = subprocess.PIPE
  stdout = subprocess.PIPE
  stdin = subprocess.PIPE

  if interactive:
    # No pipes at all; None is passed for all three standard streams
    stderr = stdout = stdin = None

  child = subprocess.Popen(cmd, shell=via_shell,
                           stderr=stderr,
                           stdout=stdout,
                           stdin=stdin,
                           close_fds=True, env=env,
                           cwd=cwd)

  out = StringIO()
  err = StringIO()

  # Set once the execution timeout was hit and SIGTERM handling started
  linger_timeout = None

  if timeout is None:
    poll_timeout = None
  else:
    poll_timeout = RunningTimeout(timeout, True).Remaining

  msg_timeout = ("Command %s (%d) run into execution timeout, terminating" %
                 (cmd, child.pid))
  msg_linger = ("Command %s (%d) run into linger timeout, killing" %
                (cmd, child.pid))

  timeout_action = _TIMEOUT_NONE

  if not interactive:
    child.stdin.close()
    poller.register(child.stdout, select.POLLIN)
    poller.register(child.stderr, select.POLLIN)
    # Maps file descriptor to (output buffer, file object to read from)
    fdmap = {
      child.stdout.fileno(): (out, child.stdout),
      child.stderr.fileno(): (err, child.stderr),
      }
    for fd in fdmap:
      SetNonblockFlag(fd, True)

    # Loop until both pipes have been closed or the linger timeout expired
    while fdmap:
      if poll_timeout:
        # The remaining time is multiplied by 1000 for use with poll()
        pt = poll_timeout() * 1000
        if pt < 0:
          if linger_timeout is None:
            # Execution timeout hit for the first time
            logging.warning(msg_timeout)
            if child.poll() is None:
              timeout_action = _TIMEOUT_TERM
              IgnoreProcessNotFound(os.kill, child.pid, signal.SIGTERM)
            linger_timeout = RunningTimeout(_linger_timeout, True).Remaining
          pt = linger_timeout() * 1000
          if pt < 0:
            break
      else:
        pt = None

      pollresult = RetryOnSignal(poller.poll, pt)

      for fd, event in pollresult:
        if event & select.POLLIN or event & select.POLLPRI:
          data = fdmap[fd][1].read()
          # no data from read signifies EOF (the same as POLLHUP)
          if not data:
            poller.unregister(fd)
            del fdmap[fd]
            continue
          fdmap[fd][0].write(data)
        if (event & select.POLLNVAL or event & select.POLLHUP or
            event & select.POLLERR):
          poller.unregister(fd)
          del fdmap[fd]

  if timeout is not None:
    assert callable(poll_timeout)

    # We have no I/O left but it might still run
    if child.poll() is None:
      _WaitForProcess(child, poll_timeout())

    # Terminate if still alive after timeout
    if child.poll() is None:
      if linger_timeout is None:
        logging.warning(msg_timeout)
        timeout_action = _TIMEOUT_TERM
        IgnoreProcessNotFound(os.kill, child.pid, signal.SIGTERM)
        lt = _linger_timeout
      else:
        # SIGTERM handling already started in the loop above, only wait for
        # the rest of the linger timeout
        lt = linger_timeout()
      _WaitForProcess(child, lt)

    # Okay, still alive after timeout and linger timeout? Kill it!
    if child.poll() is None:
      timeout_action = _TIMEOUT_KILL
      logging.warning(msg_linger)
      IgnoreProcessNotFound(os.kill, child.pid, signal.SIGKILL)

  out = out.getvalue()
  err = err.getvalue()

  status = child.wait()
  return out, err, status, timeout_action
def _RunCmdFile(cmd, env, via_shell, output, cwd):
  """Run a command and save its output to a file.

  Standard output and standard error are both appended to the file; the
  standard input of the command is closed right after starting it.

  @type cmd: string or list
  @param cmd: Command to run
  @type env: dict
  @param env: The environment to use
  @type via_shell: bool
  @param via_shell: if we should run via the shell
  @type output: str
  @param output: the filename in which to save the output
  @type cwd: string
  @param cwd: the working directory for the program
  @rtype: int
  @return: the exit status

  """
  outfile = open(output, "a")
  try:
    proc = subprocess.Popen(cmd, shell=via_shell,
                            stdin=subprocess.PIPE,
                            stdout=outfile,
                            stderr=subprocess.STDOUT,
                            env=env, cwd=cwd,
                            close_fds=True)

    # The command is not supposed to read any input
    proc.stdin.close()

    return proc.wait()
  finally:
    outfile.close()