Commit cc40185c authored by Iustin Pop's avatar Iustin Pop

Add a Ganeti-specific implementation of Curl Multi

As we want to be able to run queries against multiple nodes in
parallel, and furthermore in parallel with other work, we need to
implement the Curl Multi interface (see libcurl-multi(3)).

This patch adds a Ganeti-specific such implementation, to be used
until upstream Curl bindings provide it. The implemented interface
(there are two 'multi' interfaces) is the older curl_multi_perform(3).

It has one downside (which is also, somewhat, and advantage): we use
polling instead of more properly select() or poll. This is due to how
waiting for FDs is implemented in Haskell: currently, it's not
possible to wait for multiple FDs at once nicely, so we'd have to fork
many threads for each FD to be watched, or alternatively one could use
FFI select, but that would block the entire runtime.

With the current poll method, the implementation achieves consistent
~100 RPC/s per second (with 10 multi interfaces running in parallel,
each with 10 easy handles), and over ~1 hour of runtime the memory
usage is stable, so memory allocation/deallocation (manual when
dealing with FFI) _seems_ to be well handled.

Future optimisations could be to move to curl_multi_socket_action(3),
which might allow better integration with the Haskell runtime.
Signed-off-by: default avatarIustin Pop <iustin@google.com>
Reviewed-by: default avatarMichele Tartara <mtartara@google.com>
parent da9e2aff
......@@ -61,6 +61,7 @@ HS_DIRS = \
src/Ganeti/Block \
src/Ganeti/Block/Drbd \
src/Ganeti/Confd \
src/Ganeti/Curl \
src/Ganeti/DataCollectors \
src/Ganeti/HTools \
src/Ganeti/HTools/Backend \
......@@ -122,6 +123,7 @@ ALL_APIDOC_HS_DIRS = \
$(APIDOC_HS_DIR)/Ganeti/Block \
$(APIDOC_HS_DIR)/Ganeti/Block/Drbd \
$(APIDOC_HS_DIR)/Ganeti/Confd \
$(APIDOC_HS_DIR)/Ganeti/Curl \
$(APIDOC_HS_DIR)/Ganeti/DataCollectors \
$(APIDOC_HS_DIR)/Ganeti/HTools \
$(APIDOC_HS_DIR)/Ganeti/HTools/Backend \
......@@ -492,6 +494,7 @@ HS_LIB_SRCS = \
src/Ganeti/Confd/Types.hs \
src/Ganeti/Confd/Utils.hs \
src/Ganeti/Config.hs \
src/Ganeti/Curl/Multi.hs \
src/Ganeti/Daemon.hs \
src/Ganeti/DataCollectors/CLI.hs \
src/Ganeti/DataCollectors/Drbd.hs \
......@@ -601,8 +604,11 @@ HS_LIBTEST_SRCS = $(HS_LIB_SRCS) $(HS_TEST_SRCS)
HS_BUILT_SRCS = \
test/hs/Test/Ganeti/TestImports.hs \
src/Ganeti/Constants.hs \
src/Ganeti/Curl/Internal.hs \
src/Ganeti/Version.hs
HS_BUILT_SRCS_IN = $(patsubst %,%.in,$(HS_BUILT_SRCS))
HS_BUILT_SRCS_IN = \
$(patsubst %,%.in,$(filter-out src/Ganeti/Curl/Internal.hs,$(HS_BUILT_SRCS))) \
src/Ganeti/Curl/Internal.hsc
$(RUN_IN_TEMPDIR): | stamp-directories
......@@ -1373,6 +1379,9 @@ src/Ganeti/Constants.hs: src/Ganeti/Constants.hs.in \
PYTHONPATH=. $(RUN_IN_TEMPDIR) $(CURDIR)/$(CONVERT_CONSTANTS); \
} > $@
src/Ganeti/Curl/Internal.hs: src/Ganeti/Curl/Internal.hsc | stamp-directories
hsc2hs -o $@ $<
test/hs/Test/Ganeti/TestImports.hs: test/hs/Test/Ganeti/TestImports.hs.in \
$(built_base_sources)
set -e; \
......
{-# LANGUAGE ForeignFunctionInterface #-}
{-# OPTIONS_GHC -fno-warn-deprecated-flags #-}
-- the above is needed due to the fact that hsc2hs generates code also
-- compatible with older compilers; see
-- http://hackage.haskell.org/trac/ghc/ticket/3844
{-| Hsc2hs definitions for 'Storable' interfaces.
-}
{-
Copyright (C) 2013 Google Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.
-}
module Ganeti.Curl.Internal
( CurlMsgCode(..)
, toMsgCode
, fromMsgCode
, CurlMsg(..)
, errorBufferSize
, CurlMCode(..)
, toMCode
) where
import Foreign
import Foreign.C.Types
import Network.Curl
#include <curl/curl.h>
-- | Data representing a @CURLMSG@ enum.
data CurlMsgCode = CurlMsgNone
| CurlMsgDone
| CurlMsgUnknown CInt -- ^ Haskell specific code for
-- unknown codes
deriving (Show, Eq)
-- | Data representing a @struct CURLMsg@.
data CurlMsg = CurlMsg
{ cmMessage :: CurlMsgCode -- ^ The message type
, cmHandle :: CurlH -- ^ The internal curl handle to which it applies
, cmResult :: CurlCode -- ^ The message-specific result
}
-- | Partial 'Storable' instance for 'CurlMsg'; we do not extract all
-- fields, only the one we are interested in.
instance Storable CurlMsg where
sizeOf _ = (#size CURLMsg)
alignment _ = alignment (undefined :: CInt)
peek ptr = do
msg <- (#peek CURLMsg, msg) ptr
handle <- (#peek CURLMsg, easy_handle) ptr
result <- (#peek CURLMsg, data.result) ptr
return $ CurlMsg (toMsgCode msg) handle (toCode result)
poke ptr (CurlMsg msg handle result) = do
(#poke CURLMsg, msg) ptr (fromMsgCode msg)
(#poke CURLMsg, easy_handle) ptr handle
(#poke CURLMsg, data.result) ptr ((fromIntegral $ fromEnum result)::CInt)
-- | Minimum buffer size for 'CurlErrorBuffer'.
errorBufferSize :: Int
errorBufferSize = (#const CURL_ERROR_SIZE)
-- | Multi interface error codes.
data CurlMCode = CurlmCallMultiPerform
| CurlmOK
| CurlmBadHandle
| CurlmBadEasyHandle
| CurlmOutOfMemory
| CurlmInternalError
| CurlmBadSocket
| CurlmUnknownOption
| CurlmUnknown CInt -- ^ Haskell specific code denoting
-- undefined codes (e.g. when
-- libcurl has defined new codes
-- that are not implemented yet)
deriving (Show, Eq)
-- | Convert a CInt CURLMSG code (as returned by the C library) to a
-- 'CurlMsgCode'. When an unknown code is received, the special
-- 'CurlMsgUnknown' constructor will be used.
toMsgCode :: CInt -> CurlMsgCode
toMsgCode (#const CURLMSG_NONE) = CurlMsgNone
toMsgCode (#const CURLMSG_DONE) = CurlMsgDone
toMsgCode v = CurlMsgUnknown v
-- | Convert a CurlMsgCode to a CInt.
fromMsgCode :: CurlMsgCode -> CInt
fromMsgCode CurlMsgNone = (#const CURLMSG_NONE)
fromMsgCode CurlMsgDone = (#const CURLMSG_DONE)
fromMsgCode (CurlMsgUnknown v) = v
-- | Convert a CInt CURLMcode (as returned by the C library) to a
-- 'CurlMCode'. When an unknown code is received, the special
-- 'CurlmUnknown' constructor will be used.
toMCode :: CInt -> CurlMCode
toMCode (#const CURLM_CALL_MULTI_PERFORM) = CurlmCallMultiPerform
toMCode (#const CURLM_OK) = CurlmOK
toMCode (#const CURLM_BAD_HANDLE) = CurlmBadHandle
toMCode (#const CURLM_BAD_EASY_HANDLE) = CurlmBadEasyHandle
toMCode (#const CURLM_OUT_OF_MEMORY) = CurlmOutOfMemory
toMCode (#const CURLM_INTERNAL_ERROR) = CurlmInternalError
toMCode (#const CURLM_BAD_SOCKET) = CurlmBadSocket
toMCode (#const CURLM_UNKNOWN_OPTION) = CurlmUnknownOption
toMCode v = CurlmUnknown v
{-# LANGUAGE ForeignFunctionInterface, EmptyDataDecls #-}
{-| Ganeti-specific implementation of the Curl multi interface
(<http://curl.haxx.se/libcurl/c/libcurl-multi.html>).
TODO: Evaluate implementing and switching to
curl_multi_socket_action(3) interface, which is deemed to be more
performant for high-numbers of connections (but this is not the case
for Ganeti).
-}
{-
Copyright (C) 2013 Google Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.
-}
module Ganeti.Curl.Multi where
import Control.Concurrent
import Control.Monad
import Data.IORef
import qualified Data.Map as Map
import Foreign.C.String
import Foreign.C.Types
import Foreign.Marshal
import Foreign.Ptr
import Foreign.Storable
import Network.Curl
import Ganeti.Curl.Internal
import Ganeti.Logging
-- * Data types
-- | Empty data type denoting a Curl multi handle. Naming is similar to
-- "Network.Curl" types.
data CurlM_
-- | Type alias for a pointer to a Curl multi handle.
type CurlMH = Ptr CurlM_
-- | Our type alias for maps indexing 'CurlH' handles to the 'IORef'
-- for the Curl code.
type HandleMap = Map.Map CurlH (IORef CurlCode)
-- * FFI declarations
foreign import ccall
"curl_multi_init" curl_multi_init :: IO CurlMH
foreign import ccall
"curl_multi_cleanup" curl_multi_cleanup :: CurlMH -> IO CInt
foreign import ccall
"curl_multi_add_handle" curl_multi_add_handle :: CurlMH -> CurlH -> IO CInt
foreign import ccall
"curl_multi_remove_handle" curl_multi_remove_handle :: CurlMH -> CurlH ->
IO CInt
foreign import ccall
"curl_multi_perform" curl_multi_perform :: CurlMH -> Ptr CInt -> IO CInt
foreign import ccall
"curl_multi_info_read" curl_multi_info_read :: CurlMH -> Ptr CInt
-> IO (Ptr CurlMsg)
-- * Wrappers over FFI functions
-- | Adds an easy handle to a multi handle. This is a nicer wrapper
-- over 'curl_multi_add_handle' that fails for wrong codes.
curlMultiAddHandle :: CurlMH -> Curl -> IO ()
curlMultiAddHandle multi easy = do
r <- curlPrim easy $ \_ x -> curl_multi_add_handle multi x
when (toMCode r /= CurlmOK) .
fail $ "Failed adding easy handle to multi handle: " ++ show r
-- | Nice wrapper over 'curl_multi_info_read' that massages the
-- results into Haskell types.
curlMultiInfoRead :: CurlMH -> IO (Maybe CurlMsg, CInt)
curlMultiInfoRead multi =
alloca $ \ppending -> do
pmsg <- curl_multi_info_read multi ppending
pending <- peek ppending
msg <- if pmsg == nullPtr
then return Nothing
else Just `fmap` peek pmsg
return (msg, pending)
-- | Nice wrapper over 'curl_multi_perform'.
curlMultiPerform :: CurlMH -> IO (CurlMCode, CInt)
curlMultiPerform multi =
alloca $ \running -> do
mcode <- curl_multi_perform multi running
running' <- peek running
return (toMCode mcode, running')
-- * Helper functions
-- | Magical constant for the polling delay. This needs to be chosen such that:
--
-- * we don't poll too often; a slower poll allows the RTS to schedule
-- other threads, and let them work
--
-- * we don't want to pool too slow, so that Curl gets to act on the
-- handles that need it
pollDelayInterval :: Int
pollDelayInterval = 10000
-- | Writes incoming curl data to a list of strings, stored in an 'IORef'.
writeHandle :: IORef [String] -> Ptr CChar -> CInt -> CInt -> Ptr () -> IO CInt
writeHandle bufref cstr sz nelems _ = do
let full_sz = sz * nelems
hs_str <- peekCStringLen (cstr, fromIntegral full_sz)
modifyIORef bufref (hs_str:)
return full_sz
-- | Loops and extracts all pending messages from a Curl multi handle.
readMessages :: CurlMH -> HandleMap -> IO ()
readMessages mh hmap = do
(cmsg, pending) <- curlMultiInfoRead mh
case cmsg of
Nothing -> return ()
Just (CurlMsg msg eh res) -> do
logDebug $ "Got msg! msg " ++ show msg ++ " res " ++ show res ++
", " ++ show pending ++ " messages left"
let cref = (Map.!) hmap eh
writeIORef cref res
_ <- curl_multi_remove_handle mh eh
when (pending > 0) $ readMessages mh hmap
-- | Loops and polls curl until there are no more remaining handles.
performMulti :: CurlMH -> HandleMap -> CInt -> IO ()
performMulti mh hmap expected = do
(mcode, running) <- curlMultiPerform mh
delay <- case mcode of
CurlmCallMultiPerform -> return $ return ()
CurlmOK -> return $ threadDelay pollDelayInterval
code -> error $ "Received bad return code from" ++
"'curl_multi_perform': " ++ show code
logDebug $ "mcode: " ++ show mcode ++ ", remaining: " ++ show running
-- check if any handles are done and then retrieve their messages
when (expected /= running) $ readMessages mh hmap
-- and if we still have handles running, loop
when (running > 0) $ delay >> performMulti mh hmap running
-- | Template for the Curl error buffer.
errorBuffer :: String
errorBuffer = replicate errorBufferSize '\0'
-- | Allocate a NULL-initialised error buffer.
mallocErrorBuffer :: IO CString
mallocErrorBuffer = fst `fmap` newCStringLen errorBuffer
-- | Initialise a curl handle. This is just a wrapper over the
-- "Network.Curl" function 'initialize', plus adding our options.
makeEasyHandle :: (IORef [String], Ptr CChar, ([CurlOption], URLString))
-> IO Curl
makeEasyHandle (f, e, (opts, url)) = do
h <- initialize
setopts h opts
setopts h [ CurlWriteFunction (writeHandle f)
, CurlErrorBuffer e
, CurlURL url
, CurlFailOnError True
, CurlNoSignal True
, CurlProxy ""
]
return h
-- * Main multi-call work function
-- | Perform a multi-call against a list of nodes.
execMultiCall :: [([CurlOption], String)] -> IO [(CurlCode, String)]
execMultiCall ous = do
-- error buffers
errorbufs <- mapM (\_ -> mallocErrorBuffer) ous
-- result buffers
outbufs <- mapM (\_ -> newIORef []) ous
-- handles
ehandles <- mapM makeEasyHandle $ zip3 outbufs errorbufs ous
-- data.map holding handles to error code iorefs
hmap <- foldM (\m h -> curlPrim h (\_ hnd -> do
ccode <- newIORef CurlOK
return $ Map.insert hnd ccode m
)) Map.empty ehandles
mh <- curl_multi_init
mapM_ (curlMultiAddHandle mh) ehandles
performMulti mh hmap (fromIntegral $ length ehandles)
-- dummy code to keep the handles alive until here
mapM_ (\h -> curlPrim h (\_ _ -> return ())) ehandles
-- cleanup the multi handle
mh_cleanup <- toMCode `fmap` curl_multi_cleanup mh
when (mh_cleanup /= CurlmOK) .
logError $ "Non-OK return from multi_cleanup: " ++ show mh_cleanup
-- and now extract the data from the IORefs
mapM (\(e, b, h) -> do
s <- peekCString e
free e
cref <- curlPrim h (\_ hnd -> return $ (Map.!) hmap hnd)
ccode <- readIORef cref
result <- if ccode == CurlOK
then (concat . reverse) `fmap` readIORef b
else return s
return (ccode, result)
) $ zip3 errorbufs outbufs ehandles
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment