Commit 30f011c4 authored by Petr Pudlak's avatar Petr Pudlak
Browse files

Chain the workers for saving and distributing configuration



The worker that saves the configuration now also calls the workers for
its distribution. It accepts an argument that determines, if the
configuration needs to be fully distributed, and the distribution
workers are triggered accordingly.
Signed-off-by: default avatarPetr Pudlak <pudlak@google.com>
Reviewed-by: default avatarKlaus Aehlig <aehlig@google.com>
parent d546d209
......@@ -134,16 +134,27 @@ mkStatefulAsyncTask logPrio logPrefix start action =
-- The worker's action reads the configuration using the given @IO@ action
-- and uses 'FStat' to check if the configuration hasn't been modified by
-- another process.
--
-- If 'Any' of the input requests is true, given additional worker
-- will be executed synchronously after sucessfully writing the configuration
-- file. Otherwise, they'll be just triggered asynchronously.
saveConfigAsyncTask :: FilePath -- ^ Path to the config file
-> FStat -- ^ The initial state of the config. file
-> IO ConfigState -- ^ An action to read the current config
-> ResultG (AsyncWorker () ())
saveConfigAsyncTask fpath fstat cdRef =
-> [AsyncWorker () ()] -- ^ Workers to be triggered
-- afterwards
-> ResultG (AsyncWorker Any ())
saveConfigAsyncTask fpath fstat cdRef workers =
lift . mkStatefulAsyncTask
EMERGENCY "Can't write the master configuration file" fstat
$ \oldstat _ -> do
$ \oldstat (Any flush) -> do
cd <- liftBase (csConfigData `liftM` cdRef)
writeConfigToFile cd fpath oldstat
<* if flush then logDebug "Running distribution synchronously"
>> triggerAndWaitMany_ workers
else logDebug "Running distribution asynchronously"
>> mapM trigger_ workers
-- | Performs a RPC call on the given list of nodes and logs any failures.
-- If any of the calls fails, fail the computation with 'failError'.
......
......@@ -63,8 +63,10 @@ import Control.Monad.Trans.Control
import Data.Functor.Compose (Compose(..))
import Data.Functor.Identity
import Data.IORef.Lifted
import Data.Monoid (Any(..))
import qualified Data.Set as S
import Data.Tuple (swap)
import System.Time (getClockTime)
import qualified Text.JSON as J
import Ganeti.BasicTypes
......@@ -98,16 +100,15 @@ data DaemonHandle = DaemonHandle
-- all static information that doesn't change during the life-time of the
-- daemon should go here;
-- all IDs of threads that do asynchronous work should probably also go here
, dhSaveConfigWorker :: AsyncWorker () ()
, dhDistMCsWorker :: AsyncWorker () ()
, dhDistSSConfWorker :: AsyncWorker () ()
, dhSaveConfigWorker :: AsyncWorker Any ()
, dhSaveLocksWorker :: AsyncWorker () ()
}
mkDaemonHandle :: FilePath
-> ConfigState
-> GanetiLockWaiting
-> (IO ConfigState -> ResultG (AsyncWorker () ()))
-> (IO ConfigState -> [AsyncWorker () ()]
-> ResultG (AsyncWorker Any ()))
-- ^ A function that creates a worker that asynchronously
-- saves the configuration to the master file.
-> (IO ConfigState -> ResultG (AsyncWorker () ()))
......@@ -126,14 +127,14 @@ mkDaemonHandle cpath cstat lstat
ds <- newIORef $ DaemonState cstat lstat emptyTempResState
let readConfigIO = dsConfigState `liftM` readIORef ds :: IO ConfigState
saveWorker <- saveWorkerFn readConfigIO
ssconfWorker <- distSSConfWorkerFn readConfigIO
distMCsWorker <- distMCsWorkerFn readConfigIO
saveWorker <- saveWorkerFn readConfigIO [ distMCsWorker
, ssconfWorker ]
saveLockWorker <- saveLockWorkerFn $ dsLockWaiting `liftM` readIORef ds
return $ DaemonHandle ds cpath saveWorker distMCsWorker ssconfWorker
saveLockWorker
return $ DaemonHandle ds cpath saveWorker saveLockWorker
-- * The monad and its instances
......@@ -195,20 +196,28 @@ readConfigState = liftM dsConfigState . readIORef . dhDaemonState
modifyConfigState :: (ConfigState -> (ConfigState, a)) -> WConfdMonad a
modifyConfigState f = do
dh <- daemonHandle
let modCS cs = let (cs', r) = f cs
in ((r, cs /= cs'), cs')
(r, modified) <- atomicModifyWithLens (dhDaemonState dh) dsConfigStateL modCS
now <- liftIO getClockTime
-- If the configuration is modified, we also bump its serial number.
-- In order to determine if we need to save, we report if it's modified
-- as well as if it needs to be distributed synchronously.
let modCS cs = case f cs of
(cs', r)
| cs /= cs' -> ( (r, True, needsFullDist cs cs')
, over csConfigDataL (bumpSerial now) cs' )
| otherwise -> ((r, False, False), cs')
(r, modified, distSync) <- atomicModifyWithLens (dhDaemonState dh)
dsConfigStateL modCS
when modified $ do
-- trigger the config. saving worker and wait for it
logDebug "Triggering config write"
liftBase . triggerAndWait_ . dhSaveConfigWorker $ dh
logDebug "Config write finished"
-- trigger the config. distribution worker synchronously
-- TODO: figure out what configuration changes need synchronous updates
-- and otherwise use asynchronous triggers
_ <- liftBase . triggerAndWaitMany_ $ [ dhDistMCsWorker dh
, dhDistSSConfWorker dh
]
if distSync
then do
logDebug "Triggering synchronous config write\
\ together with full distribution"
liftBase . triggerAndWait (Any True) . dhSaveConfigWorker $ dh
logDebug "Config write and distribution finished"
else do
-- trigger the config. saving worker and wait for it
logDebug "Triggering config write and distribution"
liftBase . trigger (Any False) . dhSaveConfigWorker $ dh
return ()
return r
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment