From 04063ba791874f6b12e4cc1c8b0c23441ceaa6c9 Mon Sep 17 00:00:00 2001
From: Michele Tartara <mtartara@google.com>
Date: Fri, 14 Dec 2012 14:32:34 +0000
Subject: [PATCH] Add Confd client to the Haskell code base

The client queries all the master candidates in parallel, until the minimum
number of replies, defined in the constant file, is received.
A timeout prevents the waiting from being of indefinite length.

The reply to be returned to the function that made the query is decided
according to the Confd design document.

Signed-off-by: Michele Tartara <mtartara@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
---
 Makefile.am                   |   1 +
 htools/Ganeti/Confd/Client.hs | 125 ++++++++++++++++++++++++++++++++++
 htools/Ganeti/Confd/Types.hs  |  19 ++++++
 3 files changed, 145 insertions(+)
 create mode 100644 htools/Ganeti/Confd/Client.hs

diff --git a/Makefile.am b/Makefile.am
index ffd4a8b40..563d31eff 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -450,6 +450,7 @@ HS_LIB_SRCS = \
 	htools/Ganeti/BasicTypes.hs \
 	htools/Ganeti/Common.hs \
 	htools/Ganeti/Compat.hs \
+	htools/Ganeti/Confd/Client.hs \
 	htools/Ganeti/Confd/Server.hs \
 	htools/Ganeti/Confd/Types.hs \
 	htools/Ganeti/Confd/Utils.hs \
diff --git a/htools/Ganeti/Confd/Client.hs b/htools/Ganeti/Confd/Client.hs
new file mode 100644
index 000000000..16a5bfcd7
--- /dev/null
+++ b/htools/Ganeti/Confd/Client.hs
@@ -0,0 +1,125 @@
+{-| Implementation of the Ganeti Confd client functionality.
+
+-}
+
+{-
+
+Copyright (C) 2012 Google Inc.
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; either version 2 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful, but
+WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+02110-1301, USA.
+
+-}
+
+module Ganeti.Confd.Client
+  ( getConfdClient
+  , query
+  ) where
+
+import Control.Concurrent
+import Control.Monad
+import Data.List
+import qualified Network.Socket as S
+import System.Posix.Time
+import qualified Text.JSON as J
+
+import Ganeti.BasicTypes
+import Ganeti.Confd.Types
+import Ganeti.Confd.Utils
+import qualified Ganeti.Constants as C
+import Ganeti.Hash
+import Ganeti.Ssconf
+
+-- | Builds a properly initialized ConfdClient
+getConfdClient :: IO ConfdClient
+getConfdClient = S.withSocketsDo $ do
+  hmac <- getClusterHmac
+  candList <- getMasterCandidatesIps Nothing
+  peerList <-
+    case candList of
+      (Ok p) -> return p
+      (Bad msg) -> fail msg
+  return . ConfdClient hmac peerList $ fromIntegral C.defaultConfdPort
+
+-- | Sends a query to all the Confd servers the client is connected to.
+-- Returns the most up-to-date result according to the serial number,
+-- chosen between those received before the timeout.
+query :: ConfdClient -> ConfdRequestType -> ConfdQuery -> IO (Maybe ConfdReply)
+query client crType cQuery = do
+  semaphore <- newMVar ()
+  answer <- newMVar Nothing
+  let dest = [(host, serverPort client) | host <- peers client]
+      hmac = hmacKey client
+      jobs = map (queryOneServer semaphore answer crType cQuery hmac) dest
+      watchdog reqAnswers = do
+        threadDelay $ 1000000 * C.confdClientExpireTimeout
+        _ <- swapMVar reqAnswers 0
+        putMVar semaphore ()
+      waitForResult reqAnswers = do
+        _ <- takeMVar semaphore
+        l <- takeMVar reqAnswers
+        unless (l == 0) $ do
+          putMVar reqAnswers $ l - 1
+          waitForResult reqAnswers
+  reqAnswers <- newMVar . min C.confdDefaultReqCoverage $ length dest
+  workers <- mapM forkIO jobs
+  watcher <- forkIO $ watchdog reqAnswers
+  waitForResult reqAnswers
+  mapM_ killThread $ watcher:workers
+  takeMVar answer
+
+-- | Updates the reply to the query. As per the Confd design document,
+-- only the reply with the highest serial number is kept.
+updateConfdReply :: ConfdReply -> Maybe ConfdReply -> Maybe ConfdReply
+updateConfdReply newValue Nothing = Just newValue
+updateConfdReply newValue (Just currentValue) = Just $
+  if confdReplyStatus newValue == ReplyStatusOk
+      && confdReplySerial newValue > confdReplySerial currentValue
+    then newValue
+    else currentValue
+
+-- | Send a query to a single server, waits for the result and stores it
+-- in a shared variable. Then, sends a signal on another shared variable
+-- acting as a semaphore.
+-- This function is meant to be used as one of multiple threads querying
+-- multiple servers in parallel.
+queryOneServer
+  :: MVar ()                 -- ^ The semaphore that will be signalled
+  -> MVar (Maybe ConfdReply) -- ^ The shared variable for the result
+  -> ConfdRequestType        -- ^ The type of the query to be sent
+  -> ConfdQuery              -- ^ The content of the query
+  -> HashKey                 -- ^ The hmac key to sign the message
+  -> (String, S.PortNumber)  -- ^ The address and port of the server
+  -> IO ()
+queryOneServer semaphore answer crType cQuery hmac (host, port) = do
+  request <- newConfdRequest crType cQuery
+  timestamp <- fmap show epochTime
+  let signedMsg =
+        signMessage hmac timestamp (J.encodeStrict request)
+      completeMsg = C.confdMagicFourcc ++ J.encodeStrict signedMsg
+  s <- S.socket S.AF_INET S.Datagram S.defaultProtocol
+  hostAddr <- S.inet_addr host
+  _ <- S.sendTo s completeMsg $ S.SockAddrInet port hostAddr
+  replyMsg <- S.recv s C.maxUdpDataSize
+  parsedReply <-
+    if C.confdMagicFourcc `isPrefixOf` replyMsg
+      then return . parseReply hmac (drop 4 replyMsg) $ confdRqRsalt request
+      else fail "Invalid magic code!"
+  reply <-
+    case parsedReply of
+      Ok (_, r) -> return r
+      Bad msg -> fail msg
+  modifyMVar_ answer $! return . updateConfdReply reply
+  putMVar semaphore ()
diff --git a/htools/Ganeti/Confd/Types.hs b/htools/Ganeti/Confd/Types.hs
index 7749e2351..5439fbe77 100644
--- a/htools/Ganeti/Confd/Types.hs
+++ b/htools/Ganeti/Confd/Types.hs
@@ -34,6 +34,7 @@ module Ganeti.Confd.Types
   , C.confdDefaultReqCoverage
   , C.confdClientExpireTimeout
   , C.maxUdpDataSize
+  , ConfdClient(..)
   , ConfdRequestType(..)
   , ConfdReqQ(..)
   , ConfdReqField(..)
@@ -41,15 +42,19 @@ module Ganeti.Confd.Types
   , ConfdNodeRole(..)
   , ConfdErrorType(..)
   , ConfdRequest(..)
+  , newConfdRequest
   , ConfdReply(..)
   , ConfdQuery(..)
   , SignedMessage(..)
   ) where
 
 import Text.JSON
+import qualified Network.Socket as S
 
 import qualified Ganeti.Constants as C
+import Ganeti.Hash
 import Ganeti.THH
+import Ganeti.Utils (newUUID)
 
 {-
    Note that we re-export as is from Constants the following simple items:
@@ -152,6 +157,13 @@ $(buildObject "ConfdRequest" "confdRq"
   , simpleField "rsalt"    [t| String |]
   ])
 
+-- | Client side helper function for creating requests. It automatically fills
+-- in some default values.
+newConfdRequest :: ConfdRequestType -> ConfdQuery -> IO ConfdRequest
+newConfdRequest reqType query = do
+  rsalt <- newUUID
+  return $ ConfdRequest C.confdProtocolVersion reqType query rsalt
+
 $(buildObject "ConfdReply" "confdReply"
   [ simpleField "protocol" [t| Int              |]
   , simpleField "status"   [t| ConfdReplyStatus |]
@@ -164,3 +176,10 @@ $(buildObject "SignedMessage" "signedMsg"
   , simpleField "msg"  [t| String |]
   , simpleField "salt" [t| String |]
   ])
+
+-- | Data type containing information used by the Confd client.
+data ConfdClient = ConfdClient
+  { hmacKey :: HashKey         -- ^ The hmac used for authentication
+  , peers :: [String]          -- ^ The list of nodes to query
+  , serverPort :: S.PortNumber -- ^ The port where confd server is listening
+  }
-- 
GitLab