Commit a7e484c4 authored by Iustin Pop

Add support for job queries in hconfd

This adds support for job queries, including (basic) unit-tests.

I've tested this for memory and cpu usage as follows:

- 3600 jobs (live queue):
  - via masterd, default: ~1.1s (masterd: ~60MB ram)
  - via confd,   default: ~1.1s (hconfd:  ~25MB ram)
  - via masterd, id only: ~1.0s (masterd: ~57MB ram)
  - via confd,   id only: ~0.2s (hconfd:  ~15MB ram)

- all jobs (128K in total, around 570MB size on disk):
  - via masterd, default: 1m22s (masterd cpu: 48s), masterd: 1.4G ram
  - via confd,   default: 1m16s (hconfd  cpu: 51s), hconfd:  570MB ram peak
    (peak only right before starting luxi send, hconfd decreases in RSS as
    results are streamed over luxi, back to ~18MB after the send)
  - via masterd, id only:  ~49s (masterd cpu: 45s), masterd: 1.3G ram
  - via confd,   id only:  ~39s (hconfd cpu: 35s),  hconfd:  110MB ram peak
    (right before luxi send, decreasing as results are sent, back to ~14MB
    after the send)

Given this, and given that production clusters are unlikely to have
hundreds of thousands of job files, I believe the implementation is safe
from this point of view.
Signed-off-by: Iustin Pop <iustin@google.com>
Reviewed-by: Helga Velroyen <helgav@google.com>
parent 037762a9
......@@ -496,6 +496,7 @@ HS_LIB_SRCS = \
htools/Ganeti/Query/Common.hs \
htools/Ganeti/Query/Filter.hs \
htools/Ganeti/Query/Group.hs \
htools/Ganeti/Query/Job.hs \
htools/Ganeti/Query/Language.hs \
htools/Ganeti/Query/Node.hs \
htools/Ganeti/Query/Query.hs \
......
......@@ -44,10 +44,12 @@ import Test.Ganeti.Objects (genEmptyCluster)
import Ganeti.BasicTypes
import Ganeti.Errors
import Ganeti.Query.Filter
import Ganeti.Query.Group
import Ganeti.Query.Language
import Ganeti.Query.Node
import Ganeti.Query.Query
import qualified Ganeti.Query.Job as Job
{-# ANN module "HLint: ignore Use camelCase" #-}
......@@ -59,6 +61,8 @@ hasUnknownFields = (QFTUnknown `notElem`) . map fdefKind
-- * Test cases
-- ** Node queries
-- | Tests that querying any existing fields, via either query or
-- queryFields, will not return unknown fields.
prop_queryNode_noUnknown :: Property
......@@ -159,7 +163,7 @@ case_queryNode_allfields = do
(sortBy field_sort . map (\(f, _, _) -> f) $ Map.elems nodeFieldsMap)
(sortBy field_sort fdefs)
-- * Same as above, but for group
-- ** Group queries
prop_queryGroup_noUnknown :: Property
prop_queryGroup_noUnknown =
......@@ -231,6 +235,61 @@ case_queryGroup_allfields = do
(sortBy field_sort . map (\(f, _, _) -> f) $ Map.elems groupFieldsMap)
(sortBy field_sort fdefs)
-- ** Job queries
-- | Checks that any known job field, requested through either 'query'
-- or 'queryFields', is never reported back as unknown. The
-- configuration is passed as 'undefined' since job queries shouldn't
-- use it, and an explicit ID filter is supplied because a non-live
-- query without one wouldn't return any result rows.
prop_queryJob_noUnknown :: Property
prop_queryJob_noUnknown =
  forAll (listOf (arbitrary :: Gen (Positive Integer))) $ \jobIds ->
  forAll (elements (Map.keys Job.fieldsMap)) $ \fname -> monadicIO $ do
    let jobType = ItemTypeLuxi QRJob
        idFilter = makeSimpleFilter (nameField jobType)
                     [ Right num | Positive num <- jobIds ]
    -- run the (non-live) query first, then unwrap its result
    qresult <- run $ query undefined False (Query jobType [fname] idFilter)
    QueryResult qdefs qrows <- resultProp qresult
    QueryFieldsResult fieldDefs <-
      resultProp $ queryFields (QueryFields jobType [fname])
    stop $ conjoin
      [ printTestCase ("Got unknown fields via query (" ++
                       show qdefs ++ ")") (hasUnknownFields qdefs)
      , printTestCase ("Got unknown result status via query (" ++
                       show qrows ++ ")")
          (all (all (\cell -> rentryStatus cell /= RSUnknown)) qrows)
      , printTestCase ("Got unknown fields via query fields (" ++
                       show fieldDefs ++ ")") (hasUnknownFields fieldDefs)
      ]
-- | Checks that a field name absent from the job fields map is
-- reported as unknown by both 'query' and 'queryFields', and that every
-- returned cell carries the 'RSUnknown' status and no value.
prop_queryJob_Unknown :: Property
prop_queryJob_Unknown =
  forAll (listOf (arbitrary :: Gen (Positive Integer))) $ \jobIds ->
  forAll (suchThat arbitrary
            (\cand -> cand `notElem` Map.keys Job.fieldsMap)) $ \fname ->
  monadicIO $ do
    let jobType = ItemTypeLuxi QRJob
        idFilter = makeSimpleFilter (nameField jobType)
                     [ Right num | Positive num <- jobIds ]
    -- run the (non-live) query first, then unwrap its result
    qresult <- run $ query undefined False (Query jobType [fname] idFilter)
    QueryResult qdefs qrows <- resultProp qresult
    QueryFieldsResult fieldDefs <-
      resultProp $ queryFields (QueryFields jobType [fname])
    stop $ conjoin
      [ printTestCase ("Got known fields via query (" ++ show qdefs ++ ")")
          (not $ hasUnknownFields qdefs)
      , printTestCase ("Got /= ResultUnknown result status via query (" ++
                       show qrows ++ ")")
          (all (all (\cell -> rentryStatus cell == RSUnknown)) qrows)
      , printTestCase ("Got a Just in a result value (" ++
                       show qrows ++ ")")
          (all (all (\cell -> isNothing (rentryValue cell))) qrows)
      , printTestCase ("Got known fields via query fields (" ++
                       show fieldDefs ++ ")")
          (not $ hasUnknownFields fieldDefs)
      ]
-- ** Misc other tests
-- | Tests that requested names checking behaves as expected.
prop_getRequestedNames :: Property
......@@ -258,5 +317,7 @@ testSuite "Query/Query"
, 'prop_queryGroup_Unknown
, 'prop_queryGroup_types
, 'case_queryGroup_allfields
, 'prop_queryJob_noUnknown
, 'prop_queryJob_Unknown
, 'prop_getRequestedNames
]
{-| Implementation of the Ganeti Query2 job queries.
-}
{-
Copyright (C) 2012 Google Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.
-}
module Ganeti.Query.Job
( RuntimeData
, fieldsMap
, loadRuntimeData
, wantArchived
) where
import qualified Data.Map as Map
import qualified Text.JSON as J
import Ganeti.BasicTypes
import qualified Ganeti.Constants as C
import Ganeti.JQueue
import Ganeti.OpCodes (opSummary, metaOpCode)
import Ganeti.Path
import Ganeti.Query.Common
import Ganeti.Query.Language
import Ganeti.Query.Types
import Ganeti.Types
-- | The runtime data for a job: either a failure ('Bad'), or the loaded
-- 'QueuedJob' paired with the flag that the @archived@ field reports as
-- the job's archived status.
type RuntimeData = Result (QueuedJob, Bool)
-- | Description of the job priority field, spelling out the range of
-- values between the lowest and highest opcode priority constants.
jobPrioDoc :: String
jobPrioDoc =
  concat [ "Current job priority ("
         , show C.opPrioLowest
         , " to "
         , show C.opPrioHighest
         , ")"
         ]
-- | Appends the standard timestamp-format note to a field description.
tsDoc :: String -> String
tsDoc desc = desc ++ " (tuple containing seconds and microseconds)"
-- | Applies a job accessor to the runtime data, yielding a normal
-- result entry when the job is available and an \"unavailable\" entry
-- when the runtime data is 'Bad'. The job ID argument is ignored.
maybeJob :: (J.JSON a)
         => (QueuedJob -> a) -> RuntimeData -> JobId -> ResultEntry
maybeJob getter rdata _ =
  case rdata of
    Bad _       -> rsUnavail
    Ok (job, _) -> rsNormal (getter job)
-- | Builds a runtime field getter out of a plain job accessor, going
-- through 'maybeJob' to handle unavailable jobs.
jobGetter :: (J.JSON a) => (QueuedJob -> a) -> FieldGetter JobId RuntimeData
jobGetter getter = FieldRuntime (maybeJob getter)
-- | Builds a runtime field getter that applies the given accessor to
-- every opcode of the job, returning the list of results.
opsGetter :: (J.JSON a) => (QueuedOpCode -> a) -> FieldGetter JobId RuntimeData
opsGetter perOp = FieldRuntime (maybeJob (\job -> map perOp (qjOps job)))
-- | Name of the job query field that reports whether a job is
-- archived; 'wantArchived' looks for this name among the given fields.
archivedField :: String
archivedField = "archived"
-- | Tells whether archived jobs should be looked at as well, which is
-- the case exactly when 'archivedField' is among the given fields.
wantArchived :: [FilterField] -> Bool
wantArchived requested = archivedField `elem` requested
-- | List of all job fields. FIXME: QFF_JOB_ID on the id field.
--
-- Most entries follow one of three shapes (whole-job value, per-opcode
-- list, job-level timestamp), so they are assembled by the local
-- builders below; the @id@ and archived entries are spelled out in full.
jobFields :: FieldList JobId RuntimeData
jobFields =
  [ (FieldDefinition "id" "ID" QFTNumber "Job ID", FieldSimple rsNormal,
     QffNormal)
  , wholeJobEntry "status" "Status" QFTText "Job status" calcJobStatus
  , wholeJobEntry "priority" "Priority" QFTNumber jobPrioDoc calcJobPriority
  , (FieldDefinition archivedField "Archived" QFTBool
       "Whether job is archived",
     FieldRuntime archivedEntryGetter, QffNormal)
  , perOpEntry "ops" "OpCodes" "List of all opcodes" qoInput
  , perOpEntry "opresult" "OpCode_result" "List of opcodes results" qoResult
  , perOpEntry "opstatus" "OpCode_status" "List of opcodes status" qoStatus
  , perOpEntry "oplog" "OpCode_log" "List of opcode output logs" qoLog
  , perOpEntry "opstart" "OpCode_start"
      "List of opcode start timestamps (before acquiring locks)"
      qoStartTimestamp
  , perOpEntry "opexec" "OpCode_exec"
      "List of opcode execution start timestamps (after acquiring locks)"
      qoExecTimestamp
  , perOpEntry "opend" "OpCode_end"
      "List of opcode execution end timestamps" qoEndTimestamp
  , perOpEntry "oppriority" "OpCode_prio" "List of opcode priorities"
      qoPriority
  , perOpEntry "summary" "Summary" "List of per-opcode summaries"
      (opSummary . metaOpCode . qoInput)
  , timestampEntry "received_ts" "Received"
      "Timestamp of when job was received" qjReceivedTimestamp
  , timestampEntry "start_ts" "Start" "Timestamp of job start"
      qjStartTimestamp
  , timestampEntry "end_ts" "End" "Timestamp of job end" qjEndTimestamp
  ]
  where
    -- normal-flagged field computed from the job as a whole
    wholeJobEntry fname ftitle fkind fdoc extract =
      (FieldDefinition fname ftitle fkind fdoc, jobGetter extract, QffNormal)
    -- normal-flagged QFTOther field holding one value per opcode
    perOpEntry fname ftitle fdoc extract =
      (FieldDefinition fname ftitle QFTOther fdoc, opsGetter extract,
       QffNormal)
    -- timestamp-flagged QFTOther field; the doc gets the 'tsDoc' suffix
    timestampEntry fname ftitle fdoc extract =
      (FieldDefinition fname ftitle QFTOther (tsDoc fdoc), jobGetter extract,
       QffTimestamp)
    -- the archived flag is the second half of the runtime data pair
    archivedEntryGetter (Ok (_, archive)) _ = rsNormal archive
    archivedEntryGetter _ _ = rsUnavail
-- | The job fields map, indexing every entry of 'jobFields' by the
-- name of its field definition.
fieldsMap :: FieldMap JobId RuntimeData
fieldsMap =
  Map.fromList [ (fdefName fdef, entry) | entry@(fdef, _, _) <- jobFields ]
-- | Load the given jobs from disk, looking them up under the queue
-- directory; the flag is handed unchanged to 'loadJobFromDisk'.
loadRuntimeData :: [JobId] -> Bool -> IO [RuntimeData]
loadRuntimeData ids archived =
  queueDir >>= \qdir -> mapM (loadJobFromDisk qdir archived) ids
......@@ -52,7 +52,8 @@ module Ganeti.Query.Query
, nameField
) where
import Control.Monad (filterM)
import Control.DeepSeq
import Control.Monad (filterM, liftM, foldM)
import Control.Monad.Trans (lift)
import Data.List (intercalate)
import Data.Maybe (fromMaybe)
......@@ -60,17 +61,21 @@ import qualified Data.Map as Map
import qualified Text.JSON as J
import Ganeti.BasicTypes
import Ganeti.Errors
import Ganeti.Config
import Ganeti.Errors
import Ganeti.JQueue
import Ganeti.JSON
import Ganeti.Rpc
import Ganeti.Query.Language
import Ganeti.Objects
import Ganeti.Query.Common
import Ganeti.Query.Filter
import Ganeti.Query.Types
import Ganeti.Query.Node
import qualified Ganeti.Query.Job as Query.Job
import Ganeti.Query.Group
import Ganeti.Objects
import Ganeti.Query.Language
import Ganeti.Query.Node
import Ganeti.Query.Types
import Ganeti.Path
import Ganeti.Types
import Ganeti.Utils
-- * Helper functions
......@@ -144,11 +149,26 @@ getRequestedNames qry =
Just names -> getAllQuotedStrings names
Nothing -> []
-- | Compute the requested job IDs. This is custom since job IDs may be
-- given in the filter either as strings or as integers; a filter that
-- requests no names yields an empty list.
getRequestedJobIDs :: Filter FilterField -> Result [JobId]
getRequestedJobIDs qfilter =
  maybe (Ok []) (mapM valueToJobId) $
    requestedNames (nameField (ItemTypeLuxi QRJob)) qfilter
  where
    -- convert a single filter value, whichever form it was given in
    valueToJobId (QuotedString str) = makeJobIdS str
    valueToJobId (NumericValue num) = makeJobId (fromIntegral num)
-- | Main query execution function. Job queries are dispatched to
-- 'queryJobs'; every other query goes through 'queryInner' with the
-- names requested by its filter.
query :: ConfigData                   -- ^ The current configuration
      -> Bool                         -- ^ Whether to collect live data
      -> Query                        -- ^ The query (item, fields, filter)
      -> IO (ErrorResult QueryResult) -- ^ Result
query cfg live qry =
  case qry of
    Query (ItemTypeLuxi QRJob) fields qfilter ->
      queryJobs cfg live fields qfilter
    _ -> queryInner cfg live qry (getRequestedNames qry)
-- | Inner query execution function.
......@@ -197,6 +217,58 @@ queryInner cfg _ (Query (ItemTypeOpCode QRGroup) fields qfilter) wanted =
queryInner _ _ (Query qkind _ _) _ =
return . Bad . GenericError $ "Query '" ++ show qkind ++ "' not supported"
-- | Query jobs specific query function, needed as we need to accept
-- both 'QuotedString' and 'NumericValue' as wanted names.
--
-- The candidate job IDs are those named by the filter; when the filter
-- names none, a live query lists the jobs found on disk while a
-- non-live query works on an empty list. Invalid job IDs in the filter
-- are reported as a 'GenericError'.
queryJobs :: ConfigData                   -- ^ The current configuration
          -> Bool                         -- ^ Whether to collect live data
          -> [FilterField]                -- ^ The requested fields
          -> Filter FilterField           -- ^ Filter
          -> IO (ErrorResult QueryResult) -- ^ Result
queryJobs cfg live fields qfilter =
  runResultT $ do
    -- the queue directory is resolved once and used both for listing
    -- the jobs and for loading each of them below
    rootdir <- lift queueDir
    let wanted_names = getRequestedJobIDs qfilter
        want_arch = Query.Job.wantArchived fields
    rjids <- case wanted_names of
               Bad msg -> resultT . Bad $ GenericError msg
               Ok [] -> if live
                          -- we can check the filesystem for actual jobs
                          then lift $ liftM sortJobIDs
                                 (determineJobDirectories rootdir want_arch >>=
                                  getJobIDs)
                          -- else we shouldn't look at the filesystem...
                          else return []
               Ok v -> resultT $ Ok v
    cfilter <- resultT $ compileFilter Query.Job.fieldsMap qfilter
    let selected = getSelectedFields Query.Job.fieldsMap fields
        (fdefs, fgetters, _) = unzip3 selected
        -- jobs are only read from disk if some selected getter needs it
        live' = live && needsLiveData fgetters
        disabled_data = Bad "live data disabled"
    -- runs first pass of the filter, without a runtime context; this
    -- will limit the jobs that we'll load from disk
    jids <- resultT $
            filterM (\jid -> evaluateFilter cfg Nothing jid cfilter) rjids
    -- here we run the runtime data gathering, filtering and evaluation,
    -- all in the same step, so that we don't keep jobs in memory longer
    -- than we need; we can't be fully lazy due to the multiple monad
    -- wrapping across different steps
    fdata <- foldM
      -- big lambda, but we use many variables from outside it...
      (\lst jid -> do
         job <- lift $ if live'
                         then loadJobFromDisk rootdir want_arch jid
                         else return disabled_data
         pass <- resultT $ evaluateFilter cfg (Just job) jid cfilter
         let nlst = if pass
                      -- fully evaluate the row before consing it on
                      then let row = map (execGetter cfg job jid) fgetters
                           in rnf row `seq` row:lst
                      else lst
         -- evaluate nlst (to WHNF), otherwise we're too lazy
         return $! nlst
      ) [] jids
    -- rows were accumulated in reverse order by the fold
    return QueryResult { qresFields = fdefs, qresData = reverse fdata }
-- | Helper for 'queryFields'.
fieldsExtractor :: FieldMap a b -> [FilterField] -> QueryFieldsResult
fieldsExtractor fieldsMap fields =
......@@ -213,6 +285,9 @@ queryFields (QueryFields (ItemTypeOpCode QRNode) fields) =
queryFields (QueryFields (ItemTypeOpCode QRGroup) fields) =
Ok $ fieldsExtractor groupFieldsMap fields
queryFields (QueryFields (ItemTypeLuxi QRJob) fields) =
Ok $ fieldsExtractor Query.Job.fieldsMap fields
queryFields (QueryFields qkind _) =
Bad . GenericError $ "QueryFields '" ++ show qkind ++ "' not supported"
......
......@@ -163,6 +163,10 @@ handleCall cfg (QueryGroups names fields lock) =
handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup)
(map Left names) fields lock
handleCall cfg (QueryJobs names fields) =
handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
(map (Right . fromIntegral . fromJobId) names) fields False
handleCall _ op =
return . Bad $
GenericError ("Luxi call '" ++ strOfOp op ++ "' not implemented")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment