JQScheduler.hs 22.8 KB
Newer Older
1
{-# LANGUAGE RankNTypes #-}
2
3
4
5
6
7
8
{-| Implementation of a reader for the job queue.

-}

{-

Copyright (C) 2013 Google Inc.
9
All rights reserved.
10

11
12
13
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
14

15
16
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
17

18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33
34
35
36
37

-}

module Ganeti.JQScheduler
  ( JQStatus
38
  , jqLivelock
39
  , jqForkLock
40
  , emptyJQStatus
41
  , selectJobsToRun
Niklas Hambuechen's avatar
Niklas Hambuechen committed
42
  , scheduleSomeJobs
43
44
  , initJQScheduler
  , enqueueNewJobs
45
  , dequeueJob
46
  , setJobPriority
47
  , cleanupIfDead
48
49
  ) where

Niklas Hambuechen's avatar
Niklas Hambuechen committed
50
import Control.Applicative (liftA2, (<$>))
51
52
import Control.Arrow
import Control.Concurrent
53
import Control.Exception
54
import Control.Monad
55
import Control.Monad.IO.Class
56
import Data.Function (on)
57
58
import Data.Functor ((<$))
import Data.IORef
59
import Data.List
Klaus Aehlig's avatar
Klaus Aehlig committed
60
import Data.Maybe
Niklas Hambuechen's avatar
Niklas Hambuechen committed
61
import qualified Data.Map as Map
62
import Data.Ord (comparing)
Niklas Hambuechen's avatar
Niklas Hambuechen committed
63
import Data.Set (Set)
64
import qualified Data.Set as S
65
import System.INotify
66
67
68

import Ganeti.BasicTypes
import Ganeti.Constants as C
69
import Ganeti.Errors
Niklas Hambuechen's avatar
Niklas Hambuechen committed
70
import Ganeti.JQScheduler.Filtering (applyingFilter, jobFiltering)
71
import Ganeti.JQScheduler.Types
72
import Ganeti.JQScheduler.ReasonRateLimiting (reasonRateLimit)
73
import Ganeti.JQueue as JQ
Niklas Hambuechen's avatar
Niklas Hambuechen committed
74
import Ganeti.JSON (fromContainer)
75
import Ganeti.Lens hiding (chosen)
76
import Ganeti.Logging
77
import Ganeti.Objects
78
79
80
import Ganeti.Path
import Ganeti.Types
import Ganeti.Utils
81
import Ganeti.Utils.Livelock
82
import Ganeti.Utils.MVarLock
83
84
85
86
87
88
89
90
91
92
93
94
95
96


{-| Representation of the job queue

The lists of jobs (each job together with information about the last
fstat result observed for it) are kept together in one 'Queue' behind a
single IORef, so that they can be updated atomically, in particular when
scheduling jobs to be handed over for execution.

-}
data JQStatus = JQStatus
  { jqJobs :: IORef Queue
    -- ^ the job lists, kept in one reference
  , jqConfig :: IORef (Result ConfigData)
    -- ^ the configuration, or the reason why it is unavailable
  , jqLivelock :: Livelock
    -- ^ the livelock created when the status was set up
  , jqForkLock :: Lock
    -- ^ lock handed to 'JQ.startJobs' when jobs are started
  }


103
104
-- | Create a scheduler status with all job lists empty, a newly created
-- livelock file and a new fork lock. The given configuration reference is
-- stored unchanged.
emptyJQStatus :: IORef (Result ConfigData) -> IO JQStatus
emptyJQStatus config = do
  let noJobs = Queue { qEnqueued = [], qRunning = [], qManipulated = [] }
  jobsRef <- newIORef noJobs
  (_, livelock) <- mkLivelockFile C.luxiLivelockPrefix
  forkLock <- newLock
  return JQStatus
    { jqJobs = jobsRef
    , jqConfig = config
    , jqLivelock = livelock
    , jqForkLock = forkLock
    }
110
111
112

-- | Transform the list of running jobs of a queue, leaving the other
-- parts of the queue untouched.
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onRunningJobs f queue = over qRunningL f queue
114
115
116

-- | Transform the list of enqueued jobs of a queue, leaving the other
-- parts of the queue untouched.
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onQueuedJobs f queue = over qEnqueuedL f queue
118
119
120

-- | Wrap a 'QueuedJob' into a 'JobWithStat' that carries the null fstat
-- value and no inotify handle.
unreadJob :: QueuedJob -> JobWithStat
unreadJob job =
  JobWithStat
    { jINotify = Nothing
    , jStat = nullFStat
    , jJob = job
    }
122
123
124
125
126

-- | Interval, in microseconds, between two polls of the running jobs for
-- updates (presumably 'C.luxidJobqueuePollInterval' is given in seconds).
watchInterval :: Int
watchInterval = 1000000 * C.luxidJobqueuePollInterval

127
128
129
130
131
132
133
-- | Look up a cluster parameter in the configuration held by the queue
-- status. If the configuration is not available, the given default value
-- is returned instead.
getConfigValue :: (Cluster -> a) -> a -> JQStatus -> IO a
getConfigValue param defaultvalue qstat = do
  cfgResult <- readIORef $ jqConfig qstat
  return $ genericResult (const defaultvalue) (param . configCluster) cfgResult

134
135
136
137
-- | Get the maximal number of jobs to be run simultaneously from the
-- configuration. If the configuration is not available, be conservative
-- and use the smallest possible value, i.e., 1.
getMaxRunningJobs :: JQStatus -> IO Int
getMaxRunningJobs qstat = getConfigValue clusterMaxRunningJobs 1 qstat

-- | Get the maximal number of jobs to be tracked simultaneously from the
-- configuration. If the configuration is not available, be conservative
-- and use the smallest possible value, i.e., 1.
getMaxTrackedJobs :: JQStatus -> IO Int
getMaxTrackedJobs qstat = getConfigValue clusterMaxTrackedJobs 1 qstat

-- | Length of the run queue, i.e., the number of jobs currently in the
-- running part of the queue.
getRQL :: JQStatus -> IO Int
getRQL qstat = length . qRunning <$> readIORef (jqJobs qstat)
149
150
151
152
153
154
155

-- | Atomically apply a pure transformation to the job lists stored in the
-- queue status.
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
modifyJobs qstat f =
  atomicModifyIORef (jqJobs qstat) $ \queue -> (f queue, ())

-- | Reread a job from disk, if the file has changed.
--
-- Returns 'Nothing' if the job file is unchanged or if the job could not
-- be loaded; otherwise the job with its fresh content and fstat value. An
-- IO error while checking for changes is treated like a changed file.
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
readJobStatus jWS = do
  let jid = qjId $ jJob jWS
      jids = show $ fromJobId jid
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
  changedResult <- try $ needsReload (jStat jWS) fpath
                   :: IO (Either IOError (Maybe FStat))
  -- If the check itself failed, fall back to the null fstat value, so
  -- that the job is reread.
  case either (const $ Just nullFStat) id changedResult of
    Nothing -> do
      logDebug $ "File " ++ fpath ++ " not changed on disk."
      return Nothing
    Just fstat' -> do
      logInfo $ "Rereading job "  ++ jids
      readResult <- loadJobFromDisk qdir True jid
      case readResult of
        Ok (job', _) -> do
          logDebug $ "Read job " ++ jids ++ ", status is "
                     ++ show (calcJobStatus job')
          -- jINotify is taken over unchanged
          return . Just $ jWS { jStat = fstat', jJob = job' }
        Bad s -> do
          logWarning $ "Failed to read job " ++ jids ++ ": " ++ s
          return Nothing
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196

-- | Update a job in the job queue, if it is still there. This is the
-- pure function for inserting a previously read change into the queue:
-- every entry with the same job id is replaced by the given job, all
-- other entries are kept. As the change contains its time stamp, we don't
-- have to worry about a later read change overwriting a newer read state.
-- If this happens, the fstat value will be outdated, so the next poller
-- run will fix this.
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
updateJobStatus job' = map replace
  where
    jid = qjId $ jJob job'
    replace job
      | qjId (jJob job) == jid = job'
      | otherwise              = job

-- | Update a single job by reading it from disk, if necessary.
--
-- If a new state was read, it replaces the job's entry among the running
-- jobs. If the newly read job is finalized, or if nothing new was read at
-- all, a separate thread is forked that cleans up finished jobs and
-- schedules new ones.
updateJob :: JQStatus -> JobWithStat -> IO ()
updateJob state jb = do
  jb' <- readJobStatus jb
  case jb' of
    Nothing -> return ()
    Just newJb -> modifyJobs state . onRunningJobs $ updateJobStatus newJb
  let needsCleanup = maybe True (jobFinalized . jJob) jb'
  when needsCleanup . void . forkIO $ do
    logDebug "Scheduler noticed a job to have finished."
    cleanupFinishedJobs state
    scheduleSomeJobs state
201

202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
-- | Move a job from one part of the queue to another, appending it at
-- the end of the target part.
-- Return the job that was moved, or 'Nothing' if it wasn't found in
-- the source part; in the latter case the queue is returned unchanged.
moveJob :: Lens' Queue [JobWithStat] -- ^ from queue
        -> Lens' Queue [JobWithStat] -- ^ to queue
        -> JobId
        -> Queue
        -> (Queue, Maybe JobWithStat)
moveJob fromQ toQ jid queue =
    let isWanted = (== jid) . qjId . jJob
        -- traversing in the @(,) [JobWithStat]@ functor yields the
        -- matching jobs together with the queue lacking them
        (matching, queue') = traverseOf fromQ (partition isWanted) queue
    in case matching of
         job : _ -> (over toQ (++ [job]) queue', Just job)
         []      -> (queue, Nothing)

-- | Atomic variant of 'moveJob', operating on the queue stored in the
-- queue status.
-- Return the job that was moved, or 'Nothing' if it wasn't found in
-- the queue.
moveJobAtomic :: Lens' Queue [JobWithStat] -- ^ from queue
              -> Lens' Queue [JobWithStat] -- ^ to queue
              -> JobId
              -> JQStatus
              -> IO (Maybe JobWithStat)
moveJobAtomic fromQ toQ jid qstat =
  atomicModifyIORef (jqJobs qstat) $ \queue -> moveJob fromQ toQ jid queue

-- | Manipulate a running job by atomically moving it from 'qRunning'
-- into 'qManipulated', running a given IO action and then atomically
-- moving it back, even if the action throws an exception.
--
-- Returns the result of the IO action, or 'Nothing', if the job wasn't found
-- among the running jobs; in that case the action is not run.
manipulateRunningJob :: JQStatus -> JobId -> IO a -> IO (Maybe a)
manipulateRunningJob qstat jid k = do
  jobOpt <- moveJobAtomic qRunningL qManipulatedL jid qstat
  if isNothing jobOpt
    then return Nothing
    else fmap Just k
           `finally` moveJobAtomic qManipulatedL qRunningL jid qstat

241
242
243
-- | Sort out the finished jobs from the running part of the queue.
-- This is the pure part, splitting the queue into a remaining queue
-- and the finalized jobs that were removed.
sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat])
sortoutFinishedJobs queue = (queue { qRunning = stillRunning }, finished)
  where
    (finished, stillRunning) =
      partition (jobFinalized . jJob) (qRunning queue)
248
249
250
251
252
253

-- | Actually clean up the finished jobs. This is the IO wrapper around
-- the pure `sortoutFinishedJobs`: the finished jobs are removed atomically,
-- logged (if there are any), and their inotify handles, where present, are
-- killed.
cleanupFinishedJobs :: JQStatus -> IO ()
cleanupFinishedJobs qstate = do
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
  let describe jws =
        let job = jJob jws
        in show (fromJobId (qjId job), calcJobStatus job)
  unless (null finished)
    . logInfo $ "Finished jobs: " ++ commaJoin (map describe finished)
  forM_ finished $ \jws -> maybe (return ()) killINotify (jINotify jws)
259

Klaus Aehlig's avatar
Klaus Aehlig committed
260
261
262
263
264
265
266
-- | Watcher task for a job, to update it on file changes. It also
-- reinstantiates itself upon receiving an Ignored event, provided the
-- job carries an inotify handle.
jobWatcher :: JQStatus -> JobWithStat -> Event -> IO ()
jobWatcher state jWS e = do
  let jid = qjId $ jJob jWS
      jids = show $ fromJobId jid
  logInfo $ "Scheduler notified of change of job " ++ jids
  logDebug $ "Scheduler notify event for " ++ jids ++ ": " ++ show e
  -- Match on the handle directly instead of testing with isJust and
  -- extracting with the partial fromJust.
  case jINotify jWS of
    Just inotify | e == Ignored -> do
      qdir <- queueDir
      let fpath = liveJobFile qdir jid
      void $ addWatch inotify [Modify, Delete] fpath (jobWatcher state jWS)
    _ -> return ()
  updateJob state jWS

-- | Attach the job watcher to a running job.
--
-- Nothing is done if the job already carries an inotify handle. A watcher
-- is only attached while the number of running jobs is below the maximal
-- number of tracked jobs; otherwise only a debug message is logged.
attachWatcher :: JQStatus -> JobWithStat -> IO ()
attachWatcher state jWS = unless (isJust $ jINotify jWS) $ do
  max_watch <- getMaxTrackedJobs state
  rql <- getRQL state
  let jid = qjId $ jJob jWS
  if rql >= max_watch
    then logDebug $ "Not attaching watcher for job "
                    ++ show (fromJobId jid)
                    ++ ", run queue length is " ++ show rql
    else do
      inotify <- initINotify
      qdir <- queueDir
      let fpath = liveJobFile qdir jid
          watched = jWS { jINotify = Just inotify }
      logDebug $ "Attaching queue watcher for " ++ fpath
      void . addWatch inotify [Modify, Delete] fpath
        $ jobWatcher state watched
      -- record the handle in the job's entry among the running jobs
      modifyJobs state . onRunningJobs $ updateJobStatus watched
Klaus Aehlig's avatar
Klaus Aehlig committed
294

295
296
297
298
299
300
301
302
-- | For a queued job, determine whether it is eligible to run, i.e.,
-- if none of the jobs it depends on is among the running or the enqueued
-- jobs of the given queue.
jobEligible :: Queue -> JobWithStat -> Bool
jobEligible queue jWS = not $ any isDependency candidates
  where
    jdeps = getJobDependencies $ jJob jWS
    isDependency = (`elem` jdeps) . qjId . jJob
    candidates = qRunning queue ++ qEnqueued queue

303
304
-- | Decide on which jobs to schedule next for execution. This is the
-- pure function doing the scheduling.
--
-- Eligible enqueued jobs are sorted by priority, passed through the reason
-- rate limiting and the job filtering, and then cut down to the number of
-- free slots (the allowed count minus the running and the manipulated
-- jobs). The result is the queue with the chosen jobs moved to the end of
-- the running jobs, together with the chosen jobs.
selectJobsToRun :: Int  -- ^ How many jobs are allowed to run at the
                        -- same time.
                -> Set FilterRule -- ^ Filter rules to respect for scheduling
                -> Queue
                -> (Queue, [JobWithStat])
selectJobsToRun count filters queue =
  let running = qRunning queue
      enqueued = qEnqueued queue
      freeSlots = count - length running - length (qManipulated queue)
      eligible = filter (jobEligible queue) enqueued
      byPriority = sortBy (comparing (calcJobPriority . jJob)) eligible
      chosen = take freeSlots
               . jobFiltering queue filters
               $ reasonRateLimit queue byPriority
      sameId = (==) `on` (qjId . jJob)
      remain = deleteFirstsBy sameId enqueued chosen
  in (queue { qEnqueued = remain, qRunning = running ++ chosen }, chosen)
320

321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
-- | Log a warning listing the jobs that failed to start together with
-- their errors, and return the set of their job ids. For an empty list
-- nothing is logged.
logFailedJobs :: (MonadLog m)
              => [(JobWithStat, GanetiException)] -> m (S.Set JobId)
logFailedJobs [] = return S.empty
logFailedJobs jobs = do
  let jids = S.fromList [ qjId (jJob jws) | (jws, _) <- jobs ]
      errors = [ err | (_, err) <- jobs ]
      jidsString = commaJoin [ show (fromJobId jid) | jid <- S.toList jids ]
  logWarning $ "Starting jobs " ++ jidsString ++ " failed: "
               ++ show errors
  return jids

-- | Fail jobs that were previously selected for execution
-- but couldn't be started.
--
-- The jobs are removed from the running part of the queue; each of them
-- is then marked as failed, with a reason trail entry naming the start
-- error, and written and replicated. A job that cannot be saved is
-- reported with a warning and does not prevent handling the others.
failJobs :: ConfigData -> JQStatus -> [(JobWithStat, GanetiException)]
         -> IO ()
failJobs cfg qstate jobs = do
  qdir <- queueDir
  now <- currentTimestamp
  jids <- logFailedJobs jobs
  -- Use the same list format for job ids as 'logFailedJobs' does.
  let sjobs = commaJoin . map (show . fromJobId) $ S.toList jids
  let rmJobs = filter ((`S.notMember` jids) . qjId . jJob)
  logWarning $ "Failing jobs " ++ sjobs
  modifyJobs qstate $ onRunningJobs rmJobs
  let trySaveJob :: JobWithStat -> ResultT String IO ()
      trySaveJob = (() <$) . writeAndReplicateJob cfg qdir . jJob
      reason jid msg =
        ( "gnt:daemon:luxid:startjobs"
        , "job " ++ show (fromJobId jid) ++ " failed to start: " ++ msg
        , reasonTrailTimestamp now )
      failJob err job = failQueuedJob (reason (qjId job) (show err)) now job
      failAndSaveJobWithStat (jws, err) =
        trySaveJob . over jJobL (failJob err) $ jws
  forM_ jobs $ \jobErr@(jws, _) -> do
    saved <- runResultT $ failAndSaveJobWithStat jobErr
    case saved of
      Ok () -> return ()
      -- Do not drop the error silently; the remaining jobs are still
      -- processed.
      Bad msg -> logWarning $ "Failed to save failed job "
                              ++ show (fromJobId . qjId $ jJob jws)
                              ++ ": " ++ msg
  logDebug $ "Failed jobs " ++ sjobs
355

Niklas Hambuechen's avatar
Niklas Hambuechen committed
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393

-- | Cancel every enqueued job to which a filter rule with the 'Reject'
-- action applies.
--
-- Each such job is first dequeued. If that succeeds, the cancelled job is
-- written and replicated; if the job turns out not to be queued any more,
-- it is cancelled through 'cancelJob' instead. Errors are logged, not
-- returned.
cancelRejectedJobs :: JQStatus -> ConfigData -> Set FilterRule -> IO ()
cancelRejectedJobs qstate cfg filters = do
  queue <- readIORef (jqJobs qstate)

  -- Pair every enqueued job with the REJECT rule applying to it, if any.
  let rejectedBy job =
        case applyingFilter filters job of
          Just fr | frAction fr == Reject -> Just (job, fr)
          _ -> Nothing
      jobsToCancel = mapMaybe (rejectedBy . jJob) $ qEnqueued queue

  qDir <- queueDir
  forM_ jobsToCancel $ \(job, fr) -> do
    let jid = qjId job
        sjid = show $ fromJobId jid
    logDebug $ "Cancelling job " ++ sjid
               ++ " because it was REJECTed by filter rule " ++ frUuid fr
    -- First dequeue, then cancel.
    dequeueResult <- dequeueJob qstate jid
    case dequeueResult of
      Bad s -> logError s -- passing a nonexistent job ID is an error here
      Ok False -> do
        logDebug $ "Job " ++ sjid
                   ++ " not queued; trying to cancel directly"
        void $ cancelJob False (jqLivelock qstate) jid  -- sigTERM-kill only
      Ok True -> do
        now <- currentTimestamp
        r <- runResultT
               $ writeAndReplicateJob cfg qDir (cancelQueuedJob now job)
        case r of
          Bad err -> logError $
            "Failed to write config when cancelling job: " ++ err
          Ok _ -> return ()


394
395
396
397
-- | Schedule jobs to be run. This is the IO wrapper around the
-- pure `selectJobsToRun`.
--
-- If the configuration is unavailable, an error is logged and nothing is
-- scheduled. Otherwise jobs rejected by a filter rule are cancelled, the
-- jobs to run are selected atomically, watchers are attached to them and
-- they are started; jobs that fail to start are failed.
scheduleSomeJobs :: JQStatus -> IO ()
scheduleSomeJobs qstate = do
  cfgR <- readIORef (jqConfig qstate)
  case cfgR of
    Bad err -> logError $ "Configuration unavailable: " ++ err
    Ok cfg -> do
      let filters = S.fromList . Map.elems . fromContainer $ configFilters cfg

      -- Check if jobs are rejected by a REJECT filter, and cancel them.
      cancelRejectedJobs qstate cfg filters

      -- Select the jobs to run.
      count <- getMaxRunningJobs qstate
      chosen <- atomicModifyIORef (jqJobs qstate)
                                  (selectJobsToRun count filters)
      let jobs = map jJob chosen
          startedIds = commaJoin $ map (show . fromJobId . qjId) jobs
      unless (null chosen) . logInfo $ "Starting jobs: " ++ startedIds

      -- Attach the watcher.
      mapM_ (attachWatcher qstate) chosen

      -- Start the jobs.
      result <- JQ.startJobs cfg (jqLivelock qstate) (jqForkLock qstate) jobs

      -- Pair every job that could not be started with its error.
      let failed = [ (jws, err) | (jws, Bad err) <- zip chosen result ]
      unless (null failed) $ failJobs cfg qstate failed
426
427
428
429
430
431
432
433

-- | Format the job queue status in a compact, human readable way, listing
-- the ids of the waiting and of the running jobs.
showQueue :: Queue -> String
showQueue queue =
  "Waiting jobs: " ++ showids (qEnqueued queue)
    ++ "; running jobs: " ++ showids (qRunning queue)
  where
    showids = show . map (fromJobId . qjId . jJob)

434
435
436
-- | Check if a job died, and clean up if so. Return True, if
-- the job was found dead.
--
-- A job without a livelock, or whose livelock is the one of the queue
-- status itself, is never considered dead. For a dead job that is not
-- finalized on disk, a failed status is written and replicated.
checkForDeath :: JQStatus -> JobWithStat -> IO Bool
checkForDeath state jobWS = do
  let job = jJob jobWS
      jid = qjId job
      sjid = show $ fromJobId jid
      livelock = qjLivelock job
  logDebug $ "Livelock of job " ++ sjid ++ " is " ++ show livelock
  died <- case livelock of
            Just ll | ll /= jqLivelock state -> isDead ll
            _ -> return False
  logDebug $ "Death of " ++ sjid ++ ": " ++ show died
  when died $ do
    logInfo $ "Detected death of job " ++ sjid
    -- if we manage to remove the job from the queue, we own the job file
    -- and can manipulate it.
    void . manipulateRunningJob state jid . runResultT $ do
      jobWS' <- mkResultT $ readJobFromDisk jid :: ResultG JobWithStat
      unless (jobFinalized . jJob $ jobWS') . void $ do
        -- If the job isn't finalized, but dead, add a corresponding
        -- failed status.
        now <- liftIO currentTimestamp
        qDir <- liftIO queueDir
        let reason = ( "gnt:daemon:luxid:deathdetection"
                     , "detected death of job " ++ sjid
                     , reasonTrailTimestamp now )
            failedJob = failQueuedJob reason now $ jJob jobWS'
        cfg <- mkResultT . readIORef $ jqConfig state
        writeAndReplicateJob cfg qDir failedJob
  return died
465

466
-- | Trigger death detection for the job with the given job id.
-- Return True, if the job is dead. True is also returned if the job is
-- not among the running jobs.
cleanupIfDead :: JQStatus -> JobId -> IO Bool
cleanupIfDead state jid = do
  logDebug $ "Extra job-death detection for " ++ show (fromJobId jid)
  queue <- readIORef (jqJobs state)
  case find ((== jid) . qjId . jJob) (qRunning queue) of
    Nothing -> return True
    Just jobWS -> checkForDeath state jobWS
474

475
476
477
478
-- | Time-based watcher for updating the job queue. In an endless loop,
-- after waiting for 'watchInterval', it checks the running jobs for death,
-- updates them from disk, cleans up finished jobs, logs the queue and
-- schedules new jobs.
onTimeWatcher :: JQStatus -> IO ()
onTimeWatcher qstate = forever $ do
  threadDelay watchInterval
  logDebug "Job queue watcher timer fired"
  -- the queue is read anew before every step, as the previous step may
  -- have changed it
  readIORef (jqJobs qstate) >>= mapM_ (checkForDeath qstate) . qRunning
  readIORef (jqJobs qstate) >>= mapM_ (updateJob qstate) . qRunning
  cleanupFinishedJobs qstate
  readIORef (jqJobs qstate) >>= logInfo . showQueue
  scheduleSomeJobs qstate
  logDebug "Job queue watcher cycle finished"
489

490
491
492
493
494
495
496
497
498
-- | Read a single, non-archived, job, specified by its id, from disk.
-- The result carries the fstat value of the job file, or the null fstat
-- value if obtaining it raised an IO error, and no inotify handle.
readJobFromDisk :: JobId -> IO (Result JobWithStat)
readJobFromDisk jid = do
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
  logDebug $ "Reading " ++ fpath
  tryFstat <- try $ getFStat fpath :: IO (Either IOError FStat)
  let fstat = either (const nullFStat) id tryFstat
      withStat job =
        JobWithStat { jINotify = Nothing, jStat = fstat, jJob = job }
  loadResult <- JQ.loadJobFromDisk qdir False jid
  return $ fmap (withStat . fst) loadResult
500
501
502
503
504
505
506

-- | Read all jobs found in the queue directory from disk. If the job ids
-- cannot be obtained, no jobs are read; jobs that fail to load are left
-- out of the result.
readJobsFromDisk :: IO [JobWithStat]
readJobsFromDisk = do
  logInfo "Loading job queue"
  qdir <- queueDir
  eitherJids <- JQ.getJobIDs [qdir]
  let jids = genericResult (const []) JQ.sortJobIDs eitherJids
  logInfo $ "Non-archived jobs on disk: "
            ++ commaJoin (map (show . fromJobId) jids)
  justOk <$> mapM readJobFromDisk jids

513
514
515
516
-- | Set up the job scheduler. The non-finalized jobs on disk are added to
-- the queue, as running or as enqueued depending on whether they have
-- started, and a first scheduling round is done. This will also start the
-- time-based monitoring of changes to the running jobs.
initJQScheduler :: JQStatus -> IO ()
initJQScheduler qstate = do
  alljobs <- readJobsFromDisk
  let unfinished = [ jws | jws <- alljobs, not . jobFinalized $ jJob jws ]
      (running, queued) = partition (jobStarted . jJob) unfinished
  modifyJobs qstate $ onQueuedJobs (++ queued) . onRunningJobs (++ running)
  readIORef (jqJobs qstate) >>= logInfo . showQueue
  scheduleSomeJobs qstate
  logInfo "Starting time-based job queue watcher"
  void . forkIO $ onTimeWatcher qstate

-- | Enqueue new jobs. This will guarantee that the jobs will be executed
-- eventually.
--
-- The jobs are inserted into the enqueued part of the queue according to
-- their job id, and a scheduling round is triggered afterwards.
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
enqueueNewJobs state jobs = do
  logInfo . (++) "New jobs enqueued: " . commaJoin
    $ map (show . fromJobId . qjId) jobs
  let jobs' = map unreadJob jobs
      insertFn = insertBy (compare `on` fromJobId . qjId . jJob)
      -- strict left fold, so that no chain of pending insertions is
      -- built up
      addJobs oldjobs = foldl' (flip insertFn) oldjobs jobs'
  modifyJobs state (onQueuedJobs addJobs)
  scheduleSomeJobs state
539
540

-- | Pure function for removing a queued job from the job queue by
-- atomicModifyIORef. The answer is Just the job if the job could be removed
-- before being handed over to execution, Nothing if it already was started
-- and a Bad result if the job is not found in the queue, or if it is
-- enqueued more than once. The queue is only changed in the first case.
rmJob :: JobId -> Queue -> (Queue, Result (Maybe QueuedJob))
rmJob jid q =
  let isJid = (jid ==) . qjId . jJob
      (found, queued') = partition isJid $ qEnqueued q
      isRunning = any isJid $ qRunning q
      sJid = (++) "Job " . show $ fromJobId jid
  in case (found, isRunning) of
    ([job], _) -> (q {qEnqueued = queued'}, Ok . Just $ jJob job)
    -- more than one match; note the space separating the two sentences
    (_:_, _) -> (q, Bad $ "Queue in inconsistent state. "
                           ++ sJid ++ " queued multiple times")
    (_, True) -> (q, Ok Nothing)
    _ -> (q, Bad $ sJid ++ " not found in queue")
556
557
558
559
560
561
562
563

-- | Try to remove a queued job from the job queue. Return True, if
-- the job could be removed from the queue before being handed over
-- to execution, False if the job already started, and a Bad result
-- if the job is unknown. The outcome is also logged at debug level.
dequeueJob :: JQStatus -> JobId -> IO (Result Bool)
dequeueJob state jid = do
  wasQueued <- fmap isJust <$> atomicModifyIORef (jqJobs state) (rmJob jid)
  logDebug $ "Result of dequeing job " ++ show (fromJobId jid)
              ++ " is " ++ show wasQueued
  return wasQueued
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585

-- | Change the priority of a queued job (once the job is handed over
-- to execution, the job itself needs to be informed). To avoid the
-- job being started unmodified, it is temporarily unqueued during the
-- change. Return the modified job, if the job's priority was successfully
-- modified, Nothing, if the job already started, and a Bad value, if the job
-- is unknown or the modified job could not be written to disk. In the
-- latter case the unmodified job is put back into the queue, so that it
-- is not lost for scheduling.
setJobPriority :: JQStatus -> JobId -> Int -> IO (Result (Maybe QueuedJob))
setJobPriority state jid prio = do
  removed <- atomicModifyIORef (jqJobs state) $ rmJob jid
  case removed of
    Bad err -> return $ Bad err
    Ok Nothing -> return $ Ok Nothing
    Ok (Just job) -> do
      let job' = changeJobPriority prio job
      qDir <- queueDir
      written <- writeJobToDisk qDir job'
      case written of
        Bad err -> do
          -- The job was already taken out of the queue above; re-enqueue
          -- it unchanged instead of dropping it.
          enqueueNewJobs state [job]
          return $ Bad err
        Ok _ -> do
          enqueueNewJobs state [job']
          return . Ok $ Just job'