Commit 2b63951d authored by Petr Pudlak's avatar Petr Pudlak
Browse files

Add a synchronization lock for forking new processes



Preventing multiple forks occurring at once will hopefully prevent GHC
'fork' problems.
Signed-off-by: default avatarPetr Pudlak <pudlak@google.com>
Reviewed-by: default avatarKlaus Aehlig <aehlig@google.com>
parent b0b4f975
......@@ -27,6 +27,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
module Ganeti.JQScheduler
( JQStatus
, jqLivelock
, jqForkLock
, emptyJQStatus
, initJQScheduler
, enqueueNewJobs
......@@ -59,6 +60,7 @@ import Ganeti.Path
import Ganeti.Types
import Ganeti.Utils
import Ganeti.Utils.Livelock
import Ganeti.Utils.MVarLock
data JobWithStat = JobWithStat { jINotify :: Maybe INotify
, jStat :: FStat
......@@ -83,6 +85,7 @@ data JQStatus = JQStatus
{ jqJobs :: IORef Queue
, jqConfig :: IORef (Result ConfigData)
, jqLivelock :: Livelock
, jqForkLock :: Lock
}
......@@ -90,7 +93,9 @@ emptyJQStatus :: IORef (Result ConfigData) -> IO JQStatus
emptyJQStatus config = do
jqJ <- newIORef Queue { qEnqueued = [], qRunning = []}
(_, livelock) <- mkLivelockFile C.luxiLivelockPrefix
return JQStatus { jqJobs = jqJ, jqConfig = config, jqLivelock = livelock }
forkLock <- newLock
return JQStatus { jqJobs = jqJ, jqConfig = config, jqLivelock = livelock
, jqForkLock = forkLock }
-- | Apply a function on the running jobs.
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
......@@ -291,7 +296,7 @@ scheduleSomeJobs qstate = do
logError msg
requeueJobs qstate . map (\x -> (x, strMsg msg)) $ chosen
Ok cfg -> do
result <- JQ.startJobs cfg (jqLivelock qstate) jobs
result <- JQ.startJobs cfg (jqLivelock qstate) (jqForkLock qstate) jobs
let badWith (x, Bad y) = Just (x, y)
badWith _ = Nothing
let failed = mapMaybe badWith $ zip chosen result
......
......@@ -522,13 +522,14 @@ isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)
-- | Start enqueued jobs by executing the Python code.
startJobs :: ConfigData
-> Livelock -- ^ Luxi's livelock path
-> Lock -- ^ lock for forking new processes
-> [QueuedJob] -- ^ the list of jobs to start
-> IO [ErrorResult QueuedJob]
startJobs cfg luxiLivelock jobs = do
startJobs cfg luxiLivelock forkLock jobs = do
qdir <- queueDir
let updateJob job llfile =
void . writeAndReplicateJob cfg qdir $ job { qjLivelock = Just llfile }
let runJob job = do
let runJob job = withLock forkLock $ do
(llfile, _) <- Exec.forkJobProcess (qjId job) luxiLivelock
(updateJob job)
return $ job { qjLivelock = Just llfile }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment