Commit 4bd16f46 authored by Petr Pudlak's avatar Petr Pudlak
Browse files

Add parameters to calls to asynchronous workers



Since a worker accumulates several request together, the type of a
parameter must be a monoid so that the requests can be combined into one
value used for running the worker's action.
Signed-off-by: default avatarPetr Pudlak <pudlak@google.com>
Reviewed-by: default avatarKlaus Aehlig <aehlig@google.com>
parent deb9ff16
......@@ -254,8 +254,8 @@ writeLocks fpath lockWait = do
writeLocksAsyncTask :: FilePath -- ^ Path to the lock file
-> IO GanetiLockWaiting -- ^ An action to read the
-- current lock allocation
-> ResultG (AsyncWorker ())
writeLocksAsyncTask fpath lockWaitingAction = mkAsyncWorker $
-> ResultG (AsyncWorker () ())
writeLocksAsyncTask fpath lockWaitingAction = mkAsyncWorker_ $
catchError (do
locks <- liftBase lockWaitingAction
writeLocks fpath locks
......
......@@ -48,9 +48,13 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
module Ganeti.Utils.AsyncWorker
( AsyncWorker
, mkAsyncWorker
, mkAsyncWorker_
, trigger
, trigger_
, triggerAndWait
, triggerAndWait_
, triggerAndWaitMany
, triggerAndWaitMany_
) where
import Control.Monad
......@@ -60,6 +64,7 @@ import Control.Concurrent (ThreadId)
import Control.Concurrent.Lifted (fork)
import Control.Concurrent.MVar.Lifted
import Data.Functor.Identity
import Data.Monoid
import qualified Data.Traversable as T
import Data.IORef.Lifted
......@@ -69,27 +74,32 @@ import Data.IORef.Lifted
-- Note that the action needs to be run even if the list is empty, as it
-- means that there are pending requests, only nobody needs to be notified of
-- their results.
data TriggerState a
data TriggerState i a
= Idle
| Pending [MVar a]
| Pending i [MVar a]
-- | Adds a new trigger to the current state (therefore the result is always
-- 'Pending'), optionally adding a 'MVar' that will receive the output.
addTrigger :: Maybe (MVar a) -> TriggerState a -> TriggerState a
addTrigger mmvar state = let rs = recipients state
in Pending $ maybe rs (: rs) mmvar
addTrigger :: (Monoid i)
=> i -> Maybe (MVar a) -> TriggerState i a -> TriggerState i a
addTrigger i mmvar state = let rs = recipients state
in Pending (input state <> i)
(maybe rs (: rs) mmvar)
where
recipients Idle = []
recipients (Pending rs) = rs
recipients Idle = []
recipients (Pending _ rs) = rs
input Idle = mempty
input (Pending j _) = j
-- | Represent an asynchronous worker whose single action execution returns a
-- value of type @a@.
data AsyncWorker a
= AsyncWorker ThreadId (IORef (TriggerState a)) (MVar ())
data AsyncWorker i a
= AsyncWorker ThreadId (IORef (TriggerState i a)) (MVar ())
-- | Given an action, construct an 'AsyncWorker'.
mkAsyncWorker :: (MonadBaseControl IO m) => m a -> m (AsyncWorker a)
mkAsyncWorker :: (Monoid i, MonadBaseControl IO m)
=> (i -> m a) -> m (AsyncWorker i a)
mkAsyncWorker act = do
trig <- newMVar ()
ref <- newIORef Idle
......@@ -98,28 +108,38 @@ mkAsyncWorker act = do
state <- swap ref Idle -- check the state of pending requests
-- if there are pending requests, run the action and send them results
case state of
Idle -> return () -- all trigers have been processed, we've
-- been woken up by a trigger that has been
-- already included in the last run
Pending rs -> act >>= forM_ rs . flip tryPutMVar
Idle -> return () -- all trigers have been processed, we've
-- been woken up by a trigger that has been
-- already included in the last run
Pending i rs -> act i >>= forM_ rs . flip tryPutMVar
return $ AsyncWorker thId ref trig
where
swap :: (MonadBase IO m) => IORef a -> a -> m a
swap ref x = atomicModifyIORef ref ((,) x)
-- | Given an action, construct an 'AsyncWorker' with no input.
mkAsyncWorker_ :: (MonadBaseControl IO m)
=> m a -> m (AsyncWorker () a)
mkAsyncWorker_ = mkAsyncWorker . const
-- An internal function for triggering a worker, optionally registering
-- a callback 'MVar'
triggerInternal :: (MonadBase IO m)
=> Maybe (MVar a) -> AsyncWorker a -> m ()
triggerInternal mmvar (AsyncWorker _ ref trig) = do
atomicModifyIORef ref (\ts -> (addTrigger mmvar ts, ()))
triggerInternal :: (MonadBase IO m, Monoid i)
=> i -> Maybe (MVar a) -> AsyncWorker i a -> m ()
triggerInternal i mmvar (AsyncWorker _ ref trig) = do
atomicModifyIORef ref (\ts -> (addTrigger i mmvar ts, ()))
_ <- tryPutMVar trig ()
return ()
-- | Trigger a worker, letting it run its action asynchronously, but do not
-- wait for the result.
trigger :: (MonadBase IO m) => AsyncWorker a -> m ()
trigger = triggerInternal Nothing
trigger :: (MonadBase IO m, Monoid i) => i -> AsyncWorker i a -> m ()
trigger = flip triggerInternal Nothing
-- | Trigger a worker with no input, letting it run its action asynchronously,
-- but do not wait for the result.
trigger_ :: (MonadBase IO m) => AsyncWorker () a -> m ()
trigger_ = trigger ()
-- | Trigger a list of workers and wait until all the actions following these
-- triggers finish. Returns the results of the actions.
......@@ -127,16 +147,29 @@ trigger = triggerInternal Nothing
-- Note that there is a significant difference between 'triggerAndWaitMany'
-- and @mapM triggerAndWait@. The latter runs all the actions of the workers
-- sequentially, while the former runs them in parallel.
triggerAndWaitMany :: (T.Traversable t, MonadBase IO m)
=> t (AsyncWorker a) -> m (t a)
triggerAndWaitMany workers =
triggerAndWaitMany :: (T.Traversable t, MonadBase IO m, Monoid i)
=> i -> t (AsyncWorker i a) -> m (t a)
triggerAndWaitMany i workers =
let trig w = do
result <- newEmptyMVar
triggerInternal (Just result) w
triggerInternal i (Just result) w
return result
in T.mapM trig workers >>= T.mapM takeMVar
-- | Trigger a list of workers with no input and wait until all the actions
-- following these triggers finish. Returns the results of the actions.
--
-- See 'triggetAndWaitMany'.
triggerAndWaitMany_ :: (T.Traversable t, MonadBase IO m)
=> t (AsyncWorker () a) -> m (t a)
triggerAndWaitMany_ = triggerAndWaitMany ()
-- | Trigger a worker and wait until the action following this trigger
-- finishes. Return the result of the action.
triggerAndWait :: (MonadBase IO m) => AsyncWorker a -> m a
triggerAndWait = liftM runIdentity . triggerAndWaitMany . Identity
triggerAndWait :: (MonadBase IO m, Monoid i) => i -> AsyncWorker i a -> m a
triggerAndWait i = liftM runIdentity . triggerAndWaitMany i . Identity
-- | Trigger a worker with no input and wait until the action following this
-- trigger finishes. Return the result of the action.
triggerAndWait_ :: (MonadBase IO m) => AsyncWorker () a -> m a
triggerAndWait_ = triggerAndWait ()
......@@ -42,6 +42,7 @@ import Control.Monad.Base
import Control.Monad.Error
import qualified Control.Monad.State.Strict as S
import Control.Monad.Trans.Control
import Data.Monoid
import Ganeti.BasicTypes
import Ganeti.Errors
......@@ -106,25 +107,26 @@ finishOrLog logPrio logPrefix =
genericResult (logAt logPrio . (++) (logPrefix ++ ": ") . show)
-- | Creates a stateless asynchronous task that handles errors in its actions.
mkStatelessAsyncTask :: (MonadBaseControl IO m, MonadLog m, Show e)
mkStatelessAsyncTask :: (MonadBaseControl IO m, MonadLog m, Show e, Monoid i)
=> Priority
-> String
-> ResultT e m ()
-> m (AsyncWorker ())
-> (i -> ResultT e m ())
-> m (AsyncWorker i ())
mkStatelessAsyncTask logPrio logPrefix action =
mkAsyncWorker $ runResultT action >>= finishOrLog logPrio logPrefix return
mkAsyncWorker $ runResultT . action
>=> finishOrLog logPrio logPrefix return
-- | Creates an asynchronous task that handles errors in its actions.
-- If an error occurs, it's logged and the internal state remains unchanged.
mkStatefulAsyncTask :: (MonadBaseControl IO m, MonadLog m, Show e)
mkStatefulAsyncTask :: (MonadBaseControl IO m, MonadLog m, Show e, Monoid i)
=> Priority
-> String
-> s
-> (s -> ResultT e m s)
-> m (AsyncWorker ())
-> (s -> i -> ResultT e m s)
-> m (AsyncWorker i ())
mkStatefulAsyncTask logPrio logPrefix start action =
flip S.evalStateT start . mkAsyncWorker $
S.get >>= lift . runResultT . action
flip S.evalStateT start . mkAsyncWorker $ \i ->
S.get >>= lift . runResultT . flip action i
>>= finishOrLog logPrio logPrefix S.put -- put on success
-- | Construct an asynchronous worker whose action is to save the
......@@ -135,11 +137,11 @@ mkStatefulAsyncTask logPrio logPrefix start action =
saveConfigAsyncTask :: FilePath -- ^ Path to the config file
-> FStat -- ^ The initial state of the config. file
-> IO ConfigState -- ^ An action to read the current config
-> ResultG (AsyncWorker ())
-> ResultG (AsyncWorker () ())
saveConfigAsyncTask fpath fstat cdRef =
lift . mkStatefulAsyncTask
EMERGENCY "Can't write the master configuration file" fstat
$ \oldstat -> do
$ \oldstat _ -> do
cd <- liftBase (csConfigData `liftM` cdRef)
writeConfigToFile cd fpath oldstat
......@@ -156,11 +158,11 @@ execRpcCallAndLog nodes req = do
distMCsAsyncTask :: RuntimeEnts
-> FilePath -- ^ Path to the config file
-> IO ConfigState -- ^ An action to read the current config
-> ResultG (AsyncWorker ())
-> ResultG (AsyncWorker () ())
distMCsAsyncTask ents cpath cdRef =
lift . mkStatelessAsyncTask ERROR "Can't distribute the configuration\
\ to master candidates"
$ do
$ \_ -> do
cd <- liftBase (csConfigData <$> cdRef) :: ResultG ConfigData
logDebug $ "Distributing the configuration to master candidates,\
\ serial no " ++ show (serialOf cd)
......@@ -175,10 +177,10 @@ distMCsAsyncTask ents cpath cdRef =
-- if different, distributes it.
distSSConfAsyncTask
:: IO ConfigState -- ^ An action to read the current config
-> ResultG (AsyncWorker ())
-> ResultG (AsyncWorker () ())
distSSConfAsyncTask cdRef =
lift . mkStatefulAsyncTask ERROR "Can't distribute Ssconf" emptySSConf
$ \oldssc -> do
$ \oldssc _ -> do
cd <- liftBase (csConfigData <$> cdRef) :: ResultG ConfigData
let ssc = mkSSConf cd
if oldssc == ssc
......
......@@ -98,25 +98,25 @@ data DaemonHandle = DaemonHandle
-- all static information that doesn't change during the life-time of the
-- daemon should go here;
-- all IDs of threads that do asynchronous work should probably also go here
, dhSaveConfigWorker :: AsyncWorker ()
, dhDistMCsWorker :: AsyncWorker ()
, dhDistSSConfWorker :: AsyncWorker ()
, dhSaveLocksWorker :: AsyncWorker ()
, dhSaveConfigWorker :: AsyncWorker () ()
, dhDistMCsWorker :: AsyncWorker () ()
, dhDistSSConfWorker :: AsyncWorker () ()
, dhSaveLocksWorker :: AsyncWorker () ()
}
mkDaemonHandle :: FilePath
-> ConfigState
-> GanetiLockWaiting
-> (IO ConfigState -> ResultG (AsyncWorker ()))
-> (IO ConfigState -> ResultG (AsyncWorker () ()))
-- ^ A function that creates a worker that asynchronously
-- saves the configuration to the master file.
-> (IO ConfigState -> ResultG (AsyncWorker ()))
-> (IO ConfigState -> ResultG (AsyncWorker () ()))
-- ^ A function that creates a worker that asynchronously
-- distributes the configuration to master candidates
-> (IO ConfigState -> ResultG (AsyncWorker ()))
-> (IO ConfigState -> ResultG (AsyncWorker () ()))
-- ^ A function that creates a worker that asynchronously
-- distributes SSConf to nodes
-> (IO GanetiLockWaiting -> ResultG (AsyncWorker ()))
-> (IO GanetiLockWaiting -> ResultG (AsyncWorker () ()))
-- ^ A function that creates a worker that asynchronously
-- saves the lock allocation state.
-> ResultG DaemonHandle
......@@ -201,14 +201,14 @@ modifyConfigState f = do
when modified $ do
-- trigger the config. saving worker and wait for it
logDebug "Triggering config write"
liftBase . triggerAndWait . dhSaveConfigWorker $ dh
liftBase . triggerAndWait_ . dhSaveConfigWorker $ dh
logDebug "Config write finished"
-- trigger the config. distribution worker synchronously
-- TODO: figure out what configuration changes need synchronous updates
-- and otherwise use asynchronous triggers
_ <- liftBase . triggerAndWaitMany $ [ dhDistMCsWorker dh
, dhDistSSConfWorker dh
]
_ <- liftBase . triggerAndWaitMany_ $ [ dhDistMCsWorker dh
, dhDistSSConfWorker dh
]
return ()
return r
......@@ -248,7 +248,7 @@ modifyLockWaiting f = do
(dhDaemonState dh) dsLockWaitingL f'
logDebug $ "Current lock status: " ++ J.encode lockAlloc
logDebug "Triggering lock state write"
liftBase . triggerAndWait . dhSaveLocksWorker $ dh
liftBase . triggerAndWait_ . dhSaveLocksWorker $ dh
logDebug "Lock write finished"
unless (S.null nfy) $ do
logDebug . (++) "Locks became available for " . show $ S.toList nfy
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment