From 48e4da5c91e7b6332daf00b4bfd79f200adc86c6 Mon Sep 17 00:00:00 2001
From: Klaus Aehlig
Date: Wed, 27 Nov 2013 11:16:26 +0100
Subject: [PATCH] Add a scheduler to keep track of the job queue

In order to allow informed decisions on when to start a job, it is
necessary for luxid to keep track of the (active part of the) job
queue. Add a scheduler, similar to the config reader, that does this,
but also schedules jobs to be executed. At the moment this is just the
simple mechanism of starting jobs until a fixed number of running jobs
is reached.

Signed-off-by: Klaus Aehlig
Reviewed-by: Petr Pudlak
---
 Makefile.am               |   1 +
 src/Ganeti/Constants.hs   |  14 +++
 src/Ganeti/JQScheduler.hs | 208 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 223 insertions(+)
 create mode 100644 src/Ganeti/JQScheduler.hs

diff --git a/Makefile.am b/Makefile.am
index 10e99621e..eab7bd994 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -689,6 +689,7 @@ HS_LIB_SRCS = \
 	src/Ganeti/Hs2Py/GenOpCodes.hs \
 	src/Ganeti/Hs2Py/OpDoc.hs \
 	src/Ganeti/JQueue.hs \
+	src/Ganeti/JQScheduler.hs \
 	src/Ganeti/JSON.hs \
 	src/Ganeti/Jobs.hs \
 	src/Ganeti/Logging.hs \
diff --git a/src/Ganeti/Constants.hs b/src/Ganeti/Constants.hs
index d09de7ea2..6641be6fa 100644
--- a/src/Ganeti/Constants.hs
+++ b/src/Ganeti/Constants.hs
@@ -3857,6 +3857,20 @@ partMargin = 0.01
 partReserved :: Double
 partReserved = 0.02
 
+-- * Luxid job scheduling
+
+-- | Time interval in seconds for polling updates on the job queue. This
+-- interval is only relevant if the number of running jobs reaches the maximal
+-- allowed number, as otherwise new jobs will be started immediately anyway.
+luxidJobqueuePollInterval :: Int
+luxidJobqueuePollInterval = 3
+
+-- | Maximal number of jobs to be running at the same time. Once this number is
+-- reached, new jobs will just be queued and only started, once some of the
+-- other jobs have finished.
+luxidMaximalRunningJobs :: Int
+luxidMaximalRunningJobs = 20
+
 -- * Confd
 
 confdProtocolVersion :: Int
diff --git a/src/Ganeti/JQScheduler.hs b/src/Ganeti/JQScheduler.hs
new file mode 100644
index 000000000..25a7716c9
--- /dev/null
+++ b/src/Ganeti/JQScheduler.hs
@@ -0,0 +1,208 @@
+{-| Implementation of a reader for the job queue.
+
+-}
+
+{-
+
+Copyright (C) 2013 Google Inc.
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; either version 2 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful, but
+WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+02110-1301, USA.
+
+-}
+
+module Ganeti.JQScheduler
+  ( JQStatus
+  , emptyJQStatus
+  , initJQScheduler
+  , enqueueNewJobs
+  ) where
+
+import Control.Arrow
+import Control.Concurrent
+import Control.Monad
+import Data.List
+import Data.IORef
+
+import Ganeti.BasicTypes
+import Ganeti.Constants as C
+import Ganeti.JQueue as JQ
+import Ganeti.Logging
+import Ganeti.Path
+import Ganeti.Types
+import Ganeti.Utils
+
+data JobWithStat = JobWithStat { jStat :: FStat, jJob :: QueuedJob }
+data Queue = Queue { qEnqueued :: [JobWithStat], qRunning :: [JobWithStat] }
+
+{-| Representation of the job queue
+
+We keep two lists of jobs (together with information about the last
+fstat result observed): the jobs that are enqueued, but not yet handed
+over for execution, and the jobs already handed over for execution. They
+are kept together in a single IORef, so that we can atomically update
+both, in particular when scheduling jobs to be handed over for execution.
+
+-}
+
+data JQStatus = JQStatus
+  { jqJobs :: IORef Queue
+  }
+
+
+emptyJQStatus :: IO JQStatus
+emptyJQStatus = do
+  jqJ <- newIORef Queue {qEnqueued=[], qRunning=[]}
+  return JQStatus { jqJobs=jqJ }
+
+-- | Apply a function on the running jobs.
+onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
+onRunningJobs f queue = queue {qRunning=f $ qRunning queue}
+
+-- | Apply a function on the queued jobs.
+onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
+onQueuedJobs f queue = queue {qEnqueued=f $ qEnqueued queue}
+
+-- | Obtain a JobWithStat from a QueuedJob.
+unreadJob :: QueuedJob -> JobWithStat
+unreadJob job = JobWithStat {jJob=job, jStat=nullFStat}
+
+-- | Reload interval for polling the running jobs for updates in microseconds.
+watchInterval :: Int
+watchInterval = C.luxidJobqueuePollInterval * 1000000
+
+-- | Maximal number of jobs to be running at the same time.
+maxRunningJobs :: Int
+maxRunningJobs = C.luxidMaximalRunningJobs
+
+-- | Wrapper function to atomically update the jobs in the queue status.
+modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
+modifyJobs qstat f = atomicModifyIORef (jqJobs qstat) (flip (,) () . f)
+
+-- | Reread a job from disk, if the file has changed.
+readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
+readJobStatus (JobWithStat {jStat=fstat, jJob=job}) = do
+  let jid = qjId job
+  qdir <- queueDir
+  let fpath = liveJobFile qdir jid
+  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
+  changed <- needsReload fstat fpath
+  case changed of
+    Nothing -> do
+      logDebug $ "File " ++ fpath ++ " not changed on disk."
+      return Nothing
+    Just fstat' -> do
+      logInfo $ "Rereading " ++ fpath
+      readResult <- loadJobFromDisk qdir False jid
+      let jids = show $ fromJobId jid
+      case readResult of
+        Bad s -> do
+          logWarning $ "Failed to read job " ++ jids ++ ": " ++ s
+          return Nothing
+        Ok (job', _) -> do
+          logDebug
+            $ "Read job " ++ jids ++ ", status is " ++ show (calcJobStatus job')
+          return . Just $ JobWithStat {jStat=fstat', jJob=job'}
+
+-- | Update a job in the job queue, if it is still there. This is the
+-- pure function for inserting a previously read change into the queue.
+-- As the change contains its time stamp, we don't have to worry about a
+-- later read change overwriting a newer read state. If this happens, the
+-- fstat value will be outdated, so the next poller run will fix this.
+updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
+updateJobStatus job' =
+  let jid = qjId $ jJob job' in
+  map (\job -> if qjId (jJob job) == jid then job' else job)
+
+-- | Update a single job by reading it from disk, if necessary.
+updateJob :: JQStatus -> JobWithStat -> IO ()
+updateJob state jb = do
+  jb' <- readJobStatus jb
+  maybe (return ()) (modifyJobs state . onRunningJobs . updateJobStatus) jb'
+
+-- | Sort out the finished jobs from the monitored part of the queue.
+-- This is the pure part, splitting the queue into a remaining queue
+-- and the jobs that were removed.
+sortoutFinishedJobs :: Queue -> (Queue, [QueuedJob])
+sortoutFinishedJobs queue =
+  let (run', fin) = partition
+                      ((<= JOB_STATUS_RUNNING) . calcJobStatus . jJob)
+                      . qRunning $ queue
+  in (queue {qRunning=run'}, map jJob fin)
+
+-- | Actually clean up the finished jobs. This is the IO wrapper around
+-- the pure `sortoutFinishedJobs`.
+cleanupFinishedJobs :: JQStatus -> IO ()
+cleanupFinishedJobs qstate = do
+  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
+  let showJob = show . ((fromJobId . qjId) &&& calcJobStatus)
+      jlist = commaJoin $ map showJob finished
+  unless (null finished)
+    . logInfo $ "Finished jobs: " ++ jlist
+
+-- | Decide on which jobs to schedule next for execution. This is the
+-- pure function doing the scheduling.
+selectJobsToRun :: Queue -> (Queue, [QueuedJob])
+selectJobsToRun queue =
+  let n = maxRunningJobs - length (qRunning queue)
+      (chosen, remain) = splitAt n (qEnqueued queue)
+  in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen}
+     , map jJob chosen)
+
+-- | Schedule jobs to be run. This is the IO wrapper around the
+-- pure `selectJobsToRun`.
+scheduleSomeJobs :: JQStatus -> IO ()
+scheduleSomeJobs qstate = do
+  chosen <- atomicModifyIORef (jqJobs qstate) selectJobsToRun
+  unless (null chosen) . logInfo . (++) "Starting jobs: " . commaJoin
+    $ map (show . fromJobId . qjId) chosen
+  JQ.startJobs chosen
+
+-- | Format the job queue status in a compact, human readable way.
+showQueue :: Queue -> String
+showQueue (Queue {qEnqueued=waiting, qRunning=running}) =
+  let showids = show . map (fromJobId . qjId . jJob)
+  in "Waiting jobs: " ++ showids waiting
+       ++ "; running jobs: " ++ showids running
+
+-- | Time-based watcher for updating the job queue.
+onTimeWatcher :: JQStatus -> IO ()
+onTimeWatcher qstate = forever $ do
+  threadDelay watchInterval
+  logDebug "Watcher timer fired"
+  jobs <- readIORef (jqJobs qstate)
+  mapM_ (updateJob qstate) $ qRunning jobs
+  cleanupFinishedJobs qstate
+  jobs' <- readIORef (jqJobs qstate)
+  logInfo $ showQueue jobs'
+  scheduleSomeJobs qstate
+
+-- | Set up the job scheduler. This will also start the monitoring
+-- of changes to the running jobs.
+initJQScheduler :: JQStatus -> IO ()
+initJQScheduler qstate = do
+  logInfo "Starting time-based job queue watcher"
+  _ <- forkIO $ onTimeWatcher qstate
+  return ()
+
+-- | Enqueue new jobs. This will guarantee that the jobs will be executed
+-- eventually.
+enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
+enqueueNewJobs state jobs = do
+  logInfo . (++) "New jobs enqueued: " . commaJoin
+    $ map (show . fromJobId . qjId) jobs
+  let jobs' = map unreadJob jobs
+  modifyJobs state (onQueuedJobs (++ jobs'))
+  scheduleSomeJobs state
-- 
GitLab