Commit ec98ea2b authored by Petr Pudlak

Cancel jobs by sending SIGTERM

We can only send the signal if the job is alive and if there is a
process ID in the job file (which means that the signal handler has been
installed). If the process ID is missing, we wait and retry.

In addition, after we send the signal, we wait for the job to actually
die, to retain the original semantics.
Signed-off-by: Petr Pudlak <pudlak@google.com>
Reviewed-by: Klaus Aehlig <aehlig@google.com>
parent cd202891
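
The flow described in the commit message, distilled into a small stand-alone sketch: check that the job is alive, read the process ID from the job file, send SIGTERM, then wait for the job to finalize, retrying with backoff while the job is still starting up. jobIsAlive, readPidFromJobFile and jobHasFinalized are hypothetical stand-ins; the real implementation uses the livelock file, qjProcessId and jobFinalized, as shown in the JQueue.hs hunk below.

module CancelSketch where

import Control.Concurrent (threadDelay)
import Control.Monad (when)
import System.Posix.Signals (sigTERM, signalProcess)
import System.Posix.Types (ProcessID)

-- Hypothetical stand-ins for the checks done against the job file.
jobIsAlive :: FilePath -> IO Bool
jobIsAlive _ = return True

readPidFromJobFile :: FilePath -> IO (Maybe ProcessID)
readPidFromJobFile _ = return Nothing   -- Nothing = signal handler not installed yet

jobHasFinalized :: FilePath -> IO Bool
jobHasFinalized _ = return False

-- | Retry while the job is in its startup phase (no PID recorded yet),
-- then send SIGTERM and wait for the job to actually die.
cancelSketch :: FilePath -> IO (Bool, String)
cancelSketch jobFile = go 0
  where
    maxTries = 5 :: Int
    go tryNo
      | tryNo > maxTries = return (False, "Timeout: job still in its startup phase")
      | otherwise = do
          when (tryNo > 0) $ threadDelay (100000 * 2 ^ tryNo)   -- back off before retries
          alive <- jobIsAlive jobFile
          if not alive
            then return (True, "Job is already dead")
            else do
              mPid <- readPidFromJobFile jobFile
              case mPid of
                Nothing  -> go (tryNo + 1)      -- no PID yet: still starting up, retry
                Just pid -> do
                  signalProcess sigTERM pid     -- ask the job process to terminate
                  waitForDeath (30 :: Int)      -- then wait for it to actually die

    waitForDeath :: Int -> IO (Bool, String)
    waitForDeath 0 = return (False, "Job did not terminate in time")
    waitForDeath n = do
      done <- jobHasFinalized jobFile
      if done
        then return (True, "Job successfully cancelled")
        else threadDelay 1000000 >> waitForDeath (n - 1)
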
@@ -4670,6 +4670,11 @@ luxiWfjcTimeout = (luxiDefRwto - 1) `div` 2
 luxiLivelockPrefix :: String
 luxiLivelockPrefix = "luxi-daemon"
 
+-- | The LUXI daemon waits this number of seconds for ensuring that a canceled
+-- job terminates before giving up.
+luxiCancelJobTimeout :: Int
+luxiCancelJobTimeout = (luxiDefRwto - 1) `div` 4
+
 -- * Query language constants
 
 -- ** Logic operators with one or more operands, each of which is a
......
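
For a rough sense of the new constant's magnitude: assuming luxiDefRwto keeps its usual default of 60 seconds (an assumption; verify against Ganeti.Constants in your tree), the two derived timeouts work out as below.

-- Worked example only; 60 is an assumed value for luxiDefRwto.
luxiDefRwtoAssumed :: Int
luxiDefRwtoAssumed = 60

luxiWfjcTimeoutExample, luxiCancelJobTimeoutExample :: Int
luxiWfjcTimeoutExample      = (luxiDefRwtoAssumed - 1) `div` 2  -- 29 seconds
luxiCancelJobTimeoutExample = (luxiDefRwtoAssumed - 1) `div` 4  -- 14 seconds
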
@@ -73,12 +73,14 @@ module Ganeti.JQueue
 import Control.Applicative (liftA2, (<|>))
 import Control.Arrow (first, second)
-import Control.Concurrent (forkIO)
+import Control.Concurrent (forkIO, threadDelay)
 import Control.Concurrent.MVar
 import Control.Exception
 import Control.Monad
 import Control.Monad.IO.Class
-import Data.Functor ((<$))
+import Control.Monad.Trans (lift)
+import Control.Monad.Trans.Maybe
+import Data.Functor ((<$), (<$>))
 import Data.List
 import Data.Maybe
 import Data.Ord (comparing)
@@ -88,6 +90,7 @@ import System.Directory
 import System.FilePath
 import System.IO.Error (isDoesNotExistError)
 import System.Posix.Files
+import System.Posix.Signals (sigTERM, signalProcess)
 import System.Time
 import qualified Text.JSON
 import Text.JSON.Types
@@ -95,7 +98,7 @@ import Text.JSON.Types
 import Ganeti.BasicTypes
 import qualified Ganeti.Config as Config
 import qualified Ganeti.Constants as C
-import Ganeti.Errors (ErrorResult)
+import Ganeti.Errors (ErrorResult, ResultG)
 import Ganeti.JSON
 import Ganeti.Logging
 import Ganeti.Luxi
@@ -110,7 +113,7 @@ import Ganeti.THH.Field
 import Ganeti.Types
 import Ganeti.Utils
 import Ganeti.Utils.Atomic
-import Ganeti.Utils.Livelock (Livelock)
+import Ganeti.Utils.Livelock (Livelock, isDead)
 import Ganeti.VCluster (makeVirtualPath)
 
 -- * Data types
@@ -573,21 +576,55 @@ startJobs :: ConfigData
           -> IO [ErrorResult QueuedJob]
 startJobs cfg luxiLivelock jobs = do
   qdir <- queueDir
-  let updateJobLivelock job llfile =
-        () <$ writeAndReplicateJob cfg qdir (job { qjLivelock = Just llfile })
+  let updateJob job llfile =
+        void . writeAndReplicateJob cfg qdir $ job { qjLivelock = Just llfile }
   let runJob job = do
-        llfile <- Exec.forkJobProcess (qjId job) luxiLivelock
-                                      (updateJobLivelock job)
+        (llfile, _) <- Exec.forkJobProcess (qjId job) luxiLivelock
+                                           (updateJob job)
         return $ job { qjLivelock = Just llfile }
   mapM (runResultT . runJob) jobs
 
+-- | Waits for a job to finalize its execution.
+waitForJob :: JobId -> Int -> ResultG (Bool, String)
+waitForJob jid tmout = do
+  qDir <- liftIO queueDir
+  let jobfile = liveJobFile qDir jid
+      load = liftM fst <$> loadJobFromDisk qDir False jid
+  jobR <- liftIO $ watchFileBy jobfile tmout
+                     (genericResult (const False) jobFinalized) load
+  case calcJobStatus <$> jobR of
+    Ok s | s == JOB_STATUS_CANCELED ->
+             return (True, "Job successfully cancelled")
+         | otherwise ->
+             return (False, "Job exited with status " ++ show s)
+    Bad e -> failError $ "Can't read job status: " ++ e
+
 -- | Try to cancel a job that has already been handed over to execution,
--- currently by asking masterd to cancel it.
-cancelJob :: JobId -> IO (ErrorResult JSValue)
-cancelJob jid = do
-  socketpath <- defaultMasterSocket
-  client <- getLuxiClient socketpath
-  callMethod (CancelJob jid) client
+-- by terminating the process.
+cancelJob :: JobId -> IO (ErrorResult (Bool, String))
+cancelJob jid = runResultT $ do
+  -- we can't terminate the job if it's just being started, so
+  -- retry several times in such a case
+  result <- runMaybeT . msum . flip map [0..5 :: Int] $ \tryNo -> do
+    -- if we're retrying, sleep for some time
+    when (tryNo > 0) . liftIO . threadDelay $ 100000 * (2 ^ tryNo)
+    -- first check if the job is alive so that we don't kill some other
+    -- process by accident
+    qDir <- liftIO queueDir
+    (job, _) <- lift . mkResultT $ loadJobFromDisk qDir True jid
+    let jName = ("Job " ++) . show . fromJobId . qjId $ job
+    dead <- maybe (return False) (liftIO . isDead) (qjLivelock job)
+    case qjProcessId job of
+      _ | dead ->
+        return (True, jName ++ " has been already dead")
+      Just pid -> do
+        liftIO $ signalProcess sigTERM pid
+        lift $ waitForJob jid C.luxiCancelJobTimeout
+      _ -> do
+        logDebug $ jName ++ " in its startup phase, retrying"
+        mzero
+  return $ fromMaybe (False, "Timeout: job still in its startup phase") result
 
 -- | Permissions for the archive directories.
 queueDirPermissions :: FilePermissions
......
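
The new cancelJob builds its retry loop from runMaybeT and msum: each attempt runs in MaybeT, mzero means "job still in its startup phase, move on to the next attempt", and msum returns the first attempt that produces a result. The delay before attempt n is 100000 * 2^n microseconds, so retries 1 through 5 wait 0.2 s, 0.4 s, 0.8 s, 1.6 s and 3.2 s, roughly 6.2 s in total, before the fromMaybe fallback reports a timeout. A reduced, self-contained sketch of that pattern (checkReady is a hypothetical stand-in for the liveness and PID checks):

module RetrySketch where

import Control.Concurrent (threadDelay)
import Control.Monad (msum, mzero, when)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Maybe (runMaybeT)
import Data.Maybe (fromMaybe)

-- Hypothetical stand-in: Nothing means the job is still starting up
-- (no PID recorded yet), Just carries what we would signal.
checkReady :: IO (Maybe String)
checkReady = return Nothing

-- | One attempt per element of [0..5]; mzero makes msum try the next
-- attempt, while a returned value short-circuits the remaining ones.
retryingCancel :: IO (Bool, String)
retryingCancel = do
  result <- runMaybeT . msum . flip map [0..5 :: Int] $ \tryNo -> do
    -- exponential backoff before every retry (none before the first try)
    when (tryNo > 0) . liftIO . threadDelay $ 100000 * (2 ^ tryNo)
    ready <- liftIO checkReady
    case ready of
      Just pidInfo -> return (True, "would send SIGTERM to " ++ pidInfo)
      Nothing      -> mzero
  return $ fromMaybe (False, "Timeout: job still in its startup phase") result
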
@@ -347,7 +347,7 @@ handleCall _ qstat cfg (CancelJob jid) = do
       writeAndReplicateJob cfg qDir job'
     Ok False -> do
       logDebug $ jName ++ " not queued; trying to cancel directly"
-      cancelJob jid
+      fmap showJSON <$> cancelJob jid
     Bad s -> return . Ok . showJSON $ (False, s)
 
 handleCall qlock _ cfg (ArchiveJob jid) = do
......
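
In the hunk above, cancelJob now returns IO (ErrorResult (Bool, String)) while handleCall still has to produce an ErrorResult JSValue, hence fmap showJSON <$> ...: the outer <$> maps over IO, the inner fmap over the ErrorResult layer, and the (Bool, String) pair is serialised the same way as the existing showJSON (False, s) branch. A sketch of the same two-layer mapping, with Either standing in for ErrorResult (an assumption made only for illustration):

module AdapterSketch where

import Text.JSON (JSValue, showJSON)

-- Stand-in for the new cancelJob result, with Either playing the role of
-- Ganeti's ErrorResult.
cancelJobStub :: IO (Either String (Bool, String))
cancelJobStub = return (Right (True, "Job successfully cancelled"))

-- The outer <$> maps over IO, the inner fmap over the Either layer, turning
-- the (Bool, String) pair into the JSValue the LUXI reply expects.
adapted :: IO (Either String JSValue)
adapted = fmap showJSON <$> cancelJobStub
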