Commit 369671f4 authored by Dato Simó's avatar Dato Simó
Browse files

Hbal.hs: move job execution functions to Jobs.hs



Ganeti.Jobs now holds functions that can be used to submit and monitor the
status of jobs. In particular, execJobsWait and waitForJobs are factored
out of Hbal.hs.
Signed-off-by: default avatarDato Simó <dato@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 42afc235
......@@ -30,7 +30,6 @@ module Ganeti.HTools.Program.Hbal
, iterateDepth
) where
import Control.Concurrent (threadDelay)
import Control.Exception (bracket)
import Control.Monad
import Data.List
......@@ -51,12 +50,12 @@ import qualified Ganeti.HTools.Instance as Instance
import Ganeti.BasicTypes
import Ganeti.Common
import Ganeti.Errors
import Ganeti.HTools.CLI
import Ganeti.HTools.ExtLoader
import Ganeti.HTools.Types
import Ganeti.HTools.Loader
import Ganeti.OpCodes (wrapOpCode, setOpComment, OpCode, MetaOpCode)
import Ganeti.Jobs as Jobs
import Ganeti.Types
import Ganeti.Utils
......@@ -174,24 +173,6 @@ saveBalanceCommands opts cmd_data = do
writeFile out_path (shTemplate ++ cmd_data)
printf "The commands have been written to file '%s'\n" out_path
-- | Polls a set of jobs at a fixed interval until all are finished
-- one way or another.
waitForJobs :: L.Client -> [L.JobId] -> IO (Result [JobStatus])
waitForJobs client jids = do
sts <- L.queryJobsStatus client jids
case sts of
Bad e -> return . Bad $ "Checking job status: " ++ formatError e
Ok s -> if any (<= JOB_STATUS_RUNNING) s
then do
-- TODO: replace hardcoded value with a better thing
threadDelay (1000000 * 15)
waitForJobs client jids
else return $ Ok s
-- | Check that a set of job statuses is all success.
checkJobsStatus :: [JobStatus] -> Bool
checkJobsStatus = all (== JOB_STATUS_SUCCESS)
-- | Wrapper over execJobSet checking for early termination via an IORef.
execCancelWrapper :: String -> Node.List
-> Instance.List -> IORef Int -> [JobSet] -> IO (Result ())
......@@ -212,25 +193,20 @@ execJobSet master nl il cref (js:jss) = do
let jobs = map (\(_, idx, move, _) ->
map annotateOpCode $
Cluster.iMoveToJob nl il idx move) js
let descr = map (\(_, idx, _, _) -> Container.nameOf il idx) js
descr = map (\(_, idx, _, _) -> Container.nameOf il idx) js
logfn = putStrLn . ("Got job IDs" ++) . commaJoin . map (show . fromJobId)
putStrLn $ "Executing jobset for instances " ++ commaJoin descr
jrs <- bracket (L.getClient master) L.closeClient
(\client -> do
jids <- L.submitManyJobs client jobs
case jids of
Bad e -> return . Bad $ "Job submission error: " ++ formatError e
Ok x -> do
putStrLn $ "Got job IDs " ++
commaJoin (map (show . fromJobId) x)
waitForJobs client x
)
(\client -> Jobs.execJobsWait client jobs logfn)
case jrs of
Bad x -> return $ Bad x
Ok x -> if checkJobsStatus x
Ok x -> if null failures
then execCancelWrapper master nl il cref jss
else return . Bad . unlines $ [
"Not all jobs completed successfully: " ++ show x,
"Not all jobs completed successfully: " ++ show failures,
"Aborting."]
where
failures = filter ((/= JOB_STATUS_SUCCESS) . snd) x
-- | Executes the jobs, if possible and desired.
maybeExecJobs :: Options
......
{-| Implementation of the job information.
{-| Generic code to work with jobs, e.g. submit jobs and check their status.
-}
......@@ -24,5 +24,42 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
-}
module Ganeti.Jobs
(
( execJobsWait
, waitForJobs
) where
import Control.Concurrent (threadDelay)
import Ganeti.BasicTypes
import Ganeti.Errors
import qualified Ganeti.Luxi as L
import Ganeti.OpCodes
import Ganeti.Types
-- | Executes a set of jobs and waits for their completion, returning their
-- status.
execJobsWait :: L.Client -- ^ The Luxi client
-> [[MetaOpCode]] -- ^ The list of jobs
-> ([L.JobId] -> IO ()) -- ^ Post-submission callback
-> IO (Result [(L.JobId, JobStatus)])
execJobsWait client opcodes callback = do
jids <- L.submitManyJobs client opcodes
case jids of
Bad e -> return . Bad $ "Job submission error: " ++ formatError e
Ok jids' -> do
callback jids'
waitForJobs client jids'
-- | Polls a set of jobs at a fixed interval until all are finished
-- one way or another.
waitForJobs :: L.Client -> [L.JobId] -> IO (Result [(L.JobId, JobStatus)])
waitForJobs client jids = do
sts <- L.queryJobsStatus client jids
case sts of
Bad e -> return . Bad $ "Checking job status: " ++ formatError e
Ok sts' -> if any (<= JOB_STATUS_RUNNING) sts'
then do
-- TODO: replace hardcoded value with a better thing
threadDelay (1000000 * 15)
waitForJobs client jids
else return . Ok $ zip jids sts'
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment