Luxi.hs 16.2 KB
Newer Older
1 2
{-# LANGUAGE TemplateHaskell #-}

3 4 5 6 7 8
{-| Implementation of the Ganeti LUXI interface.

-}

{-

9
Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.

-}

module Ganeti.Luxi
29
  ( LuxiOp(..)
Iustin Pop's avatar
Iustin Pop committed
30
  , LuxiReq(..)
31
  , Client
32
  , JobId
Iustin Pop's avatar
Iustin Pop committed
33 34
  , fromJobId
  , makeJobId
35 36
  , RecvResult(..)
  , strOfOp
37
  , getClient
38 39
  , getServer
  , acceptClient
40
  , closeClient
41
  , closeServer
42 43 44
  , callMethod
  , submitManyJobs
  , queryJobsStatus
45
  , buildCall
46
  , buildResponse
47 48
  , validateCall
  , decodeCall
49
  , recvMsg
50
  , recvMsgExt
51
  , sendMsg
52
  , allLuxiCalls
53
  ) where
54

55
import Control.Exception (catch)
56
import Data.IORef
57
import qualified Data.ByteString as B
58
import qualified Data.ByteString.Lazy as BL
59
import qualified Data.ByteString.UTF8 as UTF8
60
import qualified Data.ByteString.Lazy.UTF8 as UTF8L
61
import Data.Word (Word8)
62
import Control.Monad
Iustin Pop's avatar
Iustin Pop committed
63
import Text.JSON (encodeStrict, decodeStrict)
64
import qualified Text.JSON as J
65
import Text.JSON.Pretty (pp_value)
66
import Text.JSON.Types
67
import System.Directory (removeFile)
68
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
69
import System.IO.Error (isEOFError)
70
import System.Posix.Files
71 72 73
import System.Timeout
import qualified Network.Socket as S

74
import Ganeti.BasicTypes
75
import Ganeti.Constants
76 77
import Ganeti.Errors
import Ganeti.JSON
Iustin Pop's avatar
Iustin Pop committed
78
import Ganeti.OpParams (pTagsObject)
79
import Ganeti.OpCodes
80
import qualified Ganeti.Query.Language as Qlang
81
import Ganeti.Runtime (GanetiDaemon(..), MiscGroup(..), GanetiGroup(..))
82
import Ganeti.THH
Iustin Pop's avatar
Iustin Pop committed
83
import Ganeti.Types
84
import Ganeti.Utils
Iustin Pop's avatar
Iustin Pop committed
85

86 87 88 89 90
-- * Utility functions

-- | Runs an IO action under a time limit, failing in the IO monad
-- (via 'fail') when the limit expires before the action finishes.
--
-- The limit is given in seconds and converted to the microseconds
-- expected by 'timeout'; the description is only used to build the
-- failure message.
withTimeout :: Int -> String -> IO a -> IO a
withTimeout secs descr action =
  timeout (secs * 1000000) action >>=
    maybe (fail $ "Timeout in " ++ descr) return
95 96 97

-- * Generic protocol functionality

98 99 100 101
-- | Result of receiving a message from the socket.
data RecvResult
  = RecvConnClosed    -- ^ Connection closed
  | RecvError String  -- ^ Any other error
  | RecvOk String     -- ^ Successful receive
  deriving (Show, Eq)
103

104 105
-- | Currently supported Luxi operations and JSON serialization.
--
-- Each entry pairs a request name constant with the list of fields
-- carried by that request; the order of the entries is kept exactly
-- as it was.
$(genLuxiOp "LuxiOp"
  [ -- Queries
    ( luxiReqQuery
    , [ simpleField "what"    [t| Qlang.ItemType |]
      , simpleField "fields"  [t| [String] |]
      , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
      ]
    )
  , ( luxiReqQueryFields
    , [ simpleField "what"   [t| Qlang.ItemType |]
      , simpleField "fields" [t| [String] |]
      ]
    )
  , ( luxiReqQueryNodes
    , [ simpleField "names"  [t| [String] |]
      , simpleField "fields" [t| [String] |]
      , simpleField "lock"   [t| Bool |]
      ]
    )
  , ( luxiReqQueryGroups
    , [ simpleField "names"  [t| [String] |]
      , simpleField "fields" [t| [String] |]
      , simpleField "lock"   [t| Bool |]
      ]
    )
  , ( luxiReqQueryNetworks
    , [ simpleField "names"  [t| [String] |]
      , simpleField "fields" [t| [String] |]
      , simpleField "lock"   [t| Bool |]
      ]
    )
  , ( luxiReqQueryInstances
    , [ simpleField "names"  [t| [String] |]
      , simpleField "fields" [t| [String] |]
      , simpleField "lock"   [t| Bool |]
      ]
    )
  , ( luxiReqQueryJobs
    , [ simpleField "ids"    [t| [JobId] |]
      , simpleField "fields" [t| [String] |]
      ]
    )
  , ( luxiReqQueryExports
    , [ simpleField "nodes" [t| [String] |]
      , simpleField "lock"  [t| Bool |]
      ]
    )
  , ( luxiReqQueryConfigValues
    , [ simpleField "fields" [t| [String] |] ]
    )
  , ( luxiReqQueryClusterInfo, [] )
  , ( luxiReqQueryTags
    , [ pTagsObject
      , simpleField "name" [t| String |]
      ]
    )
    -- Job submission
  , ( luxiReqSubmitJob
    , [ simpleField "job" [t| [MetaOpCode] |] ]
    )
  , ( luxiReqSubmitJobToDrainedQueue
    , [ simpleField "job" [t| [MetaOpCode] |] ]
    )
  , ( luxiReqSubmitManyJobs
    , [ simpleField "ops" [t| [[MetaOpCode]] |] ]
    )
    -- Job management
  , ( luxiReqWaitForJobChange
    , [ simpleField "job"      [t| JobId |]
      , simpleField "fields"   [t| [String] |]
      , simpleField "prev_job" [t| JSValue |]
      , simpleField "prev_log" [t| JSValue |]
      , simpleField "tmout"    [t| Int |]
      ]
    )
  , ( luxiReqArchiveJob
    , [ simpleField "job" [t| JobId |] ]
    )
  , ( luxiReqAutoArchiveJobs
    , [ simpleField "age"   [t| Int |]
      , simpleField "tmout" [t| Int |]
      ]
    )
  , ( luxiReqCancelJob
    , [ simpleField "job" [t| JobId |] ]
    )
  , ( luxiReqChangeJobPriority
    , [ simpleField "job"      [t| JobId |]
      , simpleField "priority" [t| Int |]
      ]
    )
    -- Flags and pauses
  , ( luxiReqSetDrainFlag
    , [ simpleField "flag" [t| Bool |] ]
    )
  , ( luxiReqSetWatcherPause
    , [ simpleField "duration" [t| Double |] ]
    )
  ])
188

Iustin Pop's avatar
Iustin Pop committed
189 190
-- NOTE(review): presumably derives the JSON instance for 'LuxiReq' that
-- 'validateCall' needs in order to read the method name of a request --
-- confirm against 'makeJSONInstance' in "Ganeti.THH".
$(makeJSONInstance ''LuxiReq)

191 192 193
-- | List of all defined Luxi calls.
--
-- Produced at compile time from the 'LuxiReq' type and bound to the
-- exported name @allLuxiCalls@.
-- NOTE(review): the @drop 3@ presumably strips the @Req@ prefix of the
-- constructor names -- confirm against 'genAllConstr' in "Ganeti.THH".
$(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")

194
-- | The serialisation of LuxiOps into strings in messages.
195
-- The resulting 'strOfOp' is what 'buildCall' applies to a 'LuxiOp' to
-- obtain the method name placed in an outgoing request.
$(genStrOfOp ''LuxiOp "strOfOp")
196

197 198 199
-- | Type holding the initial (unparsed) Luxi call.
--
-- The first component is the request kind and the second the still
-- undecoded JSON arguments. Values are built by 'validateCall' and
-- turned into a 'LuxiOp' by 'decodeCall'.
data LuxiCall = LuxiCall LuxiReq JSValue

200
-- | The end-of-message separator.
201 202 203 204 205 206
eOM :: Word8
eOM = 3 -- byte value 3 (ASCII ETX); receivers split the stream on it

-- | The end-of-message encoded as a ByteString.
--
-- This is the single-byte string that 'sendMsg' writes after every
-- message body.
bEOM :: B.ByteString
bEOM = B.singleton eOM
207 208 209 210 211 212 213 214

-- | Valid keys in the requests and responses.
data MsgKeys
  = Method
  | Args
  | Success
  | Result

-- | The serialisation of MsgKeys into strings in messages.
$(genStrOfKey ''MsgKeys "strOfKey")
216 217

-- | Luxi client encapsulation.
data Client = Client
  { socket :: Handle              -- ^ The socket of the client
  , rbuf   :: IORef B.ByteString  -- ^ Already received buffer
  }

-- | Connects to the master daemon and returns a luxi Client.
--
-- The argument is the filesystem path of the UNIX socket to connect
-- to. The connection attempt is bounded by 'luxiDefCtmo' (through
-- 'withTimeout'). If connecting fails with an 'IOError' -- which is
-- also how 'withTimeout' reports an expired limit -- the freshly
-- created socket is closed before the error is re-raised, so a failed
-- attempt does not leak a file descriptor.
getClient :: String -> IO Client
getClient path = do
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
  let -- Release the socket, then propagate the original failure.
      closeAndRethrow :: IOError -> IO ()
      closeAndRethrow err = S.sClose s >> ioError err
  Control.Exception.catch
    (withTimeout luxiDefCtmo "creating luxi connection" $
       S.connect s (S.SockAddrUnix path))
    closeAndRethrow
  rf <- newIORef B.empty
  h <- S.socketToHandle s ReadWriteMode
  return Client { socket=h, rbuf=rf }
231

232
-- | Creates and returns a server endpoint.
--
-- A UNIX stream socket is bound to the given path and put into
-- listening state. When the first argument is 'True', the owner, group
-- and mode of the socket file are adjusted before listening starts.
--
-- If any of these steps fails with an 'IOError', the socket is closed
-- before the error is re-raised, so a failed setup does not leak a
-- file descriptor.
getServer :: Bool -> FilePath -> IO S.Socket
getServer setOwner path = do
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
  let -- Release the socket, then propagate the original failure.
      closeAndRethrow :: IOError -> IO ()
      closeAndRethrow err = S.sClose s >> ioError err
      setup :: IO ()
      setup = do
        S.bindSocket s (S.SockAddrUnix path)
        when setOwner $ do
          setOwnerAndGroupFromNames path GanetiLuxid $ ExtraGroup DaemonsGroup
          setFileMode path $ fromIntegral luxiSocketPerms
        S.listen s 5 -- 5 is the max backlog
  Control.Exception.catch setup closeAndRethrow
  return s

243 244 245 246 247 248 249
-- | Shuts down a server endpoint: the listening socket is closed first
-- and the socket file at the given path is removed afterwards.
-- FIXME: this should be encapsulated into a nicer type.
closeServer :: FilePath -> S.Socket -> IO ()
closeServer path sock = S.sClose sock >> removeFile path

250 251 252 253 254 255 256 257 258
-- | Accepts one pending connection on a server socket and wraps it
-- into a 'Client' with an empty receive buffer.
acceptClient :: S.Socket -> IO Client
acceptClient s = do
  -- 'S.accept' also yields the address of the peer, which is not used
  (peer, _) <- S.accept s
  h <- S.socketToHandle peer ReadWriteMode
  buf <- newIORef B.empty
  return Client { socket = h, rbuf = buf }

259 260
-- | Closes the handle backing a client connection.
closeClient :: Client -> IO ()
closeClient client = hClose (socket client)
262 263 264

-- | Sends a message over a luxi transport.
--
-- The string is written UTF-8 encoded, followed by the end-of-message
-- marker, and the handle is flushed; the whole operation is bounded by
-- 'luxiDefRwto'.
sendMsg :: Client -> String -> IO ()
sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
  BL.hPut h (UTF8L.fromString buf)
  B.hPut h bEOM
  hFlush h
  where h = socket s

-- | Reads from the handle until an end-of-message marker shows up.
--
-- The second argument is the data accumulated so far. The result is
-- the complete message (accumulated data plus everything read up to
-- the marker) paired with whatever followed the marker in the last
-- chunk read. Every individual read is bounded by 'luxiDefRwto'.
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
recvUpdate handle obuf = do
  chunk <- withTimeout luxiDefRwto "reading luxi response" $
             hWaitForInput handle (-1) >> B.hGetNonBlocking handle 4096
  let (before, rest) = B.break (eOM ==) chunk
      acc = B.append obuf before
  case B.uncons rest of
    -- no marker in this chunk: keep reading
    Nothing -> recvUpdate handle acc
    -- marker found: drop it and hand back the leftover bytes
    Just (_, leftover) -> return (acc, leftover)
285 286 287 288 289

-- | Waits for a message over a luxi transport.
--
-- A complete message already sitting in the client's buffer is
-- returned without touching the network; otherwise data is read until
-- one is complete. Bytes following the message are stored back into
-- the buffer, and the message is returned decoded from UTF-8.
recvMsg :: Client -> IO String
recvMsg s = do
  let bufRef = rbuf s
  pending <- readIORef bufRef
  (msg, leftover) <-
    case B.break (eOM ==) pending of
      -- buffered data holds no full message: read from the network
      (_, rest) | B.null rest -> recvUpdate (socket s) pending
      -- otherwise serve the message straight from the buffer
      (complete, rest) -> return (complete, B.tail rest)
  writeIORef bufRef leftover
  return $ UTF8.toString msg
297

298 299 300
-- | Extended wrapper over recvMsg.
--
-- I/O errors raised while receiving are turned into a 'RecvResult':
-- an end-of-file error becomes 'RecvConnClosed', any other one becomes
-- 'RecvError' carrying the shown error.
recvMsgExt :: Client -> IO RecvResult
recvMsgExt s =
  Control.Exception.catch (liftM RecvOk (recvMsg s)) (return . classify)
  where
    classify :: IOError -> RecvResult
    classify e
      | isEOFError e = RecvConnClosed
      | otherwise    = RecvError (show e)
305

306 307 308
-- | Serialize a request to String.
--
-- The request becomes a JSON object holding the method name and the
-- arguments of the operation.
buildCall :: LuxiOp  -- ^ The method
          -> String  -- ^ The serialized form
buildCall lo = encodeStrict (toJSObject fields)
  where
    fields = [ (strOfKey Method, J.showJSON $ strOfOp lo)
             , (strOfKey Args, opToArgs lo)
             ]
315

316 317 318 319 320 321 322 323 324 325
-- | Serialize the response to String.
--
-- The response becomes a JSON object holding the success flag and the
-- result payload.
buildResponse :: Bool    -- ^ Success
              -> JSValue -- ^ The arguments
              -> String  -- ^ The serialized form
buildResponse success args = encodeStrict (toJSObject fields)
  where
    fields = [ (strOfKey Success, JSBool success)
             , (strOfKey Result, args)
             ]

326 327 328
-- | Check that luxi request contains the required keys and parse it.
--
-- The string must decode to a JSON object holding both the method and
-- the arguments keys; the arguments are kept undecoded.
validateCall :: String -> Result LuxiCall
validateCall s = do
  obj <- fromJResult "parsing top-level luxi message" (decodeStrict s)
  let pairs = fromJSObject (obj :: JSObject JSValue)
  method <- fromObj pairs (strOfKey Method)
  params <- fromObj pairs (strOfKey Args)
  return $ LuxiCall method params

-- | Converts Luxi call arguments into a 'LuxiOp' data structure.
--
-- This is currently hand-coded until we make it more uniform so that
-- it can be generated using TH. Requests sharing the same argument
-- shape go through the local helpers defined at the bottom.
decodeCall :: LuxiCall -> Result LuxiOp
decodeCall (LuxiCall call args) =
  case call of
    ReqQueryJobs -> do
      (jids, jargs) <- fromJVal args
      -- a null list of ids is treated like an empty list
      jids' <- case jids of
                 JSNull -> return []
                 _ -> fromJVal jids
      return $ QueryJobs jids' jargs
    ReqQueryInstances -> triple QueryInstances
    ReqQueryNodes -> triple QueryNodes
    ReqQueryGroups -> triple QueryGroups
    ReqQueryClusterInfo -> return QueryClusterInfo
    ReqQueryNetworks -> triple QueryNetworks
    ReqQuery -> triple Query
    ReqQueryFields -> do
      (what, fields) <- fromJVal args
      -- a null list of fields is treated like an empty list
      fields' <- case fields of
                   JSNull -> return []
                   _ -> fromJVal fields
      return $ QueryFields what fields'
    ReqSubmitJob -> liftM SubmitJob decodeOps
    ReqSubmitJobToDrainedQueue -> liftM SubmitJobToDrainedQueue decodeOps
    ReqSubmitManyJobs -> liftM SubmitManyJobs decodeOps
    ReqWaitForJobChange ->
      -- No instance for 5-tuple, so the five elements are decoded
      -- one by one
      fromJResult "Parsing WaitForJobChange message" $
      case args of
        JSArray [a, b, c, d, e] ->
          liftM5 WaitForJobChange (J.readJSON a) (J.readJSON b)
                 (J.readJSON c) (J.readJSON d) (J.readJSON e)
        _ -> J.Error "Not enough values"
    ReqArchiveJob -> single ArchiveJob
    ReqAutoArchiveJobs -> pair AutoArchiveJobs
    ReqQueryExports -> pair QueryExports
    ReqQueryConfigValues -> single QueryConfigValues
    ReqQueryTags -> pair QueryTags
    ReqCancelJob -> single CancelJob
    ReqChangeJobPriority -> pair ChangeJobPriority
    ReqSetDrainFlag -> single SetDrainFlag
    ReqSetWatcherPause -> single SetWatcherPause
  where
    -- Arguments are a one-element list.
    single :: J.JSON a => (a -> LuxiOp) -> Result LuxiOp
    single mk = do
      [v] <- fromJVal args
      return $ mk v
    -- Arguments are a pair.
    pair :: (J.JSON a, J.JSON b) => (a -> b -> LuxiOp) -> Result LuxiOp
    pair mk = do
      (x, y) <- fromJVal args
      return $ mk x y
    -- Arguments are a triple.
    triple :: (J.JSON a, J.JSON b, J.JSON c)
           => (a -> b -> c -> LuxiOp) -> Result LuxiOp
    triple mk = do
      (x, y, z) <- fromJVal args
      return $ mk x y z
    -- Arguments are a one-element list wrapping a list of values, each
    -- of which is decoded separately.
    decodeOps :: J.JSON a => Result [a]
    decodeOps = do
      [raw] <- fromJVal args
      mapM (fromJResult (luxiReqToRaw call) . J.readJSON) raw

427 428
-- | Check that luxi responses contain the required keys and that the
-- call was successful.
--
-- Input containing the UTF-8 replacement character is rejected before
-- any parsing. A response flagged as unsuccessful has its payload
-- passed to 'decodeError'.
validateResult :: String -> ErrorResult JSValue
validateResult s = do
  when (UTF8.replacement_char `elem` s) $
    fail "Failed to decode UTF-8, detected replacement char after decoding"
  reply <- liftM J.fromJSObject $
           fromJResult "Parsing LUXI response" (decodeStrict s)
  ok <- fromObj reply (strOfKey Success)
  payload <- fromObj reply (strOfKey Result)
  if ok
    then return payload
    else decodeError payload

-- | Try to decode an error from the server response. This function
-- will always fail, since it's called only on the error path (when
-- status is False): a payload that decodes to an error is returned as
-- that error, anything else is reported as a 'GenericError' carrying
-- the decoding failure message.
decodeError :: JSValue -> ErrorResult JSValue
decodeError val = asFailure (fromJVal val)
  where
    asFailure (Ok e) = Bad e
    asFailure (Bad msg) = Bad (GenericError msg)
449 450

-- | Generic luxi method call: sends the serialized operation over the
-- client connection, waits for the reply and validates it.
callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
callMethod method s =
  sendMsg s (buildCall method) >> liftM validateResult (recvMsg s)
Iustin Pop's avatar
Iustin Pop committed
457

458
-- | Parse job submission result.
--
-- The expected shape is a two-element array: a success flag followed
-- by either the job id (on success) or an error string (on failure).
-- Anything else is reported as an unknown result.
parseSubmitJobResult :: JSValue -> ErrorResult JobId
parseSubmitJobResult reply =
  case reply of
    JSArray [JSBool True, v] -> jobIdOf (J.readJSON v)
    JSArray [JSBool False, JSString x] -> Bad (LuxiError (fromJSString x))
    _ -> Bad . LuxiError $ "Unknown result from the master daemon: " ++
           show (pp_value reply)
  where
    jobIdOf (J.Ok jid) = Ok jid
    jobIdOf (J.Error msg) = Bad (LuxiError msg)
469

Iustin Pop's avatar
Iustin Pop committed
470
-- | Specialized submitManyJobs call.
--
-- Submits all jobs in one request and turns each (status, payload)
-- pair of the reply into a job id; the first failing pair, or a reply
-- that is not an array, makes the whole result an error.
submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId])
submitManyJobs s jobs = liftM interpret (callMethod (SubmitManyJobs jobs) s)
  where
    interpret :: ErrorResult JSValue -> ErrorResult [JobId]
    interpret (Bad x) = Bad x
    interpret (Ok (JSArray r)) = mapM parseSubmitJobResult r
    interpret x = Bad . LuxiError $
                  "Cannot parse response from Ganeti: " ++ show x
Iustin Pop's avatar
Iustin Pop committed
480 481

-- | Custom queryJobs call.
--
-- Queries only the @status@ field of the given jobs and returns one
-- status per row of the reply. A transport or server error is passed
-- through unchanged; a reply that cannot be decoded, or a row without
-- any field, is reported as a 'LuxiError'.
queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
queryJobsStatus s jids = do
  rval <- callMethod (QueryJobs jids ["status"]) s
  return $ case rval of
             Bad x -> Bad x
             Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
                       J.Ok vals -> mapM firstField vals
                       J.Error x -> Bad $ LuxiError x
  where
    -- Total replacement for 'head': an empty row is an error instead
    -- of relying on a separate emptiness check done beforehand.
    firstField :: [JobStatus] -> ErrorResult JobStatus
    firstField (st:_) = Ok st
    firstField [] = Bad $ LuxiError "Missing job status field"