Commit 7b0a9096 authored by Petr Pudlak's avatar Petr Pudlak

Add an utility function for writing and replicating a job

Use the function where appropriate.

Also handling of CancelJob is slightly refactored to use ResultT, which
is used by the new function.
Signed-off-by: default avatarPetr Pudlak <pudlak@google.com>
Reviewed-by: default avatarKlaus Aehlig <aehlig@google.com>
parent 64f3a6ea
......@@ -62,6 +62,7 @@ module Ganeti.JQueue
, allocateJobId
, writeJobToDisk
, replicateManyJobs
, writeAndReplicateJob
, isQueueOpen
, startJobs
, cancelJob
......@@ -486,6 +487,14 @@ replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
replicateManyJobs rootdir mastercandidates =
mapM_ (replicateJob rootdir mastercandidates)
-- | Writes a job to a file and replicates it to master candidates.
writeAndReplicateJob :: (Error e)
=> ConfigData -> FilePath -> QueuedJob
-> ResultT e IO [(Node, ERpcError ())]
writeAndReplicateJob cfg rootdir job = do
mkResultT $ writeJobToDisk rootdir job
liftIO $ replicateJob rootdir (Config.getMasterCandidates cfg) job
-- | Read the job serial number from disk.
readSerialFromDisk :: IO (Result JobId)
readSerialFromDisk = do
......
......@@ -231,14 +231,12 @@ handleCall _ _ cfg (QueryExports nodes lock) =
(map Left nodes) ["node", "export"] lock
handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops) = runResultT $ do
let mcs = Config.getMasterCandidates cfg
jid <- mkResultT $ allocateJobId mcs qlock
jid <- mkResultT $ allocateJobId (Config.getMasterCandidates cfg) qlock
ts <- liftIO currentTimestamp
job <- liftM (extendJobReasonTrail . setReceivedTimestamp ts)
$ queuedJobFromOpCodes jid ops
qDir <- liftIO queueDir
mkResultT $ writeJobToDisk qDir job
liftIO $ replicateManyJobs qDir mcs [job]
_ <- writeAndReplicateJob cfg qDir job
_ <- liftIO . forkIO $ enqueueNewJobs qstat [job]
return . showJSON . fromJobId $ jid
......@@ -331,26 +329,19 @@ handleCall _ qstat cfg (CancelJob jid) = do
let jName = (++) "job " . show $ fromJobId jid
dequeueResult <- dequeueJob qstat jid
case dequeueResult of
Ok True -> do
logDebug $ jName ++ " dequeued, marking as canceled"
qDir <- queueDir
readResult <- loadJobFromDisk qDir True jid
let jobFileFailed = return . Ok . showJSON . (,) False
. (++) ("Dequeued " ++ jName
++ ", but failed to mark as cancelled: ")
:: String -> IO (ErrorResult JSValue)
case readResult of
Bad s -> jobFileFailed s
Ok (job, _) -> do
now <- currentTimestamp
let job' = cancelQueuedJob now job
mcs = Config.getMasterCandidates cfg
write_result <- writeJobToDisk qDir job'
case write_result of
Bad s -> jobFileFailed s
Ok () -> do
replicateManyJobs qDir mcs [job']
return . Ok . showJSON $ (True, "Dequeued " ++ jName)
Ok True ->
let jobFileFailed = (,) False
. (++) ("Dequeued " ++ jName
++ ", but failed to mark as cancelled: ")
jobFileSucceeded _ = (True, "Dequeued " ++ jName)
in liftM (Ok . showJSON . genericResult jobFileFailed jobFileSucceeded)
. runResultT $ do
logDebug $ jName ++ " dequeued, marking as canceled"
qDir <- liftIO queueDir
(job, _) <- ResultT $ loadJobFromDisk qDir True jid
now <- liftIO currentTimestamp
let job' = cancelQueuedJob now job
writeAndReplicateJob cfg qDir job'
Ok False -> do
logDebug $ jName ++ " not queued; trying to cancel directly"
cancelJob jid
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment