Commit 1b75c626 authored by Petr Pudlak's avatar Petr Pudlak

Abstract WConfd resources that need to be saved/loaded

Currently, the part that loads/saves locks is generic, with no lock
specifics. Abstract it so that it can be reused for temporary
reservations.
Signed-off-by: default avatarPetr Pudlak <pudlak@google.com>
Reviewed-by: default avatarKlaus Aehlig <aehlig@google.com>
parent 4bb0eebd
......@@ -881,6 +881,7 @@ HS_LIB_SRCS = \
src/Ganeti/WConfd/DeathDetection.hs \
src/Ganeti/WConfd/Language.hs \
src/Ganeti/WConfd/Monad.hs \
src/Ganeti/WConfd/Persistent.hs \
src/Ganeti/WConfd/Server.hs \
src/Ganeti/WConfd/Ssconf.hs \
src/Ganeti/WConfd/TempRes.hs
......
......@@ -31,30 +31,20 @@ module Ganeti.Locking.Locks
, ClientType(..)
, ClientId(..)
, GanetiLockWaiting
, loadLockAllocation
, writeLocksAsyncTask
, LockLevel(..)
, lockLevel
) where
import Control.Applicative ((<$>), (<*>), pure)
import Control.Monad ((>=>), liftM)
import Control.Monad.Base (MonadBase, liftBase)
import Control.Monad.Error (MonadError, catchError)
import Data.List (stripPrefix)
import System.Posix.Types (ProcessID)
import qualified Text.JSON as J
import Ganeti.BasicTypes
import Ganeti.Errors (ResultG, GanetiException)
import Ganeti.JSON (readEitherString, fromJResultE)
import Ganeti.JSON (readEitherString)
import Ganeti.Locking.Types
import Ganeti.Locking.Waiting
import Ganeti.Logging.Lifted (MonadLog, logDebug, logEmergency)
import Ganeti.Types
import Ganeti.Utils.Atomic
import Ganeti.Utils.AsyncWorker
-- | The type of Locks available in Ganeti. The order of this type
-- is the lock oder.
......@@ -231,32 +221,3 @@ instance J.JSON ClientId where
-- | The type of lock Allocations in Ganeti. In Ganeti, the owner of
-- locks are jobs.
type GanetiLockWaiting = LockWaiting GanetiLocks ClientId Integer
-- | Load a lock allocation from disk.
loadLockAllocation :: FilePath -> ResultG GanetiLockWaiting
loadLockAllocation =
liftIO . readFile
>=> fromJResultE "parsing lock waiting structure" . J.decodeStrict
-- | Write lock allocation to disk, overwriting any previous lock
-- allocation stored there.
writeLocks :: (MonadBase IO m, MonadError GanetiException m, MonadLog m)
=> FilePath -> GanetiLockWaiting -> m ()
writeLocks fpath lockWait = do
logDebug "Async. lock status writer: Starting write"
toErrorBase . liftIO . atomicWriteFile fpath $ J.encode lockWait
logDebug "Async. lock status writer: written"
-- | Construct an asynchronous worker whose action is to save the
-- current state of the lock allocation.
-- The worker's action reads the lock allocation using the given @IO@
-- action. Any inbetween changes to the file are tacitly ignored.
writeLocksAsyncTask :: FilePath -- ^ Path to the lock file
-> IO GanetiLockWaiting -- ^ An action to read the
-- current lock allocation
-> ResultG (AsyncWorker () ())
writeLocksAsyncTask fpath lockWaitingAction = mkAsyncWorker_ $
catchError (do
locks <- liftBase lockWaitingAction
writeLocks fpath locks
) (logEmergency . (++) "Can't write lock allocation status: " . show)
......@@ -42,11 +42,11 @@ import System.Directory (removeFile)
import Ganeti.BasicTypes
import qualified Ganeti.Constants as C
import qualified Ganeti.Locking.Allocation as L
import qualified Ganeti.Locking.Waiting as LW
import Ganeti.Locking.Locks (ClientId(..))
import Ganeti.Logging.Lifted (logDebug, logInfo)
import Ganeti.Utils.Livelock
import Ganeti.WConfd.Monad
import Ganeti.WConfd.Persistent
-- | Interval to run clean-up tasks in microseconds
cleanupInterval :: Int
......@@ -63,7 +63,7 @@ cleanupLocksTask = forever . runResultT $ do
died <- liftIO (isDead fpath)
when died $ do
logInfo $ show owner ++ " died, releasing locks"
modifyLockWaiting_ (LW.releaseResources owner)
persCleanup persistentLocks owner
_ <- liftIO . E.try $ removeFile fpath
:: WConfdMonad (Either IOError ())
return ()
......
{-# LANGUAGE MultiParamTypeClasses, TypeFamilies #-}
{-| Common types and functions for persistent resources
In particular:
- locks
- temporary reservations
-}
{-
Copyright (C) 2014 Google Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.
-}
module Ganeti.WConfd.Persistent
( Persistent(..)
, writePersistentAsyncTask
, readPersistent
, persistentLocks
) where
import Control.Monad.Error
import System.Directory (doesFileExist)
import qualified Text.JSON as J
import Ganeti.BasicTypes
import Ganeti.Errors
import qualified Ganeti.JSON as J
import Ganeti.Locking.Waiting (emptyWaiting)
import Ganeti.Locking.Locks (ClientId(..), GanetiLockWaiting)
import Ganeti.Logging
import qualified Ganeti.Path as Path
import Ganeti.WConfd.Core (freeLocks)
import Ganeti.WConfd.Monad
import Ganeti.Utils.Atomic
import Ganeti.Utils.AsyncWorker
-- * Common definitions
-- ** The data type that collects all required operations
-- | A collection of operations needed for persisting a resource.
data Persistent a = Persistent
{ persName :: String
, persPath :: IO FilePath
, persEmpty :: a
, persCleanup :: ClientId -> WConfdMonad ()
-- ^ The clean-up action needs to be a full 'WConfdMonad' action as it
-- might need to do some complex processing, such as notifying
-- clients that some locks are available.
}
-- ** Common functions
-- | Construct an asynchronous worker whose action is to save the
-- current state of the persistent state.
-- The worker's action reads the state using the given @IO@
-- action. Any inbetween changes to the file are tacitly ignored.
writePersistentAsyncTask
:: (J.JSON a) => Persistent a -> IO a -> ResultG (AsyncWorker () ())
writePersistentAsyncTask pers readAction = mkAsyncWorker_ $
catchError (do
let prefix = "Async. " ++ persName pers ++ " writer: "
fpath <- liftIO $ persPath pers
logDebug $ prefix ++ "Starting write to " ++ fpath
state <- liftIO readAction
toErrorBase . liftIO . atomicWriteFile fpath . J.encode $ state
logDebug $ prefix ++ "written"
) (logEmergency . (++) ("Can't write " ++ persName pers ++ " state: ")
. show)
-- | Load a persistent data structure from disk.
readPersistent :: (J.JSON a) => Persistent a -> ResultG a
readPersistent pers = do
logDebug $ "Reading " ++ persName pers
file <- liftIO $ persPath pers
file_present <- liftIO $ doesFileExist file
if file_present
then
liftIO (persPath pers >>= readFile)
>>= J.fromJResultE ("parsing " ++ persName pers) . J.decodeStrict
else do
logInfo $ "Note: No saved data for " ++ persName pers
++ ", tacitly assuming empty."
return (persEmpty pers)
-- * Implementations
-- ** Locks
persistentLocks :: Persistent GanetiLockWaiting
persistentLocks = Persistent
{ persName = "lock allocation state"
, persPath = Path.lockStatusFile
, persEmpty = emptyWaiting
, persCleanup = freeLocks
}
......@@ -35,14 +35,11 @@ import Control.Concurrent (forkIO)
import Control.Exception
import Control.Monad
import Control.Monad.Error
import System.Directory (doesFileExist)
import Ganeti.BasicTypes
import Ganeti.Daemon
import Ganeti.Daemon.Utils (handleMasterVerificationOptions)
import Ganeti.Logging (logInfo, logDebug)
import Ganeti.Locking.Locks
import Ganeti.Locking.Waiting
import Ganeti.Logging (logDebug)
import qualified Ganeti.Path as Path
import Ganeti.THH.RPC
import Ganeti.UDSServer
......@@ -50,11 +47,12 @@ import Ganeti.UDSServer
import Ganeti.Errors (formatError)
import Ganeti.Runtime
import Ganeti.WConfd.ConfigState
import Ganeti.WConfd.ConfigVerify
import Ganeti.WConfd.ConfigWriter
import Ganeti.WConfd.Core
import Ganeti.WConfd.DeathDetection (cleanupLocksTask)
import Ganeti.WConfd.Monad
import Ganeti.WConfd.ConfigVerify
import Ganeti.WConfd.Persistent
handler :: DaemonHandle -> RpcServer WConfdMonadInt
handler ch = $( mkRpcM exportedFunctions )
......@@ -78,26 +76,20 @@ prepMain _ _ = do
-- TODO: Lock the configuration file so that running the daemon twice fails?
conf_file <- Path.clusterConfFile
lock_file <- Path.lockStatusFile
lock_file_present <- doesFileExist lock_file
unless lock_file_present
$ logInfo "No saved lock status; assuming all locks free"
dh <- toErrorBase
. withErrorT (strMsg . ("Initialization of the daemon failed" ++)
. formatError) $ do
ents <- getEnts
(cdata, cstat) <- loadConfigFromFile conf_file
verifyConfigErr cdata
lock <- if lock_file_present
then loadLockAllocation lock_file
else return emptyWaiting
lock <- readPersistent persistentLocks
mkDaemonHandle conf_file
(mkConfigState cdata)
lock
(saveConfigAsyncTask conf_file cstat)
(distMCsAsyncTask ents conf_file)
distSSConfAsyncTask
(writeLocksAsyncTask lock_file)
(writePersistentAsyncTask persistentLocks)
return (s, dh)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment