Commit 4b3e9fa7 authored by Klaus Aehlig's avatar Klaus Aehlig Committed by Petr Pudlak

Clean up dead jobs from the job queue

Make the onTimeWatcher of the job queue scheduler also verify
that all notionally running jobs are indeed alive. If a job is
found dead, remove it from the list of running jobs and update
the job file to reflect the unexpected death.
Signed-off-by: default avatarKlaus Aehlig <aehlig@google.com>
Signed-off-by: default avatarPetr Pudlak <pudlak@google.com>
Reviewed-by: default avatarPetr Pudlak <pudlak@google.com>
parent ec98ea2b
......@@ -291,17 +291,53 @@ showQueue (Queue {qEnqueued=waiting, qRunning=running}) =
in "Waiting jobs: " ++ showids waiting
++ "; running jobs: " ++ showids running
-- | Pure function to remove a job from the list of running
-- jobs, if it is still there. Return whether it was still among
-- the running jobs.
rmFromRunning :: JobId -> Queue -> (Queue, Bool)
rmFromRunning jid queue =
let running = qRunning queue
(running', removed) = partition ((/=) jid . qjId . jJob) running
in ( queue { qRunning = running' }, not $ null removed)
-- | Check if a job died, and clean up if so.
checkForDeath :: JQStatus -> JobWithStat -> IO ()
checkForDeath state jobWS = do
let job = jJob jobWS
jid = qjId job
sjid = show $ fromJobId jid
livelock = qjLivelock job
logDebug $ "Livelock of job " ++ sjid ++ " is " ++ show livelock
died <- maybe (return False) isDead livelock
when died $ do
logInfo $ "Detected death of job " ++ sjid
-- if we manage to remove the job from the queue, we own the job file
-- and can manipulate it.
removed <- atomicModifyIORef (jqJobs state) $ rmFromRunning jid
when removed . void . runResultT $ do
logDebug $ "Removed job " ++ sjid ++ " from the list of running"
:: ResultG ()
jobWS' <- mkResultT $ readJobFromDisk jid
now <- liftIO currentTimestamp
qDir <- liftIO queueDir
let failedJob = failQueuedJob now $ jJob jobWS'
cfg <- mkResultT . readIORef $ jqConfig state
writeAndReplicateJob cfg qDir failedJob
-- | Time-based watcher for updating the job queue.
onTimeWatcher :: JQStatus -> IO ()
onTimeWatcher qstate = forever $ do
threadDelay watchInterval
logDebug "Job queue watcher timer fired"
jobs <- readIORef (jqJobs qstate)
mapM_ (updateJob qstate) $ qRunning jobs
cleanupFinishedJobs qstate
mapM_ (checkForDeath qstate) $ qRunning jobs
jobs' <- readIORef (jqJobs qstate)
logInfo $ showQueue jobs'
mapM_ (updateJob qstate) $ qRunning jobs'
cleanupFinishedJobs qstate
jobs'' <- readIORef (jqJobs qstate)
logInfo $ showQueue jobs''
scheduleSomeJobs qstate
logDebug "Job queue watcher cycle finished"
-- | Read a single, non-archived, job, specified by its id, from disk.
readJobFromDisk :: JobId -> IO (Result JobWithStat)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment