Commit c867cfe1 authored by Klaus Aehlig

Add a utility function to try archiving jobs



Provide a function that walks through a list of job ids and
archives them if appropriate. Abort that process if a given
timeout is reached.
Signed-off-by: Klaus Aehlig <aehlig@google.com>
Reviewed-by: Petr Pudlak <pudlak@google.com>
parent 8b5a4b9a
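For context, a minimal caller-side sketch of how the new entry point could be used. The wrapper name archiveOldJobs, the six-hour age and the 30-second timeout are illustrative assumptions, not part of this change; only archiveJobs itself is added by the patch below.

-- Hypothetical usage sketch, for illustration only; assumes the new
-- Ganeti.JQueue.archiveJobs export and Ganeti.Logging.logInfo for output.
import Ganeti.JQueue (archiveJobs)
import Ganeti.Logging (logInfo)
import Ganeti.Objects (ConfigData)
import Ganeti.Types (JobId)

-- Archive finished jobs older than six hours, spending at most 30 seconds.
archiveOldJobs :: ConfigData -> [JobId] -> IO ()
archiveOldJobs cfg jids = do
  (archived, left) <- archiveJobs cfg (6 * 3600) 30 jids
  logInfo $ "Archived " ++ show archived ++ " job(s); "
            ++ show left ++ " job(s) not yet considered"
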
@@ -61,10 +61,12 @@ module Ganeti.JQueue
, isQueueOpen
, startJobs
, cancelJob
, archiveJobs
) where
import Control.Applicative (liftA2, (<|>))
import Control.Arrow (first, second)
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Exception
import Control.Monad
@@ -83,16 +85,17 @@ import qualified Text.JSON
import Text.JSON.Types
import Ganeti.BasicTypes
import qualified Ganeti.Config as Config
import qualified Ganeti.Constants as C
import Ganeti.Errors (ErrorResult)
import Ganeti.JSON
import Ganeti.Logging
import Ganeti.Luxi
import Ganeti.Objects (Node)
import Ganeti.Objects (ConfigData, Node)
import Ganeti.OpCodes
import Ganeti.Path
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
RpcCallJobqueueUpdate(..))
RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..))
import Ganeti.THH
import Ganeti.Types
import Ganeti.Utils
@@ -496,3 +499,77 @@ cancelJob jid = do
socketpath <- defaultMasterSocket
client <- getLuxiClient socketpath
callMethod (CancelJob jid) client

-- | Try, at most until the given endtime, to archive some of the given
-- jobs, if they are older than the specified cut-off time; also replicate
-- archival of the additional jobs. Return the pair of the number of jobs
-- archived, and the number of jobs remaining in the queue, assuming the
-- given numbers for the jobs not considered.
archiveSomeJobsUntil :: ([JobId] -> IO ()) -- ^ replication function
                     -> FilePath           -- ^ queue root directory
                     -> ClockTime          -- ^ Endtime
                     -> Timestamp          -- ^ cut-off time for archiving jobs
                     -> Int                -- ^ number of jobs already archived
                     -> [JobId]            -- ^ Additional jobs to replicate
                     -> [JobId]            -- ^ List of job-ids still to consider
                     -> IO (Int, Int)
archiveSomeJobsUntil replicateFn _ _ _ arch torepl [] = do
  unless (null torepl) . (>> return ())
    . forkIO $ replicateFn torepl
  return (arch, 0)
archiveSomeJobsUntil replicateFn qDir endt cutt arch torepl (jid:jids) = do
  let archiveMore = archiveSomeJobsUntil replicateFn qDir endt cutt
      continue = archiveMore arch torepl jids
      jidname = show $ fromJobId jid
  time <- getClockTime
  if time >= endt
    then do
      _ <- forkIO $ replicateFn torepl
      return (arch, length (jid:jids))
    else do
      logDebug $ "Inspecting job " ++ jidname ++ " for archival"
      loadResult <- loadJobFromDisk qDir False jid
      case loadResult of
        Bad _ -> continue
        Ok (job, _) ->
          if jobArchivable cutt job
            then do
              let live = liveJobFile qDir jid
                  archive = archivedJobFile qDir jid
              renameResult <- safeRenameFile live archive
              case renameResult of
                Bad s -> do
                  logWarning $ "Renaming " ++ live ++ " to " ++ archive
                               ++ " failed unexpectedly: " ++ s
                  continue
                Ok () -> do
                  let torepl' = jid:torepl
                  if length torepl' >= 10
                    then do
                      _ <- forkIO $ replicateFn torepl'
                      archiveMore (arch + 1) [] jids
                    else archiveMore (arch + 1) torepl' jids
            else continue

-- | Archive jobs older than the given time, but do not exceed the timeout for
-- carrying out this task.
archiveJobs :: ConfigData -- ^ cluster configuration
            -> Int        -- ^ time the job has to be in the past in order
                          --   to be archived
            -> Int        -- ^ timeout
            -> [JobId]    -- ^ jobs to consider
            -> IO (Int, Int)
archiveJobs cfg age timeout jids = do
  now <- getClockTime
  qDir <- queueDir
  let endtime = addToClockTime (noTimeDiff { tdSec = timeout }) now
      cuttime = if age < 0 then noTimestamp
                  else advanceTimestamp (- age) (fromClockTime now)
      mcs = Config.getMasterCandidates cfg
      replicateFn jobs = do
        let olds = map (liveJobFile qDir) jobs
            news = map (archivedJobFile qDir) jobs
        _ <- executeRpcCall mcs . RpcCallJobqueueRename $ zip olds news
        return ()
  archiveSomeJobsUntil replicateFn qDir endtime cuttime 0 [] jids
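
As a reading aid for the cut-off used in archiveJobs, here is a hypothetical predicate, written as if inside this module so that jobArchivable, advanceTimestamp and fromClockTime are in scope, spelling out what a non-negative age means for a single job:

-- Hypothetical helper, for illustration only: with a non-negative age, a job
-- qualifies for archival iff 'jobArchivable' holds against a cut-off of
-- 'now' minus 'age' seconds.
oldEnoughForArchival :: Int -> ClockTime -> QueuedJob -> Bool
oldEnoughForArchival age now =
  jobArchivable (advanceTimestamp (-age) (fromClockTime now))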