Commit b0b4f975 authored by Petr Pudlak's avatar Petr Pudlak
Browse files

Use the new MVarLock in the job queue and the query server



A small refactoring was done in handling ArchiveJob so that it was
possible to use 'withLock'.
Signed-off-by: default avatarPetr Pudlak <pudlak@google.com>
Reviewed-by: default avatarKlaus Aehlig <aehlig@google.com>
parent e703a8e9
......@@ -74,7 +74,6 @@ module Ganeti.JQueue
import Control.Applicative (liftA2, (<|>))
import Control.Arrow (first, second)
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar
import Control.Exception
import Control.Lens (over)
import Control.Monad
......@@ -116,6 +115,7 @@ import Ganeti.Types
import Ganeti.Utils
import Ganeti.Utils.Atomic
import Ganeti.Utils.Livelock (Livelock, isDead)
import Ganeti.Utils.MVarLock
import Ganeti.VCluster (makeVirtualPath)
-- * Data types
......@@ -483,18 +483,15 @@ readSerialFromDisk = do
-- | Allocate new job ids.
-- To avoid races while accessing the serial file, the threads synchronize
-- over a lock, as usual provided by an MVar.
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
-- over a lock, as usual provided by a Lock.
allocateJobIds :: [Node] -> Lock -> Int -> IO (Result [JobId])
allocateJobIds mastercandidates lock n =
if n <= 0
then return . Bad $ "Can only allocate positive number of job ids"
else do
takeMVar lock
else withLock lock $ do
rjobid <- readSerialFromDisk
case rjobid of
Bad s -> do
putMVar lock ()
return . Bad $ s
Bad s -> return . Bad $ s
Ok jid -> do
let current = fromJobId jid
serial_content = show (current + n) ++ "\n"
......@@ -503,7 +500,6 @@ allocateJobIds mastercandidates lock n =
:: IO (Either IOError ())
case write_result of
Left e -> do
putMVar lock ()
let msg = "Failed to write serial file: " ++ show e
logError msg
return . Bad $ msg
......@@ -511,11 +507,10 @@ allocateJobIds mastercandidates lock n =
serial' <- makeVirtualPath serial
_ <- executeRpcCall mastercandidates
$ RpcCallJobqueueUpdate serial' serial_content
putMVar lock ()
return $ mapM makeJobId [(current+1)..(current+n)]
-- | Allocate one new job id.
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
allocateJobId :: [Node] -> Lock -> IO (Result JobId)
allocateJobId mastercandidates lock = do
jids <- allocateJobIds mastercandidates lock 1
return (jids >>= monadicThe "Failed to allocate precisely one Job ID")
......
......@@ -32,8 +32,10 @@ module Ganeti.Query.Server
import Control.Applicative
import Control.Concurrent
import Control.Exception
import Control.Monad (forever, when, zipWithM, liftM, void)
import Control.Monad (forever, when, mzero, guard, zipWithM, liftM, void)
import Control.Monad.IO.Class
import Control.Monad.Trans (lift)
import Control.Monad.Trans.Maybe
import Data.Bits (bitSize)
import qualified Data.Set as Set (toList)
import Data.IORef
......@@ -69,6 +71,7 @@ import Ganeti.Types
import qualified Ganeti.UDSServer as U (Handler(..), listener)
import Ganeti.Utils ( lockFile, exitIfBad, exitUnless, watchFile
, safeRenameFile )
import Ganeti.Utils.MVarLock
import qualified Ganeti.Version as Version
-- | Helper for classic queries.
......@@ -88,7 +91,7 @@ handleClassicQuery cfg qkind names fields _ = do
return $ showJSON <$> (qr >>= queryCompat)
-- | Minimal wrapper to handle the missing config case.
handleCallWrapper :: MVar () -> JQStatus -> Result ConfigData
handleCallWrapper :: Lock -> JQStatus -> Result ConfigData
-> LuxiOp -> IO (ErrorResult JSValue)
handleCallWrapper _ _ (Bad msg) _ =
return . Bad . ConfigurationError $
......@@ -97,7 +100,7 @@ handleCallWrapper _ _ (Bad msg) _ =
handleCallWrapper qlock qstat (Ok config) op = handleCall qlock qstat config op
-- | Actual luxi operation handler.
handleCall :: MVar () -> JQStatus
handleCall :: Lock -> JQStatus
-> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
handleCall _ _ cdata QueryClusterInfo =
let cluster = configCluster cdata
......@@ -347,31 +350,24 @@ handleCall _ qstat cfg (CancelJob jid) = do
fmap showJSON <$> cancelJob (jqLivelock qstat) jid
Bad s -> return . Ok . showJSON $ (False, s)
handleCall qlock _ cfg (ArchiveJob jid) = do
let archiveFailed = putMVar qlock () >> (return . Ok $ showJSON False)
:: IO (ErrorResult JSValue)
qDir <- queueDir
takeMVar qlock
result <- loadJobFromDisk qDir False jid
case result of
Bad _ -> archiveFailed
Ok (job, _) -> if jobFinalized job
then do
let mcs = Config.getMasterCandidates cfg
live = liveJobFile qDir jid
archive = archivedJobFile qDir jid
renameResult <- safeRenameFile queueDirPermissions
live archive
putMVar qlock ()
case renameResult of
Bad s -> return . Bad . JobQueueError
$ "Archiving failed in an unexpected way: "
++ s
Ok () -> do
_ <- executeRpcCall mcs
$ RpcCallJobqueueRename [(live, archive)]
return . Ok $ showJSON True
else archiveFailed
handleCall qlock _ cfg (ArchiveJob jid) =
-- By adding a layer of MaybeT, we can prematurely end a computation
-- using 'mzero' or other 'MonadPlus' primitive and return 'Ok False'.
runResultT . liftM (showJSON . fromMaybe False) . runMaybeT $ do
qDir <- liftIO queueDir
let mcs = Config.getMasterCandidates cfg
live = liveJobFile qDir jid
archive = archivedJobFile qDir jid
withLock qlock $ do
(job, _) <- (lift . mkResultT $ loadJobFromDisk qDir False jid)
`orElse` mzero
guard $ jobFinalized job
lift . withErrorT JobQueueError
. annotateError "Archiving failed in an unexpected way"
. mkResultT $ safeRenameFile queueDirPermissions live archive
_ <- liftIO . executeRpcCall mcs
$ RpcCallJobqueueRename [(live, archive)]
return True
handleCall qlock _ cfg (AutoArchiveJobs age timeout) = do
qDir <- queueDir
......@@ -379,7 +375,7 @@ handleCall qlock _ cfg (AutoArchiveJobs age timeout) = do
case resultJids of
Bad s -> return . Bad . JobQueueError $ show s
Ok jids -> do
result <- bracket_ (takeMVar qlock) (putMVar qlock ())
result <- withLock qlock
. archiveJobs cfg age timeout
$ sortJobIDs jids
return . Ok $ showJSON result
......@@ -413,7 +409,7 @@ computeJobUpdate cfg jid fields prev_log = do
return (JSArray rfields, rlogs)
type LuxiConfig = (MVar (), JQStatus, ConfigReader)
type LuxiConfig = (Lock, JQStatus, ConfigReader)
luxiExec
:: LuxiConfig
......@@ -476,7 +472,7 @@ main _ _ (server, cref, jq) = do
qlockFile <- jobQueueLockFile
_ <- lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
qlock <- newMVar ()
qlock <- newLock
_ <- P.installHandler P.sigCHLD P.Ignore Nothing
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment