Commit e5fba493 authored by Klaus Aehlig's avatar Klaus Aehlig

Make luxid handle SubmitJob

As luxid is to take over responsibility for the job queue,
handle this request by writing the job to the queue and then
informing masterd; masterd will also distribute the job to
all master candidates.
Signed-off-by: default avatarKlaus Aehlig <aehlig@google.com>
Reviewed-by: default avatarMichele Tartara <mtartara@google.com>
parent 1b94c0db
......@@ -34,7 +34,7 @@ module Ganeti.Query.Server
import Control.Applicative
import Control.Concurrent
import Control.Exception
import Control.Monad (forever)
import Control.Monad (forever, when)
import Data.Bits (bitSize)
import qualified Data.Set as Set (toList)
import Data.IORef
......@@ -52,13 +52,16 @@ import Ganeti.Objects
import qualified Ganeti.Config as Config
import Ganeti.ConfigReader
import Ganeti.BasicTypes
import Ganeti.JQueue
import Ganeti.Logging
import Ganeti.Luxi
import qualified Ganeti.Query.Language as Qlang
import qualified Ganeti.Query.Cluster as QCluster
import Ganeti.Path (queueDir, jobQueueLockFile, defaultLuxiSocket)
import Ganeti.Query.Query
import Ganeti.Query.Filter (makeSimpleFilter)
import Ganeti.Types
import Ganeti.Utils (lockFile, exitIfBad)
-- | Helper for classic queries.
handleClassicQuery :: ConfigData -- ^ Cluster config
......@@ -76,16 +79,17 @@ handleClassicQuery cfg qkind names fields _ = do
return $ showJSON <$> (qr >>= queryCompat)
-- | Minimal wrapper to handle the missing config case.
handleCallWrapper :: Result ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
handleCallWrapper (Bad msg) _ =
handleCallWrapper :: MVar () -> Result ConfigData
-> LuxiOp -> IO (ErrorResult JSValue)
handleCallWrapper _ (Bad msg) _ =
return . Bad . ConfigurationError $
"I do not have access to a valid configuration, cannot\
\ process queries: " ++ msg
handleCallWrapper (Ok config) op = handleCall config op
handleCallWrapper qlock (Ok config) op = handleCall qlock config op
-- | Actual luxi operation handler.
handleCall :: ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
handleCall cdata QueryClusterInfo =
handleCall :: MVar () -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
handleCall _ cdata QueryClusterInfo =
let cluster = configCluster cdata
master = QCluster.clusterMasterNodeName cdata
hypervisors = clusterEnabledHypervisors cluster
......@@ -154,7 +158,7 @@ handleCall cdata QueryClusterInfo =
Ok _ -> return . Ok . J.makeObj $ obj
Bad ex -> return $ Bad ex
handleCall cfg (QueryTags kind name) = do
handleCall _ cfg (QueryTags kind name) = do
let tags = case kind of
TagKindCluster -> Ok . clusterTags $ configCluster cfg
TagKindGroup -> groupTags <$> Config.getGroup cfg name
......@@ -165,41 +169,69 @@ handleCall cfg (QueryTags kind name) = do
ECodeInval
return (J.showJSON <$> tags)
handleCall cfg (Query qkind qfields qfilter) = do
handleCall _ cfg (Query qkind qfields qfilter) = do
result <- query cfg True (Qlang.Query qkind qfields qfilter)
return $ J.showJSON <$> result
handleCall _ (QueryFields qkind qfields) = do
handleCall _ _ (QueryFields qkind qfields) = do
let result = queryFields (Qlang.QueryFields qkind qfields)
return $ J.showJSON <$> result
handleCall cfg (QueryNodes names fields lock) =
handleCall _ cfg (QueryNodes names fields lock) =
handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode)
(map Left names) fields lock
handleCall cfg (QueryGroups names fields lock) =
handleCall _ cfg (QueryGroups names fields lock) =
handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup)
(map Left names) fields lock
handleCall cfg (QueryJobs names fields) =
handleCall _ cfg (QueryJobs names fields) =
handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
(map (Right . fromIntegral . fromJobId) names) fields False
handleCall cfg (QueryNetworks names fields lock) =
handleCall _ cfg (QueryNetworks names fields lock) =
handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNetwork)
(map Left names) fields lock
handleCall _ op =
handleCall qlock cfg (SubmitJobToDrainedQueue ops) =
do
jobid <- allocateJobId (Config.getMasterCandidates cfg) qlock
case jobid of
Bad s -> return . Bad . GenericError $ s
Ok jid -> do
qDir <- queueDir
job <- queuedJobFromOpCodes jid ops
write_result <- writeJobToDisk qDir job
case write_result of
Bad s -> return . Bad . GenericError $ s
Ok () -> do
socketpath <- defaultLuxiSocket
client <- getClient socketpath
pickupResult <- callMethod (PickupJob jid) client
closeClient client
case pickupResult of
Ok _ -> return ()
Bad e -> logWarning $ "Failded to notify masterd: " ++ show e
return . Ok . showJSON . fromJobId $ jid
handleCall qlock cfg (SubmitJob ops) =
do
open <- isQueueOpen
if not open
then return . Bad . GenericError $ "Queue drained"
else handleCall qlock cfg (SubmitJobToDrainedQueue ops)
handleCall _ _ op =
return . Bad $
GenericError ("Luxi call '" ++ strOfOp op ++ "' not implemented")
-- | Given a decoded luxi request, executes it and sends the luxi
-- response back to the client.
handleClientMsg :: Client -> ConfigReader -> LuxiOp -> IO Bool
handleClientMsg client creader args = do
handleClientMsg :: MVar () -> Client -> ConfigReader -> LuxiOp -> IO Bool
handleClientMsg qlock client creader args = do
cfg <- creader
logDebug $ "Request: " ++ show args
call_result <- handleCallWrapper cfg args
call_result <- handleCallWrapper qlock cfg args
(!status, !rval) <-
case call_result of
Bad err -> do
......@@ -216,8 +248,8 @@ handleClientMsg client creader args = do
-- | Handles one iteration of the client protocol: receives message,
-- checks it for validity and decodes it, returns response.
handleClient :: Client -> ConfigReader -> IO Bool
handleClient client creader = do
handleClient :: MVar () -> Client -> ConfigReader -> IO Bool
handleClient qlock client creader = do
!msg <- recvMsgExt client
logDebug $ "Received message: " ++ show msg
case msg of
......@@ -231,23 +263,23 @@ handleClient client creader = do
logWarning errmsg
sendMsg client $ buildResponse False (showJSON errmsg)
return False
Ok args -> handleClientMsg client creader args
Ok args -> handleClientMsg qlock client creader args
-- | Main client loop: runs one loop of 'handleClient', and if that
-- doesn't report a finished (closed) connection, restarts itself.
clientLoop :: Client -> ConfigReader -> IO ()
clientLoop client creader = do
result <- handleClient client creader
clientLoop :: MVar () -> Client -> ConfigReader -> IO ()
clientLoop qlock client creader = do
result <- handleClient qlock client creader
if result
then clientLoop client creader
then clientLoop qlock client creader
else closeClient client
-- | Main listener loop: accepts clients, forks an I/O thread to handle
-- that client.
listener :: ConfigReader -> S.Socket -> IO ()
listener creader socket = do
listener :: MVar () -> ConfigReader -> S.Socket -> IO ()
listener qlock creader socket = do
client <- acceptClient socket
_ <- forkIO $ clientLoop client creader
_ <- forkIO $ clientLoop qlock client creader
return ()
-- | Type alias for prepMain results
......@@ -272,7 +304,11 @@ main :: MainFn () PrepResult
main _ _ (socket_path, server, cref) = do
initConfigReader id cref
let creader = readIORef cref
qlockFile <- jobQueueLockFile
lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
qlock <- newMVar ()
finally
(forever $ listener creader server)
(forever $ listener qlock creader server)
(closeServer socket_path server)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment