Commit fe39b63b authored by Hrvoje Ribicic

Merge branch 'stable-2.13' into stable-2.14

* stable-2.13
  Change wording in documentation wrt configure-time paths
  Do not distribute files with configure-specific information
  LXC: Add udevadm settle invocation to prevent errors

* stable-2.12
  QA: Fix CheckFileUnmodified to work with vcluster
  QA: Fix white-spaces in CheckFileUnmodified
  QA: Check that the cluster verify doesn't change the config
  QA: Allow to check that an operation doesn't change a file
  Use only shared configuration lock for ComputeDRBDMap
  Only assert properties of non-None objects
  If any IO error happens during job forking, retry
  Add a function for retrying `MonadError` computations
  Annotate every send/receive operation in Exec.hs
  Refactor `rethrowAnnotateIOError` and simplify its usage
  Query.Exec: Describe error if talking to job process fails
  Query.Exec: Log error when talking to job process fails
  Fix the generation of Makefile.ghc rules for *_hi
  Fix error handling for failed fork jobs
  If a forked job process malfunctions, kill it thoroughly
  Add function to run checked computations in `MonadError`
  Add job ID and process ID to log statements in Exec.hs

* stable-2.11
  Improve speed of Xen hypervisor unit tests
  Improve Xen instance state handling
  Renew crypto retries for non-master nodes
  Retries for the master's SSL cert renewal
  Unit tests for offline nodes
  De-duplicate testing code regarding pathutils
  Make LURenewCrypto handle unreachable nodes properly
  Error handling on failed SSL cert renewal for master
  Unit test for LURenewCrypto's valid case
  Mock support for pathutils
  Increase timeout of crypto token RPC

* stable-2.10
  Make QA fail if KVM hotplugging fails
  Always preserve QA command output
  Don't lose stdout/stderr in AssertCommand
  qa_utils: Allow passing fail=None to AssertCommand
  qa_utils: Make AssertCommand return stdout/stderr as well
  Allow plain/DRBD conversions regardless of lack of disks
  Add support for ipolicy modifications to mock config

Conflicts:
	lib/cmdlib/cluster/verify.py
	lib/cmdlib/instance.py
	lib/config/__init__.py
	qa/qa_utils.py
Resolution:
        verify.py: Merge renew-crypto changes in manually.
        instance.py: Remove fixes for DRBD->plain conversion as
          instances with no disks belong to the diskless template
          implicitly as of 2.14.
        __init__.py: Keep lock shared, rename decorator.
        qa_utils.py: Take 2.10 changes.
Signed-off-by: Hrvoje Ribicic <riba@google.com>
Reviewed-by: Helga Velroyen <helgav@google.com>
parents e281d203 7420ec46
......@@ -408,7 +408,7 @@ BUILT_EXAMPLES = \
doc/examples/systemd/ganeti-rapi.service \
doc/examples/systemd/ganeti-wconfd.service
dist_ifup_SCRIPTS = \
nodist_ifup_SCRIPTS = \
tools/kvm-ifup-os \
tools/xen-ifup-os
......@@ -1315,7 +1315,7 @@ Makefile.ghc: $(HS_MAKEFILE_GHC_SRCS) Makefile $(HASKELL_PACKAGE_VERSIONS_FILE)
# object listed in Makefile.ghc.
# e.g. src/hluxid.o : src/Ganeti/Daemon.hi
# => src/hluxid.o : src/Ganeti/Daemon.hi src/Ganeti/Daemon.o
sed -i -re 's/([^ ]+)\.hi$$/\1.hi \1.o/' $@
sed -i -r -e 's/([^ ]+)\.hi$$/\1.hi \1.o/' -e 's/([^ ]+)_hi$$/\1_hi \1_o/' $@
@include_makefile_ghc@
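
The second -e expression mirrors the existing .hi rule for GHC's underscore-suffixed interface files: wherever a dependency line ends in _hi, the matching _o is appended. A quick sketch of the substitution in Python (the sample line is hypothetical, modeled on the comment above):

  import re

  line = "src/hluxid_o : src/Ganeti/Daemon_hi"
  # same rewrite as the new sed expression: append the _o next to the _hi
  print re.sub(r"([^ ]+)_hi$", r"\1_hi \1_o", line)
  # -> src/hluxid_o : src/Ganeti/Daemon_hi src/Ganeti/Daemon_o
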
......@@ -1933,6 +1933,7 @@ python_test_support = \
test/py/cmdlib/testsupport/iallocator_mock.py \
test/py/cmdlib/testsupport/livelock_mock.py \
test/py/cmdlib/testsupport/netutils_mock.py \
test/py/cmdlib/testsupport/pathutils_mock.py \
test/py/cmdlib/testsupport/processor_mock.py \
test/py/cmdlib/testsupport/rpc_runner_mock.py \
test/py/cmdlib/testsupport/ssh_mock.py \
......
......@@ -164,7 +164,9 @@ There are several disk templates you can choose from:
.. note::
Disk templates marked with an asterisk require Ganeti to access the
file system. Ganeti will refuse to do so unless you whitelist the
relevant paths in :pyeval:`pathutils.FILE_STORAGE_PATHS_FILE`.
relevant paths in the file storage paths configuration which,
with the default configure-time paths, is located
in :pyeval:`pathutils.FILE_STORAGE_PATHS_FILE`.
The default paths used by Ganeti are:
......
......@@ -108,6 +108,7 @@ class LUClusterRenewCrypto(NoHooksLU):
"""
_MAX_NUM_RETRIES = 3
REQ_BGL = False
def ExpandNames(self):
......@@ -128,7 +129,7 @@ class LUClusterRenewCrypto(NoHooksLU):
self._ssh_renewal_suppressed = \
not self.cfg.GetClusterInfo().modify_ssh_setup and self.op.ssh_keys
def _RenewNodeSslCertificates(self):
def _RenewNodeSslCertificates(self, feedback_fn):
"""Renews the nodes' SSL certificates.
Note that most of this operation is done in gnt_cluster.py; this LU only
......@@ -149,18 +150,58 @@ class LUClusterRenewCrypto(NoHooksLU):
except IOError:
logging.info("No old certificate available.")
new_master_digest = _UpdateMasterClientCert(self, self.cfg, master_uuid)
for _ in range(self._MAX_NUM_RETRIES):
try:
# Technically it should not be necessary to set the cert
# paths. However, due to a bug in the mock library, we
# have to do this to be able to test the function properly.
_UpdateMasterClientCert(
self, self.cfg, master_uuid,
client_cert=pathutils.NODED_CLIENT_CERT_FILE,
client_cert_tmp=pathutils.NODED_CLIENT_CERT_FILE_TMP)
break
except errors.OpExecError as e:
pass
else:
feedback_fn("Could not renew the master's client SSL certificate."
" Cleaning up. Error: %s." % e)
# Cleaning up temporary certificates
self.cfg.RemoveNodeFromCandidateCerts("%s-SERVER" % master_uuid)
self.cfg.RemoveNodeFromCandidateCerts("%s-OLDMASTER" % master_uuid)
try:
utils.RemoveFile(pathutils.NODED_CLIENT_CERT_FILE_TMP)
except IOError:
pass
return
self.cfg.AddNodeToCandidateCerts(master_uuid, new_master_digest)
node_errors = {}
nodes = self.cfg.GetAllNodesInfo()
for (node_uuid, node_info) in nodes.items():
if node_info.offline:
logging.info("* Skipping offline node %s", node_info.name)
continue
if node_uuid != master_uuid:
new_digest = CreateNewClientCert(self, node_uuid)
if node_info.master_candidate:
self.cfg.AddNodeToCandidateCerts(node_uuid, new_digest)
for _ in range(self._MAX_NUM_RETRIES):
try:
new_digest = CreateNewClientCert(self, node_uuid)
if node_info.master_candidate:
self.cfg.AddNodeToCandidateCerts(node_uuid,
new_digest)
break
except errors.OpExecError as last_exception:
pass
else:
if last_exception:
node_errors[node_uuid] = last_exception
if node_errors:
msg = ("Some nodes' SSL client certificates could not be renewed."
" Please make sure those nodes are reachable and rerun"
" the operation. The affected nodes and their errors are:\n")
for uuid, e in node_errors.items():
msg += "Node %s: %s\n" % (uuid, e)
feedback_fn(msg)
self.cfg.RemoveNodeFromCandidateCerts("%s-SERVER" % master_uuid)
self.cfg.RemoveNodeFromCandidateCerts("%s-OLDMASTER" % master_uuid)
......@@ -187,8 +228,10 @@ class LUClusterRenewCrypto(NoHooksLU):
def Exec(self, feedback_fn):
if self.op.node_certificates:
self._RenewNodeSslCertificates()
feedback_fn("Renewing Node SSL certificates")
self._RenewNodeSslCertificates(feedback_fn)
if self.op.ssh_keys and not self._ssh_renewal_suppressed:
feedback_fn("Renewing SSH keys")
self._RenewSshKeys()
elif self._ssh_renewal_suppressed:
feedback_fn("Cannot renew SSH keys if the cluster is configured to not"
......
......@@ -1503,7 +1503,6 @@ class LUInstanceSetParams(LogicalUnit):
assert len(secondary_nodes) == 1
assert utils.AnyDiskOfType(disks, [constants.DT_DRBD8])
snode_uuid = secondary_nodes[0]
feedback_fn("Converting disk template from 'drbd' to 'plain'")
old_disks = AnnotateDiskParams(self.instance, disks, self.cfg)
......@@ -1537,7 +1536,7 @@ class LUInstanceSetParams(LogicalUnit):
feedback_fn("Removing volumes on the secondary node...")
RemoveDisks(self, self.instance, disks=old_disks,
target_node_uuid=snode_uuid)
target_node_uuid=secondary_nodes[0])
feedback_fn("Removing unneeded volumes on the primary node...")
meta_disks = []
......
......@@ -1241,7 +1241,7 @@ class ConfigWriter(object):
self._ConfigData().cluster.highest_used_port = port
return port
@ConfigSync()
@ConfigSync(shared=1)
def ComputeDRBDMap(self):
"""Compute the used DRBD minor/nodes.
......
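
ComputeDRBDMap only reads the configuration, so a shared lock suffices and concurrent readers no longer serialize behind one another; only writers still need exclusive access. A toy sketch of what a shared-aware decorator along the lines of ConfigSync could look like (an illustration only, not Ganeti's implementation; the _lock attribute is assumed):

  def config_sync(shared=0):
    """Acquires the config lock around a method, shared or exclusive."""
    def decorator(fn):
      def wrapper(self, *args, **kwargs):
        self._lock.acquire(shared=shared)  # readers may share; writers exclude
        try:
          return fn(self, *args, **kwargs)
        finally:
          self._lock.release()
      return wrapper
    return decorator
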
......@@ -154,7 +154,7 @@ def _ParseInstanceList(lines, include_node):
return result
def _GetAllInstanceList(fn, include_node, _timeout=5):
def _GetAllInstanceList(fn, include_node, delays, timeout):
"""Return the list of instances including running and shutdown.
See L{_RunInstanceList} and L{_ParseInstanceList} for parameter details.
......@@ -162,7 +162,7 @@ def _GetAllInstanceList(fn, include_node, _timeout=5):
"""
instance_list_errors = []
try:
lines = utils.Retry(_RunInstanceList, (0.3, 1.5, 1.0), _timeout,
lines = utils.Retry(_RunInstanceList, delays, timeout,
args=(fn, instance_list_errors))
except utils.RetryTimeout:
if instance_list_errors:
......@@ -182,7 +182,7 @@ def _IsInstanceRunning(instance_info):
"""Determine whether an instance is running.
An instance is running if it is in the following Xen states:
running, blocked, or paused.
running, blocked, paused, or dying (about to be destroyed / shutdown).
For some strange reason, Xen once printed 'rb----' which does not make any
sense because an instance cannot be both running and blocked. Fortunately,
......@@ -193,6 +193,9 @@ def _IsInstanceRunning(instance_info):
to be scheduled to run.
http://old-list-archives.xenproject.org/xen-users/2007-06/msg00849.html
A dying instance is about to be removed, but it is still consuming resources,
and counts as running.
@type instance_info: string
@param instance_info: Information about instance, as supplied by Xen.
@rtype: bool
......@@ -202,15 +205,51 @@ def _IsInstanceRunning(instance_info):
return instance_info == "r-----" \
or instance_info == "rb----" \
or instance_info == "-b----" \
or instance_info == "--p---" \
or instance_info == "-----d" \
or instance_info == "------"
def _IsInstanceShutdown(instance_info):
return instance_info == "---s--"
"""Determine whether the instance is shutdown.
An instance is shutdown when a user shuts it down from within, and we do not
remove domains to be able to detect that.
The dying state has been added as a precaution, as Xen's status reporting is
weird.
"""
return instance_info == "---s--" \
or instance_info == "---s-d"
def _IgnorePaused(instance_info):
"""Removes information about whether a Xen state is paused from the state.
As it turns out, an instance can be reported as paused in almost any
condition. Paused instances can be paused, running instances can be paused for
scheduling, and any other condition can appear to be paused as a result of
races or improbable conditions in Xen's status reporting.
As we do not use Xen's pause commands in any way at this time, we can simply
ignore the paused field and save ourselves a lot of trouble.
Should we ever use the pause commands, several samples would be needed before
we could confirm the domain as paused.
"""
return instance_info.replace('p', '-')
def _XenToHypervisorInstanceState(instance_info):
"""Maps Xen states to hypervisor states.
@type instance_info: string
@param instance_info: Information about instance, as supplied by Xen.
@rtype: L{hv_base.HvInstanceState}
"""
instance_info = _IgnorePaused(instance_info)
if _IsInstanceRunning(instance_info):
return hv_base.HvInstanceState.RUNNING
elif _IsInstanceShutdown(instance_info):
......@@ -221,23 +260,23 @@ def _XenToHypervisorInstanceState(instance_info):
instance_info)
def _GetRunningInstanceList(fn, include_node, _timeout=5):
def _GetRunningInstanceList(fn, include_node, delays, timeout):
"""Return the list of running instances.
See L{_GetAllInstanceList} for parameter details.
"""
instances = _GetAllInstanceList(fn, include_node, _timeout)
instances = _GetAllInstanceList(fn, include_node, delays, timeout)
return [i for i in instances if hv_base.HvInstanceState.IsRunning(i[4])]
def _GetShutdownInstanceList(fn, include_node, _timeout=5):
def _GetShutdownInstanceList(fn, include_node, delays, timeout):
"""Return the list of shutdown instances.
See L{_GetAllInstanceList} for parameter details.
"""
instances = _GetAllInstanceList(fn, include_node, _timeout)
instances = _GetAllInstanceList(fn, include_node, delays, timeout)
return [i for i in instances if hv_base.HvInstanceState.IsShutdown(i[4])]
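
Putting the state helpers together, a rough illustration of how a few xl/xm state strings from the docstrings above would be classified (helper names as in this file):

  for state in ["r-----", "-b----", "--p---", "-----d", "---s--", "---s-d"]:
    normalized = _IgnorePaused(state)     # e.g. "--p---" becomes "------"
    if _IsInstanceRunning(normalized):
      print "%s -> running" % state
    elif _IsInstanceShutdown(normalized):
      print "%s -> shutdown" % state
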
......@@ -424,6 +463,9 @@ class XenHypervisor(hv_base.BaseHypervisor):
_NICS_DIR = _ROOT_DIR + "/nic" # contains NICs' info
_DIRS = [_ROOT_DIR, _NICS_DIR]
_INSTANCE_LIST_DELAYS = (0.3, 1.5, 1.0)
_INSTANCE_LIST_TIMEOUT = 5
ANCILLARY_FILES = [
XEND_CONFIG_FILE,
XL_CONFIG_FILE,
......@@ -698,7 +740,8 @@ class XenHypervisor(hv_base.BaseHypervisor):
"""
return _GetAllInstanceList(lambda: self._RunXen(["list"], hvparams),
include_node)
include_node, delays=self._INSTANCE_LIST_DELAYS,
timeout=self._INSTANCE_LIST_TIMEOUT)
def ListInstances(self, hvparams=None):
"""Get the list of running instances.
......@@ -712,7 +755,8 @@ class XenHypervisor(hv_base.BaseHypervisor):
"""
instance_list = _GetRunningInstanceList(
lambda: self._RunXen(["list"], hvparams),
False)
False, delays=self._INSTANCE_LIST_DELAYS,
timeout=self._INSTANCE_LIST_TIMEOUT)
return [info[0] for info in instance_list]
def GetInstanceInfo(self, instance_name, hvparams=None):
......
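
The delays tuple (0.3, 1.5, 1.0) follows utils.Retry's (start, factor, limit) convention, assumed here to mean: sleep 0.3s before the first retry, multiply the delay by 1.5 each attempt, cap it at 1.0s, and give up once the 5-second timeout is spent. A standalone sketch of that schedule under those assumptions:

  def backoff_delays(start, factor, limit, timeout):
    """Yields the sleep intervals of an (assumed) utils.Retry schedule."""
    elapsed, delay = 0.0, start
    while elapsed + delay <= timeout:
      yield delay
      elapsed += delay
      delay = min(delay * factor, limit)

  # list(backoff_delays(0.3, 1.5, 1.0, 5)) -> [0.3, 0.45, 0.675, 1.0, 1.0, 1.0]
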
......@@ -1912,7 +1912,8 @@ class JobQueue(object):
# Try to load from disk
job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
assert not job.writable, "Got writable job" # pylint: disable=E1101
if job:
assert not job.writable, "Got writable job" # pylint: disable=E1101
if job:
return job.CalcStatus()
......
......@@ -529,7 +529,7 @@ _NODE_CALLS = [
("ovs_name", None, "Name of the OpenvSwitch to create"),
("ovs_link", None, "Link of the OpenvSwitch to the outside"),
], None, None, "This will create and setup the OpenvSwitch"),
("node_crypto_tokens", SINGLE, None, constants.RPC_TMO_NORMAL, [
("node_crypto_tokens", SINGLE, None, constants.RPC_TMO_SLOW, [
("token_request", None,
"List of tuples of requested crypto token types, actions"),
], None, None, "Handle crypto tokens of the node."),
......
......@@ -279,6 +279,13 @@ def ReleaseBdevPartitionMapping(loop_dev_path):
raise errors.CommandError("Failed to release partition mapping of %s: %s" %
(loop_dev_path, result.output))
# The invocation of udevadm settle was added here because users had issues
# with the loopback device still being busy after kpartx / earlier commands
# did their work.
result = utils_process.RunCmd(["udevadm", "settle"])
if result.failed:
raise errors.CommandError("Waiting on udev failed: %s" % result.output)
result = utils_process.RunCmd(["losetup", "-d", loop_dev_path])
if result.failed:
raise errors.CommandError("Failed to detach %s: %s" %
......
......@@ -52,7 +52,7 @@ import qa_logging
import qa_utils
from qa_utils import AssertEqual, AssertCommand, AssertRedirectedCommand, \
GetCommandOutput
GetCommandOutput, CheckFileUnmodified
# Prefix for LVM volumes created by QA code during tests
......@@ -151,8 +151,9 @@ def AssertClusterVerify(fail=False, errors=None,
cvcmd = "gnt-cluster verify"
mnode = qa_config.GetMasterNode()
if errors or warnings or no_warnings:
cvout = GetCommandOutput(mnode.primary, cvcmd + " --error-codes",
fail=(fail or errors))
with CheckFileUnmodified(mnode.primary, pathutils.CLUSTER_CONF_FILE):
cvout = GetCommandOutput(mnode.primary, cvcmd + " --error-codes",
fail=(fail or errors))
print cvout
(act_errs, act_warns) = _GetCVErrorCodes(cvout)
if errors:
......@@ -163,7 +164,8 @@ def AssertClusterVerify(fail=False, errors=None,
_CheckVerifyNoWarnings(act_warns, no_warnings)
else:
AssertCommand(cvcmd, fail=fail, node=mnode)
with CheckFileUnmodified(mnode.primary, pathutils.CLUSTER_CONF_FILE):
AssertCommand(cvcmd, fail=fail, node=mnode)
# data for testing failures due to bad keys/values for disk parameters
......
......@@ -583,6 +583,25 @@ def TestInstanceInfo(instance):
AssertCommand(["gnt-instance", "info", instance.name])
def _TestKVMHotplug(instance):
"""Tests hotplug modification commands, noting that they
"""
args_to_try = [
["--net", "-1:add", "--hotplug"],
["--net", "-1:modify,mac=aa:bb:cc:dd:ee:ff", "--hotplug", "--force"],
["--net", "-1:remove", "--hotplug"],
["--disk", "-1:add,size=1G", "--hotplug"],
["--disk", "-1:remove", "--hotplug"],
]
for alist in args_to_try:
_, stdout, stderr = \
AssertCommand(["gnt-instance", "modify"] + alist + [instance.name])
if "failed" in stdout or "failed" in stderr:
raise qa_error.Error("Hotplugging command failed; please check output"
" for further information")
@InstanceCheck(INST_UP, INST_UP, FIRST_ARG)
def TestInstanceModify(instance):
"""gnt-instance modify"""
......@@ -634,15 +653,7 @@ def TestInstanceModify(instance):
])
elif default_hv == constants.HT_KVM and \
qa_config.TestEnabled("instance-device-hotplug"):
args.extend([
["--net", "-1:add", "--hotplug"],
["--net", "-1:modify,mac=aa:bb:cc:dd:ee:ff", "--hotplug", "--force"],
["--net", "-1:remove", "--hotplug"],
])
args.extend([
["--disk", "-1:add,size=1G", "--hotplug"],
["--disk", "-1:remove", "--hotplug"],
])
_TestKVMHotplug(instance)
elif default_hv == constants.HT_LXC:
args.extend([
["-H", "%s=0" % constants.HV_CPU_MASK],
......
......@@ -32,6 +32,7 @@
"""
import contextlib
import copy
import datetime
import operator
......@@ -155,6 +156,25 @@ def _AssertRetCode(rcode, fail, cmdstr, nodename):
(cmdstr, nodename, rcode))
def _PrintCommandOutput(stdout, stderr):
"""Prints the output of commands, minimizing wasted space.
@type stdout: string
@type stderr: string
"""
if stdout:
stdout_clean = stdout.rstrip('\n')
if stderr:
print "Stdout was:\n%s" % stdout_clean
else:
print stdout_clean
if stderr:
print "Stderr was:"
print >> sys.stderr, stderr.rstrip('\n')
def AssertCommand(cmd, fail=False, node=None, log_cmd=True, max_seconds=None):
"""Checks that a remote command succeeds.
......@@ -191,12 +211,13 @@ def AssertCommand(cmd, fail=False, node=None, log_cmd=True, max_seconds=None):
stdout, stderr = popen.communicate()
rcode = popen.returncode
duration_seconds = TimedeltaToTotalSeconds(datetime.datetime.now() - start)
if fail is not None:
try:
try:
if fail is not None:
_AssertRetCode(rcode, fail, cmdstr, nodename)
except:
print "Stdout was:\n%s\nStderr was:\n%s\n" % (stdout, stderr)
raise
finally:
if log_cmd:
_PrintCommandOutput(stdout, stderr)
if max_seconds is not None:
if duration_seconds > max_seconds:
......@@ -480,6 +501,32 @@ def BackupFile(node, path):
return result
@contextlib.contextmanager
def CheckFileUnmodified(node, filename):
"""Checks that the content of a given file remains the same after running a
wrapped code.
@type node: string
@param node: node the command should run on
@type filename: string
@param filename: absolute filename to check
"""
cmd = utils.ShellQuoteArgs(["sha1sum", MakeNodePath(node, filename)])
def Read():
return GetCommandOutput(node, cmd).strip()
# record the file's checksum before running the wrapped block
before = Read()
yield
# check that the file hasn't changed afterwards
after = Read()
if before != after:
raise qa_error.Error("File '%s' has changed unexpectedly on node %s"
" during the last operation" % (filename, node))
def ResolveInstanceName(instance):
"""Gets the full name of an instance.
......
......@@ -45,6 +45,7 @@ module Ganeti.BasicTypes
, toError
, toErrorBase
, toErrorStr
, tryError
, Error(..) -- re-export from Control.Monad.Error
, MonadIO(..) -- re-export from Control.Monad.IO.Class
, isOk
......@@ -256,6 +257,13 @@ toErrorBase = (toError =<<) . liftBase . runResultT
toErrorStr :: (MonadError e m, Error e) => Result a -> m a
toErrorStr = withError strMsg
-- | Run a given computation and if an error occurs, return it as `Left` of
-- `Either`.
-- This is a generalized version of 'try'.
tryError :: (MonadError e m) => m a -> m (Either e a)
tryError = flip catchError (return . Left) . liftM Right
{-# INLINE tryError #-}
-- | Converts a monadic result with a 'String' message into
-- a 'ResultT' with an arbitrary 'Error'.
--
......
......@@ -60,21 +60,19 @@ module Ganeti.Query.Exec
, forkJobProcess
) where
import Control.Concurrent
import Control.Exception.Lifted (onException, throwIO)
import qualified Control.Exception.Lifted as E
import Control.Concurrent (rtsSupportsBoundThreads)
import Control.Concurrent.Lifted (threadDelay)
import Control.Monad
import Control.Monad.Error
import Control.Monad.Trans.Maybe
import Data.Functor
import qualified Data.Map as M
import Data.Maybe (listToMaybe, mapMaybe)
import System.Directory (getDirectoryContents)
import System.Environment
import System.IO.Error (tryIOError, annotateIOError)
import System.IO.Error (tryIOError, annotateIOError, modifyIOError)
import System.Posix.Process
import System.Posix.IO
import System.Posix.Signals (sigTERM, signalProcess)
import System.Posix.Signals (sigABRT, sigKILL, sigTERM, signalProcess)
import System.Posix.Types (Fd, ProcessID)
import System.Time
import Text.Printf
......@@ -119,9 +117,9 @@ listOpenFds = liftM filterReadable
-- | Catches a potential `IOError` and sets its description via
-- `annotateIOError`. This makes exceptions more informative when they
-- are thrown from an unnamed `Handle`.
rethrowAnnotateIOError :: IO a -> String -> IO a
rethrowAnnotateIOError f desc =
E.catch f (\e -> throwIO $ annotateIOError e desc Nothing Nothing)
rethrowAnnotateIOError :: String -> IO a -> IO a
rethrowAnnotateIOError desc =
modifyIOError (\e -> annotateIOError e desc Nothing Nothing)
-- Code that is executed in a @fork@-ed process and that then replaces itself
-- with the actual job process
......@@ -210,69 +208,69 @@ forkJobProcess :: (Error e, Show e)
-- and process id in the job file
-> ResultT e IO (FilePath, ProcessID)
forkJobProcess jid luxiLivelock update = do
let jidStr = show . fromJobId $ jid
logDebug $ "Setting the lockfile temporarily to " ++ luxiLivelock
++ " for job " ++ jidStr
update luxiLivelock
-- Due to a bug in GHC forking process, we want to retry,
-- if the forked process fails to start.
-- If it fails later on, the failure is handled by 'ResultT'
-- and no retry is performed.
let execWriterLogInside =
MaybeT . ResultT . execWriterLogT . runResultT . runMaybeT
resultOpt <- retryMaybeN C.luxidRetryForkCount
let execWriterLogInside = ResultT . execWriterLogT . runResultT
retryErrorN C.luxidRetryForkCount
$ \tryNo -> execWriterLogInside $ do
let maxWaitUS = 2^(tryNo - 1) * C.luxidRetryForkStepUS
when (tryNo >= 2) . liftIO $ delayRandom (0, maxWaitUS)
(pid, master) <- liftIO $ forkWithPipe connectConfig (runJobProcess jid)
let jobLogPrefix = "[start:job-" ++ jidStr ++ ",pid=" ++ show pid ++ "] "
logDebugJob = logDebug . (jobLogPrefix ++)
logDebugJob "Forked a new process"
let killIfAlive [] = return ()
killIfAlive (sig : sigs) = do
logDebugJob "Getting the status of the process"
status <- tryError . liftIO $ getProcessStatus False True pid
case status of
Left e -> logDebugJob $ "Job process already gone: " ++ show e
Right (Just s) -> logDebugJob $ "Child process status: " ++ show s
Right Nothing -> do
logDebugJob $ "Child process running, killing by " ++ show sig
liftIO $ signalProcess sig pid
unless (null sigs) $ do
threadDelay 100000 -- wait for 0.1s and check again
killIfAlive sigs
let onError = do
logDebug "Closing the pipe to the client"
logDebugJob "Closing the pipe to the client"
withErrorLogAt WARNING "Closing the communication pipe failed"
(liftIO (closeClient master)) `orElse` return ()
logDebug $ "Getting the status of job process "
++ show (fromJobId jid)
status <- liftIO $ getProcessStatus False True pid
case status of
Just s -> logDebug $ "Child process (job " ++ show (fromJobId jid)
++ ") status: " ++ show s
Nothing -> do
logDebug $ "Child process (job " ++ show (fromJobId jid)
++ ") running, killing by SIGTERM"
liftIO $ signalProcess sigTERM pid
flip onException onError $ do
let recv = liftIO $ recvMsg master
`rethrowAnnotateIOError` "ganeti job process input pipe"
`onException`
logError "recv from ganeti job process pipe failed"
send x = liftIO $ sendMsg master x
`rethrowAnnotateIOError` "ganeti job process output pipe"
`onException`
logError "send to ganeti job process pipe failed"
logDebug "Getting the lockfile of the client"
lockfile <- recv `orElse` mzero
logDebug $ "Setting the lockfile to the final " ++ lockfile
killIfAlive [sigTERM, sigABRT, sigKILL]
flip catchError (\e -> onError >> throwError e)
$ do
let annotatedIO msg k = do
logDebugJob msg
liftIO $ rethrowAnnotateIOError (jobLogPrefix ++ msg) k
let recv msg = annotatedIO msg (recvMsg master)
send msg x = annotatedIO msg (sendMsg master x)
lockfile <- recv "Getting the lockfile of the client"