Commit efb4c025 authored by Petr Pudlak

Execute jobs as processes from Luxi

.. instead of just letting the master daemon handle them.

We try to start all given jobs independently and requeue those that
fail to start.
Signed-off-by: Petr Pudlak <pudlak@google.com>
Reviewed-by: Klaus Aehlig <aehlig@google.com>
parent 4b887066
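As an illustration of the flow this change introduces, here is a minimal sketch, assuming the names from the diff below (JQ.startJobs, failJobs, jJob, jqLivelock) are in scope; the imports guess at export lists the commit itself does not promise, and the real logic lives in scheduleSomeJobs:

import Control.Monad (unless)
import Ganeti.BasicTypes (GenericResult(..))
import qualified Ganeti.JQueue as JQ
import Ganeti.JQScheduler (JQStatus, JobWithStat, failJobs, jJob, jqLivelock)
import Ganeti.Objects (ConfigData)

-- Sketch only, not part of the commit: start each chosen job in its own
-- process and hand the jobs whose start attempt failed to 'failJobs'.
startAndHandleFailures :: ConfigData -> JQStatus -> [JobWithStat] -> IO ()
startAndHandleFailures cfg qstate chosen = do
  -- one result per job, so failures are handled independently
  results <- JQ.startJobs cfg (jqLivelock qstate) (map jJob chosen)
  -- keep the (job, error) pairs for which starting returned Bad
  let failed = [(j, err) | (j, Bad err) <- zip chosen results]
  unless (null failed) $ failJobs cfg qstate failed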
@@ -39,13 +39,16 @@ import Control.Exception
import Control.Monad
import Control.Monad.IO.Class
import Data.Function (on)
import Data.Functor ((<$))
import Data.IORef
import Data.List
import Data.Maybe
import Data.IORef
import qualified Data.Set as S
import System.INotify
import Ganeti.BasicTypes
import Ganeti.Constants as C
import Ganeti.Errors
import Ganeti.JQueue as JQ
import Ganeti.Logging
import Ganeti.Objects
@@ -223,16 +226,40 @@ selectJobsToRun count queue =
remain = deleteFirstsBy ((==) `on` (qjId . jJob)) (qEnqueued queue) chosen
in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen}, chosen)
-- | Logs errors of failed jobs and returns the set of job IDs.
logFailedJobs :: (MonadLog m)
=> [(JobWithStat, GanetiException)] -> m (S.Set JobId)
logFailedJobs [] = return S.empty
logFailedJobs jobs = do
let jids = S.fromList . map (qjId . jJob . fst) $ jobs
jidsString = commaJoin . map (show . fromJobId) . S.toList $ jids
logWarning $ "Starting jobs " ++ jidsString ++ " failed: "
++ show (map snd jobs)
return jids
-- | Requeue jobs that were previously selected for execution
-- but couldn't be started.
requeueJobs :: JQStatus -> [JobWithStat] -> IOError -> IO ()
requeueJobs qstate jobs err = do
let jids = map (qjId . jJob) jobs
jidsString = commaJoin $ map (show . fromJobId) jids
rmJobs = filter ((`notElem` jids) . qjId . jJob)
logWarning $ "Starting jobs failed: " ++ show err
logWarning $ "Rescheduling jobs: " ++ jidsString
modifyJobs qstate $ onQueuedJobs (jobs ++) . onRunningJobs rmJobs
requeueJobs :: JQStatus -> [(JobWithStat, GanetiException)] -> IO ()
requeueJobs qstate jobs = do
jids <- logFailedJobs jobs
let rmJobs = filter ((`S.notMember` jids) . qjId . jJob)
logWarning "Rescheduling jobs"
modifyJobs qstate $ onQueuedJobs (map fst jobs ++)
. onRunningJobs rmJobs
-- | Fail jobs that were previously selected for execution
-- but couldn't be started.
failJobs :: ConfigData -> JQStatus -> [(JobWithStat, GanetiException)]
-> IO ()
failJobs cfg qstate jobs = do
qdir <- queueDir
jids <- logFailedJobs jobs
let rmJobs = filter ((`S.notMember` jids) . qjId . jJob)
logWarning "Failing jobs"
modifyJobs qstate $ onRunningJobs rmJobs
let trySaveJob :: JobWithStat -> ResultT String IO ()
trySaveJob = (() <$) . writeAndReplicateJob cfg qdir . jJob
mapM_ (runResultT . trySaveJob . fst) jobs
-- | Schedule jobs to be run. This is the IO wrapper around the
-- pure `selectJobsToRun`.
@@ -244,8 +271,18 @@ scheduleSomeJobs qstate = do
unless (null chosen) . logInfo . (++) "Starting jobs: " . commaJoin
$ map (show . fromJobId . qjId) jobs
mapM_ (attachWatcher qstate) chosen
result <- try $ JQ.startJobs jobs
either (requeueJobs qstate chosen) return result
cfgR <- readIORef (jqConfig qstate)
case cfgR of
Bad err -> do
let msg = "Configuration unavailable: " ++ err
logError msg
requeueJobs qstate . map (\x -> (x, strMsg msg)) $ chosen
Ok cfg -> do
result <- JQ.startJobs cfg (jqLivelock qstate) jobs
let badWith (x, Bad y) = Just (x, y)
badWith _ = Nothing
let failed = mapMaybe badWith $ zip chosen result
unless (null failed) $ failJobs cfg qstate failed
-- | Format the job queue status in a compact, human readable way.
showQueue :: Queue -> String
......
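The per-job error handling in scheduleSomeJobs above pairs each chosen job with its start result and keeps only the failures (badWith plus mapMaybe). The same pattern in isolation, with generic types so the snippet compiles and runs on its own:

import Data.Maybe (mapMaybe)

-- Pair each input with its result and keep only the failing pairs;
-- a generic version of the badWith/mapMaybe step in scheduleSomeJobs.
failedWith :: [a] -> [Either e b] -> [(a, e)]
failedWith xs results = mapMaybe bad (zip xs results)
  where
    bad (x, Left e) = Just (x, e)
    bad _           = Nothing

main :: IO ()
main = print (failedWith "abc" [Right (1 :: Int), Left "boom", Right 3])
-- prints [('b',"boom")]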
@@ -102,6 +102,7 @@ import Ganeti.Luxi
import Ganeti.Objects (ConfigData, Node)
import Ganeti.OpCodes
import Ganeti.Path
import Ganeti.Query.Exec as Exec
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..))
import Ganeti.THH
@@ -565,15 +566,20 @@ allocateJobId mastercandidates lock = do
isQueueOpen :: IO Bool
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)
-- | Start enqueued jobs, currently by handing them over to masterd.
startJobs :: [QueuedJob] -> IO ()
startJobs jobs = do
socketpath <- defaultMasterSocket
client <- getLuxiClient socketpath
pickupResults <- mapM (flip callMethod client . PickupJob . qjId) jobs
let failures = map show $ justBad pickupResults
unless (null failures)
. logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures
-- | Start enqueued jobs by executing the Python code.
startJobs :: ConfigData
-> Livelock -- ^ Luxi's livelock path
-> [QueuedJob] -- ^ the list of jobs to start
-> IO [ErrorResult QueuedJob]
startJobs cfg luxiLivelock jobs = do
qdir <- queueDir
let updateJobLivelock job llfile =
() <$ writeAndReplicateJob cfg qdir (job { qjLivelock = Just llfile })
let runJob job = do
llfile <- Exec.forkJobProcess (qjId job) luxiLivelock
(updateJobLivelock job)
return $ job { qjLivelock = Just llfile }
mapM (runResultT . runJob) jobs
-- | Try to cancel a job that has already been handed over to execution,
-- currently by asking masterd to cancel it.
......
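For callers, the visible change is the startJobs contract: rather than asking masterd to pick up each job, it now returns one result per job, and the caller decides how to treat failures. A hedged usage sketch; the Livelock import path is an assumption, and the failure counting is illustrative rather than taken from the commit:

import Ganeti.BasicTypes (GenericResult(..))
import Ganeti.JQueue (QueuedJob, startJobs)
import Ganeti.Objects (ConfigData)
-- Assumption: the Livelock type lives in a module such as
-- Ganeti.Utils.Livelock; adjust to wherever it is actually defined.
import Ganeti.Utils.Livelock (Livelock)

-- Start the given jobs and report how many could not be forked.
countStartFailures :: ConfigData -> Livelock -> [QueuedJob] -> IO Int
countStartFailures cfg lock jobs = do
  results <- startJobs cfg lock jobs
  return (length [e | Bad e <- results])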