Commit 39c1e700 authored by Klaus Aehlig's avatar Klaus Aehlig
Browse files

Use a LockWaiting structure instead of a LockAllocation



In this way, we will be able to support in WConfD waiting for locks
to become available instead of having to poll for them.
Signed-off-by: default avatarKlaus Aehlig <aehlig@google.com>
Reviewed-by: default avatarPetr Pudlak <pudlak@google.com>
parent 58e173a5
......@@ -30,7 +30,7 @@ module Ganeti.Locking.Locks
, lockName
, ClientType(..)
, ClientId(..)
, GanetiLockAllocation
, GanetiLockWaiting
, loadLockAllocation
, writeLocksAsyncTask
, LockLevel(..)
......@@ -49,8 +49,8 @@ import qualified Text.JSON as J
import Ganeti.BasicTypes
import Ganeti.Errors (ResultG, GanetiException)
import Ganeti.JSON (readEitherString, fromJResultE)
import Ganeti.Locking.Allocation
import Ganeti.Locking.Types
import Ganeti.Locking.Waiting
import Ganeti.Logging.Lifted (MonadLog, logDebug, logEmergency)
import Ganeti.Types
import Ganeti.Utils.Atomic
......@@ -230,33 +230,33 @@ instance J.JSON ClientId where
-- | The type of lock Allocations in Ganeti. In Ganeti, the owner of
-- locks are jobs.
type GanetiLockAllocation = LockAllocation GanetiLocks ClientId
type GanetiLockWaiting = LockWaiting GanetiLocks ClientId Integer
-- | Load a lock allocation from disk.
loadLockAllocation :: FilePath -> ResultG GanetiLockAllocation
loadLockAllocation :: FilePath -> ResultG GanetiLockWaiting
loadLockAllocation =
liftIO . readFile
>=> fromJResultE "parsing lock allocation" . J.decodeStrict
>=> fromJResultE "parsing lock waiting structure" . J.decodeStrict
-- | Write lock allocation to disk, overwriting any previously lock
-- allocation stored there.
writeLocks :: (MonadBase IO m, MonadError GanetiException m, MonadLog m)
=> FilePath -> GanetiLockAllocation -> m ()
writeLocks fpath lockAlloc = do
logDebug "Async. lock allocation writer: Starting write"
toErrorBase . liftIO . atomicWriteFile fpath $ J.encode lockAlloc
logDebug "Async. lock allocation writer: written"
=> FilePath -> GanetiLockWaiting -> m ()
writeLocks fpath lockWait = do
logDebug "Async. lock status writer: Starting write"
toErrorBase . liftIO . atomicWriteFile fpath $ J.encode lockWait
logDebug "Async. lock status writer: written"
-- | Construct an asynchronous worker whose action is to save the
-- current state of the lock allocation.
-- The worker's action reads the lock allocation using the given @IO@
-- action. Any inbetween changes to the file are tacitly ignored.
writeLocksAsyncTask :: FilePath -- ^ Path to the lock file
-> IO GanetiLockAllocation -- ^ An action to read the
-- current lock allocation
-> IO GanetiLockWaiting -- ^ An action to read the
-- current lock allocation
-> ResultG (AsyncWorker ())
writeLocksAsyncTask fpath lockAllocAction = mkAsyncWorker $
writeLocksAsyncTask fpath lockWaitingAction = mkAsyncWorker $
catchError (do
locks <- liftBase lockAllocAction
locks <- liftBase lockWaitingAction
writeLocks fpath locks
) (logEmergency . (++) "Can't write lock allocation status: " . show)
......@@ -41,6 +41,7 @@ import qualified Ganeti.JSON as J
import qualified Ganeti.Locking.Allocation as L
import Ganeti.Locking.Locks ( GanetiLocks(ConfigLock), LockLevel(LevelConfig)
, lockLevel, LockLevel, ClientId )
import qualified Ganeti.Locking.Waiting as LW
import Ganeti.Objects (ConfigData)
import Ganeti.WConfd.Language
import Ganeti.WConfd.Monad
......@@ -124,38 +125,34 @@ tryUpdateLocks :: ClientId -> GanetiLockRequest -> WConfdMonad [ClientId]
tryUpdateLocks cid req =
liftM S.toList
. (>>= toErrorStr)
$ modifyLockAllocation (L.updateLocks cid (fromGanetiLockRequest req))
$ modifyLockWaiting (LW.updateLocks cid (fromGanetiLockRequest req))
-- | Free all locks of a given owner (i.e., a job-id lockfile pair).
freeLocks :: ClientId -> WConfdMonad ()
freeLocks cid =
modifyLockAllocation_ (`L.freeLocks` cid)
modifyLockWaiting_ $ LW.releaseResources cid
-- | Free all locks of a given owner (i.e., a job-id lockfile pair)
-- of a given level in the Ganeti sense (e.g., "cluster", "node").
freeLocksLevel :: ClientId -> LockLevel -> WConfdMonad ()
freeLocksLevel cid level =
modifyLockAllocation_ (L.freeLocksPredicate ((==) level . lockLevel)
`flip` cid)
modifyLockWaiting_ $ LW.freeLocksPredicate ((==) level . lockLevel) cid
-- | Downgrade all locks of the given level to shared.
downGradeLocksLevel :: ClientId -> LockLevel -> WConfdMonad ()
downGradeLocksLevel cid level =
modifyLockAllocation_ $ L.downGradePredicate ((==) level . lockLevel) cid
modifyLockWaiting_ $ LW.downGradeLocksPredicate ((==) level . lockLevel) cid
-- | Intersect the possesed locks of an owner with a given set.
intersectLocks :: ClientId -> [GanetiLocks] -> WConfdMonad ()
intersectLocks cid =
modifyLockAllocation_ . L.intersectLocks cid
intersectLocks cid locks = modifyLockWaiting_ $ LW.intersectLocks locks cid
-- | Opportunistically allocate locks for a given owner.
opportunisticLockUnion :: ClientId
-> [(GanetiLocks, L.OwnerState)]
-> WConfdMonad [GanetiLocks]
opportunisticLockUnion cid req =
liftM S.toList
. modifyLockAllocation
$ L.opportunisticLockUnion cid req
modifyLockWaiting $ LW.opportunisticLockUnion cid req
-- * The list of all functions exported to RPC.
......
......@@ -45,6 +45,7 @@ import System.Posix.IO
import Ganeti.BasicTypes
import qualified Ganeti.Constants as C
import qualified Ganeti.Locking.Allocation as L
import qualified Ganeti.Locking.Waiting as LW
import Ganeti.Locking.Locks (ClientId(..))
import Ganeti.Logging.Lifted (logDebug, logInfo)
import Ganeti.WConfd.Monad
......@@ -76,7 +77,7 @@ cleanupLocksTask = forever . runResultT $ do
died <- liftIO (isDead fpath)
when died $ do
logInfo $ show owner ++ " died, releasing locks"
modifyLockAllocation_ (`L.freeLocks` owner)
modifyLockWaiting_ (LW.releaseResources owner)
_ <- liftIO . try $ removeFile fpath
:: WConfdMonad (Either IOError ())
return ()
......
......@@ -44,26 +44,29 @@ module Ganeti.WConfd.Monad
, daemonHandle
, modifyConfigState
, readConfigState
, modifyLockAllocation
, modifyLockAllocation_
, modifyLockWaiting
, modifyLockWaiting_
, readLockAllocation
) where
import Control.Applicative
import Control.Arrow ((&&&))
import Control.Arrow ((&&&), second)
import Control.Monad
import Control.Monad.Base
import Control.Monad.Error
import Control.Monad.Reader
import Control.Monad.Trans.Control
import Data.IORef.Lifted
import qualified Data.Set as S
import Data.Tuple (swap)
import qualified Text.JSON as J
import Ganeti.BasicTypes
import Ganeti.Errors
import Ganeti.Lens
import Ganeti.Locking.Allocation (LockAllocation)
import Ganeti.Locking.Locks
import Ganeti.Locking.Waiting (getAllocation)
import Ganeti.Logging
import Ganeti.Utils.AsyncWorker
import Ganeti.WConfd.ConfigState
......@@ -74,7 +77,7 @@ import Ganeti.WConfd.ConfigState
-- locking state.
data DaemonState = DaemonState
{ dsConfigState :: ConfigState
, dsLockAllocation :: GanetiLockAllocation
, dsLockWaiting :: GanetiLockWaiting
}
$(makeCustomLenses ''DaemonState)
......@@ -93,7 +96,7 @@ data DaemonHandle = DaemonHandle
mkDaemonHandle :: FilePath
-> ConfigState
-> GanetiLockAllocation
-> GanetiLockWaiting
-> (IO ConfigState -> ResultG (AsyncWorker ()))
-- ^ A function that creates a worker that asynchronously
-- saves the configuration to the master file.
......@@ -103,7 +106,7 @@ mkDaemonHandle :: FilePath
-> (IO ConfigState -> ResultG (AsyncWorker ()))
-- ^ A function that creates a worker that asynchronously
-- distributes SSConf to nodes
-> (IO GanetiLockAllocation -> ResultG (AsyncWorker ()))
-> (IO GanetiLockWaiting -> ResultG (AsyncWorker ()))
-- ^ A function that creates a worker that asynchronously
-- saves the lock allocation state.
-> ResultG DaemonHandle
......@@ -117,7 +120,7 @@ mkDaemonHandle cpath cstat lstat
ssconfWorker <- distSSConfWorkerFn readConfigIO
distMCsWorker <- distMCsWorkerFn readConfigIO
saveLockWorker <- saveLockWorkerFn $ dsLockAllocation `liftM` readIORef ds
saveLockWorker <- saveLockWorkerFn $ dsLockWaiting `liftM` readIORef ds
return $ DaemonHandle ds cpath saveWorker distMCsWorker ssconfWorker
saveLockWorker
......@@ -200,27 +203,33 @@ modifyConfigState f = do
return ()
return r
-- | Atomically modifies the lock allocation state in WConfdMonad.
modifyLockAllocation :: (GanetiLockAllocation -> (GanetiLockAllocation, a))
-- | Atomically modifies the lock waiting state in WConfdMonad.
modifyLockWaiting :: (GanetiLockWaiting -> ( GanetiLockWaiting
, (a, S.Set ClientId) ))
-> WConfdMonad a
modifyLockAllocation f = do
modifyLockWaiting f = do
dh <- lift . WConfdMonadInt $ ask
let f' = swap . (fst &&& id) . f
(lockAlloc, r) <- atomicModifyIORef (dhDaemonState dh)
(swap . traverseOf dsLockAllocationL f')
(lockAlloc, (r, nfy)) <- atomicModifyIORef
(dhDaemonState dh)
(swap . traverseOf dsLockWaitingL f')
logDebug $ "Current lock status: " ++ J.encode lockAlloc
logDebug "Triggering lock state write"
liftBase . triggerAndWait . dhSaveLocksWorker $ dh
logDebug "Lock write finished"
unless (S.null nfy) $ do
logDebug . (++) "Locks became available for " . show $ S.toList nfy
logWarning "Process notification not yet implemented"
return r
-- | Atomically modifies the lock allocation state in WConfdMonad, not
-- producing any result
modifyLockAllocation_ :: (GanetiLockAllocation -> GanetiLockAllocation)
modifyLockWaiting_ :: (GanetiLockWaiting -> (GanetiLockWaiting, S.Set ClientId))
-> WConfdMonad ()
modifyLockAllocation_ = modifyLockAllocation . (flip (,) () .)
modifyLockWaiting_ = modifyLockWaiting . ((second $ (,) ()) .)
-- | Read the lock allocation state.
readLockAllocation :: WConfdMonad GanetiLockAllocation
readLockAllocation = liftM dsLockAllocation . readIORef . dhDaemonState
-- | Read the underlying lock allocation.
readLockAllocation :: WConfdMonad (LockAllocation GanetiLocks ClientId)
readLockAllocation = liftM (getAllocation . dsLockWaiting)
. readIORef . dhDaemonState
=<< daemonHandle
......@@ -40,8 +40,8 @@ import System.Directory (doesFileExist)
import Ganeti.BasicTypes
import Ganeti.Daemon
import Ganeti.Logging (logInfo, logDebug)
import Ganeti.Locking.Allocation
import Ganeti.Locking.Locks
import Ganeti.Locking.Waiting
import qualified Ganeti.Path as Path
import Ganeti.THH.RPC
import Ganeti.UDSServer
......@@ -84,7 +84,7 @@ prepMain _ _ = do
(cdata, cstat) <- loadConfigFromFile conf_file
lock <- if lock_file_present
then loadLockAllocation lock_file
else return emptyAllocation
else return emptyWaiting
mkDaemonHandle conf_file
(mkConfigState cdata)
lock
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment