+{-# LANGUAGE TemplateHaskell #-}
+
{-| Implementation of the Ganeti LUXI interface.
-}
{-
-Copyright (C) 2009, 2010, 2011 Google Inc.
+Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
-}
module Ganeti.Luxi
- ( LuxiOp(..)
- , Client
- , getClient
- , closeClient
- , callMethod
- , submitManyJobs
- , queryJobsStatus
- ) where
+ ( LuxiOp(..)
+ , LuxiReq(..)
+ , Client
+ , JobId
+ , RecvResult(..)
+ , TagObject(..)
+ , strOfOp
+ , getClient
+ , getServer
+ , acceptClient
+ , closeClient
+ , closeServer
+ , callMethod
+ , submitManyJobs
+ , queryJobsStatus
+ , buildCall
+ , buildResponse
+ , validateCall
+ , decodeCall
+ , recvMsg
+ , recvMsgExt
+ , sendMsg
+ ) where
+import Control.Exception (catch)
import Data.IORef
+import Data.Ratio (numerator, denominator)
+import qualified Data.ByteString as B
+import qualified Data.ByteString.UTF8 as UTF8
+import Data.Word (Word8)
import Control.Monad
+import Prelude hiding (catch)
import Text.JSON (encodeStrict, decodeStrict)
import qualified Text.JSON as J
import Text.JSON.Types
+import System.Directory (removeFile)
+import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
+import System.IO.Error (isEOFError)
import System.Timeout
import qualified Network.Socket as S
-import Ganeti.HTools.Utils
-import Ganeti.HTools.Types
+import Ganeti.BasicTypes
+import Ganeti.JSON
+import Ganeti.Utils
+import Ganeti.Constants
import Ganeti.Jobs (JobStatus)
import Ganeti.OpCodes (OpCode)
+import qualified Ganeti.Query.Language as Qlang
+import Ganeti.THH
-- * Utility functions
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
--
-- The first argument is multiplied by 1000000 before being passed to
-- 'timeout', so it is a duration in seconds ('timeout' itself takes
-- microseconds). When 'timeout' yields 'Nothing' the call ends in
-- 'fail' with the message "Timeout in " followed by the description;
-- otherwise the result of the action is returned unchanged.
--
-- NOTE(review): the multiplication is done in 'Int', so very large
-- values could overflow where 'Int' is 32 bits wide - confirm that
-- callers only pass small constants.
withTimeout :: Int -> String -> IO a -> IO a
withTimeout secs descr action = do
- result <- timeout (secs * 1000000) action
- (case result of
- Nothing -> fail $ "Timeout in " ++ descr
- Just v -> return v)
+ result <- timeout (secs * 1000000) action
+ case result of
+ Nothing -> fail $ "Timeout in " ++ descr
+ Just v -> return v
-- * Generic protocol functionality
--- | Currently supported Luxi operations.
-data LuxiOp = QueryInstances [String] [String] Bool
- | QueryNodes [String] [String] Bool
- | QueryGroups [String] [String] Bool
- | QueryJobs [Int] [String]
- | QueryExports [String] Bool
- | QueryConfigValues [String]
- | QueryClusterInfo
- | QueryTags String String
- | SubmitJob [OpCode]
- | SubmitManyJobs [[OpCode]]
- | WaitForJobChange Int [String] JSValue JSValue Int
- | ArchiveJob Int
- | AutoArchiveJobs Int Int
- | CancelJob Int
- | SetDrainFlag Bool
- | SetWatcherPause Double
- deriving (Show, Read)
+-- | Result of receiving a message from the socket.
+data RecvResult = RecvConnClosed -- ^ Connection closed
+ | RecvError String -- ^ Any other error
+ | RecvOk String -- ^ Successful receive
+ deriving (Show, Read, Eq)
+
+-- | The Ganeti job type.
+type JobId = Int -- plain alias: interchangeable with any other Int for the type checker
+
+-- | Data type representing what items do the tag operations apply to.
+$(declareSADT "TagObject" -- NOTE(review): pairs look like (constructor name, constant it maps to); confirm in Ganeti.THH
+ [ ("TagInstance", 'tagInstance)
+ , ("TagNode", 'tagNode)
+ , ("TagGroup", 'tagNodegroup)
+ , ("TagCluster", 'tagCluster)
+ ])
+$(makeJSONInstance ''TagObject) -- presumably generates the JSON instance for TagObject; confirm in Ganeti.THH
+
+-- | Currently supported Luxi operations and JSON serialization.
+$(genLuxiOp "LuxiOp" -- one entry per request: (request identifier, [(field name, field type)])
+ [ (luxiReqQuery,
+ [ ("what", [t| Qlang.ItemType |])
+ , ("fields", [t| [String] |])
+ , ("qfilter", [t| Qlang.Filter Qlang.FilterField |])
+ ])
+ , (luxiReqQueryFields,
+ [ ("what", [t| Qlang.ItemType |])
+ , ("fields", [t| [String] |])
+ ])
+ , (luxiReqQueryNodes,
+ [ ("names", [t| [String] |])
+ , ("fields", [t| [String] |])
+ , ("lock", [t| Bool |])
+ ])
+ , (luxiReqQueryGroups,
+ [ ("names", [t| [String] |])
+ , ("fields", [t| [String] |])
+ , ("lock", [t| Bool |])
+ ])
+ , (luxiReqQueryInstances,
+ [ ("names", [t| [String] |])
+ , ("fields", [t| [String] |])
+ , ("lock", [t| Bool |])
+ ])
+ , (luxiReqQueryJobs,
+ [ ("ids", [t| [Int] |])
+ , ("fields", [t| [String] |])
+ ])
+ , (luxiReqQueryExports,
+ [ ("nodes", [t| [String] |])
+ , ("lock", [t| Bool |])
+ ])
+ , (luxiReqQueryConfigValues,
+ [ ("fields", [t| [String] |]) ]
+ )
+ , (luxiReqQueryClusterInfo, []) -- the only request declared with an empty field list
+ , (luxiReqQueryTags,
+ [ ("kind", [t| TagObject |])
+ , ("name", [t| String |])
+ ])
+ , (luxiReqSubmitJob,
+ [ ("job", [t| [OpCode] |]) ]
+ )
+ , (luxiReqSubmitManyJobs,
+ [ ("ops", [t| [[OpCode]] |]) ]
+ )
+ , (luxiReqWaitForJobChange,
+ [ ("job", [t| Int |])
+ , ("fields", [t| [String]|])
+ , ("prev_job", [t| JSValue |])
+ , ("prev_log", [t| JSValue |])
+ , ("tmout", [t| Int |])
+ ])
+ , (luxiReqArchiveJob,
+ [ ("job", [t| Int |]) ]
+ )
+ , (luxiReqAutoArchiveJobs,
+ [ ("age", [t| Int |])
+ , ("tmout", [t| Int |])
+ ])
+ , (luxiReqCancelJob,
+ [ ("job", [t| Int |]) ]
+ )
+ , (luxiReqSetDrainFlag,
+ [ ("flag", [t| Bool |]) ]
+ )
+ , (luxiReqSetWatcherPause,
+ [ ("duration", [t| Double |]) ]
+ )
+ ])
+
+$(makeJSONInstance ''LuxiReq) -- NOTE(review): LuxiReq is not declared in this view; presumably an import provides it - confirm
-- | The serialisation of LuxiOps into strings in messages.
--
-- NOTE(review): the function is now produced by the 'genStrOfOp' splice
-- rather than written out by hand; presumably each constructor still
-- maps to a string naming it - confirm in Ganeti.THH.
-strOfOp :: LuxiOp -> String
-strOfOp QueryNodes {} = "QueryNodes"
-strOfOp QueryGroups {} = "QueryGroups"
-strOfOp QueryInstances {} = "QueryInstances"
-strOfOp QueryJobs {} = "QueryJobs"
-strOfOp QueryExports {} = "QueryExports"
-strOfOp QueryConfigValues {} = "QueryConfigValues"
-strOfOp QueryClusterInfo {} = "QueryClusterInfo"
-strOfOp QueryTags {} = "QueryTags"
-strOfOp SubmitManyJobs {} = "SubmitManyJobs"
-strOfOp WaitForJobChange {} = "WaitForJobChange"
-strOfOp SubmitJob {} = "SubmitJob"
-strOfOp ArchiveJob {} = "ArchiveJob"
-strOfOp AutoArchiveJobs {} = "AutoArchiveJobs"
-strOfOp CancelJob {} = "CancelJob"
-strOfOp SetDrainFlag {} = "SetDrainFlag"
-strOfOp SetWatcherPause {} = "SetWatcherPause"
+$(genStrOfOp ''LuxiOp "strOfOp")
+
+-- | Type holding the initial (unparsed) Luxi call.
+data LuxiCall = LuxiCall LuxiReq JSValue -- the request kind, then its arguments still as raw JSON
-- | The end-of-message separator.
--
-- A single byte with value 3. 'sendMsg' writes it after every message
-- and the receive path splits the incoming byte stream on it.
-eOM :: Char
-eOM = '\3'
+eOM :: Word8
+eOM = 3
+
+-- | The end-of-message encoded as a ByteString.
+bEOM :: B.ByteString
+bEOM = B.singleton eOM -- one-byte string holding just the separator
-- | Valid keys in the requests and responses.
data MsgKeys = Method
| Result
-- | The serialisation of MsgKeys into strings in messages.
--
-- NOTE(review): the function is now produced by the 'genStrOfKey'
-- splice; presumably it yields the same lower-case key strings as the
-- hand-written equations it replaces - confirm in Ganeti.THH.
-strOfKey :: MsgKeys -> String
-strOfKey Method = "method"
-strOfKey Args = "args"
-strOfKey Success = "success"
-strOfKey Result = "result"
+$(genStrOfKey ''MsgKeys "strOfKey")
-- | Luxi client encapsulation.
-data Client = Client { socket :: S.Socket -- ^ The socket of the client
- , rbuf :: IORef String -- ^ Already received buffer
+data Client = Client { socket :: Handle -- ^ Handle wrapping the client socket
+ , rbuf :: IORef B.ByteString -- ^ Bytes received but not yet returned as a message
}
-- | Connects to the master daemon and returns a luxi Client.
--
-- Opens a UNIX stream socket, connects it to the given path under the
-- 'luxiDefCtmo' timeout, converts the socket into a read/write
-- 'Handle' and pairs it with an empty receive buffer.
--
-- NOTE(review): nothing in this function closes the socket when the
-- connect fails or times out - confirm this is acceptable.
getClient :: String -> IO Client
getClient path = do
- s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
- withTimeout connTimeout "creating luxi connection" $
- S.connect s (S.SockAddrUnix path)
- rf <- newIORef ""
- return Client { socket=s, rbuf=rf}
+ s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
+ withTimeout luxiDefCtmo "creating luxi connection" $
+ S.connect s (S.SockAddrUnix path)
+ rf <- newIORef B.empty
+ h <- S.socketToHandle s ReadWriteMode
+ return Client { socket=h, rbuf=rf }
+
+-- | Creates and returns a server endpoint.
+getServer :: FilePath -> IO S.Socket
+getServer path = do
+ s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol -- UNIX-domain stream socket
+ S.bindSocket s (S.SockAddrUnix path) -- NOTE(review): s is not closed here if bind or listen throws
+ S.listen s 5 -- 5 is the max backlog
+ return s
+
+-- | Closes a server endpoint.
+-- FIXME: this should be encapsulated into a nicer type.
+closeServer :: FilePath -> S.Socket -> IO ()
+closeServer path sock = do
+ S.sClose sock
+ removeFile path -- delete the socket file; not reached if sClose throws
+
+-- | Accepts a client
+acceptClient :: S.Socket -> IO Client
+acceptClient s = do
+ -- second return is the address of the client, which we ignore here
+ (client_socket, _) <- S.accept s
+ new_buffer <- newIORef B.empty -- every accepted client starts with an empty receive buffer
+ handle <- S.socketToHandle client_socket ReadWriteMode -- wrap the socket in a read/write Handle
+ return Client { socket=handle, rbuf=new_buffer }
-- | Closes the client socket.
--
-- This is 'hClose' applied to the handle stored in the client.
closeClient :: Client -> IO ()
-closeClient = S.sClose . socket
+closeClient = hClose . socket
-- | Sends a message over a luxi transport.
--
-- The string is encoded as UTF-8 and written to the client's handle,
-- followed by the end-of-message byte, and the handle is flushed. All
-- of this runs under a single 'luxiDefRwto' timeout.
sendMsg :: Client -> String -> IO ()
-sendMsg s buf =
- let _send obuf = do
- sbytes <- withTimeout queryTimeout
- "sending luxi message" $
- S.send (socket s) obuf
- unless (sbytes == length obuf) $ _send (drop sbytes obuf)
- in _send (buf ++ [eOM])
+sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
+ let encoded = UTF8.fromString buf
+ handle = socket s
+ B.hPut handle encoded
+ B.hPut handle bEOM
+ hFlush handle
+
+-- | Given a current buffer and the handle, it will read from the
+-- network until we get a full message, and it will return that
+-- message and the leftover buffer contents.
+recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
+recvUpdate handle obuf = do
+ nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do -- the timeout covers one read, not the whole message
+ _ <- hWaitForInput handle (-1) -- negative timeout: wait until input is available
+ B.hGetNonBlocking handle 4096 -- take what is available, at most 4096 bytes
+ let (msg, remaining) = B.break (eOM ==) nbuf
+ newbuf = B.append obuf msg
+ if B.null remaining
+ then recvUpdate handle newbuf -- no separator in this chunk: keep accumulating
+ else return (newbuf, B.tail remaining) -- drop the separator, keep the rest as leftover
-- | Waits for a message over a luxi transport.
--
-- The client's buffered bytes are checked first: if they already
-- contain an end-of-message byte, the message is taken from the buffer
-- without reading from the handle; otherwise more data is read through
-- 'recvUpdate'. The bytes left over are stored back into the buffer and
-- the message is returned decoded from UTF-8.
recvMsg :: Client -> IO String
recvMsg s = do
- let _recv obuf = do
- nbuf <- withTimeout queryTimeout "reading luxi response" $
- S.recv (socket s) 4096
- let (msg, remaining) = break (eOM ==) nbuf
- (if null remaining
- then _recv (obuf ++ msg)
- else return (obuf ++ msg, tail remaining))
cbuf <- readIORef $ rbuf s
- let (imsg, ibuf) = break (eOM ==) cbuf
+ let (imsg, ibuf) = B.break (eOM ==) cbuf
(msg, nbuf) <-
- (if null ibuf -- if old buffer didn't contain a full message
- then _recv cbuf -- then we read from network
- else return (imsg, tail ibuf)) -- else we return data from our buffer
+ if B.null ibuf -- if old buffer didn't contain a full message
+ then recvUpdate (socket s) cbuf -- then we read from network
+ else return (imsg, B.tail ibuf) -- else we return data from our buffer
writeIORef (rbuf s) nbuf
- return msg
-
--- | Compute the serialized form of a Luxi operation
-opToArgs :: LuxiOp -> JSValue
-opToArgs (QueryNodes names fields lock) = J.showJSON (names, fields, lock)
-opToArgs (QueryGroups names fields lock) = J.showJSON (names, fields, lock)
-opToArgs (QueryInstances names fields lock) = J.showJSON (names, fields, lock)
-opToArgs (QueryJobs ids fields) = J.showJSON (map show ids, fields)
-opToArgs (QueryExports nodes lock) = J.showJSON (nodes, lock)
-opToArgs (QueryConfigValues fields) = J.showJSON fields
-opToArgs (QueryClusterInfo) = J.showJSON ()
-opToArgs (QueryTags kind name) = J.showJSON (kind, name)
-opToArgs (SubmitJob j) = J.showJSON j
-opToArgs (SubmitManyJobs ops) = J.showJSON ops
--- This is special, since the JSON library doesn't export an instance
--- of a 5-tuple
-opToArgs (WaitForJobChange a b c d e) =
- JSArray [ J.showJSON a, J.showJSON b, J.showJSON c
- , J.showJSON d, J.showJSON e]
-opToArgs (ArchiveJob a) = J.showJSON (show a)
-opToArgs (AutoArchiveJobs a b) = J.showJSON (a, b)
-opToArgs (CancelJob a) = J.showJSON (show a)
-opToArgs (SetDrainFlag flag) = J.showJSON flag
-opToArgs (SetWatcherPause duration) = J.showJSON [duration]
+ return $ UTF8.toString msg
+
+-- | Extended wrapper over recvMsg.
+recvMsgExt :: Client -> IO RecvResult
+recvMsgExt s =
+ catch (liftM RecvOk (recvMsg s)) $ \e -> -- a received message is wrapped in RecvOk
+ if isEOFError e
+ then return RecvConnClosed -- end of file is reported as a closed connection
+ else return $ RecvError (show e) -- any other caught error, rendered as text
-- | Serialize a request to String.
--
-- The result is the strict JSON encoding of an object with two
-- entries: the method key holding the string produced by 'strOfOp' and
-- the args key holding the value produced by 'opToArgs'.
buildCall :: LuxiOp -- ^ The method
-> String -- ^ The serialized form
buildCall lo =
- let ja = [ (strOfKey Method, JSString $ toJSString $ strOfOp lo::JSValue)
- , (strOfKey Args, opToArgs lo::JSValue)
- ]
- jo = toJSObject ja
- in encodeStrict jo
+ let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
+ , (strOfKey Args, opToArgs lo)
+ ]
+ jo = toJSObject ja
+ in encodeStrict jo
+
+-- | Serialize the response to String.
+buildResponse :: Bool -- ^ Value stored under the success key
+ -> JSValue -- ^ Value stored under the result key
+ -> String -- ^ The serialized form
+buildResponse success args =
+ let ja = [ (strOfKey Success, JSBool success)
+ , (strOfKey Result, args)]
+ jo = toJSObject ja
+ in encodeStrict jo
+
+-- | Check that luxi request contains the required keys and parse it.
+validateCall :: String -> Result LuxiCall
+validateCall s = do
+ arr <- fromJResult "parsing top-level luxi message" $
+ decodeStrict s::Result (JSObject JSValue) -- the message must decode to a JSON object
+ let aobj = fromJSObject arr
+ call <- fromObj aobj (strOfKey Method)::Result LuxiReq -- method key, read as a LuxiReq
+ args <- fromObj aobj (strOfKey Args) -- args key, kept as raw JSON inside the LuxiCall
+ return (LuxiCall call args)
+
+-- | Converts Luxi call arguments into a 'LuxiOp' data structure.
+--
+-- This is currently hand-coded until we make it more uniform so that
+-- it can be generated using TH.
+decodeCall :: LuxiCall -> Result LuxiOp
+decodeCall (LuxiCall call args) =
+ case call of
+ ReqQueryJobs -> do
+ (jid, jargs) <- fromJVal args
+ rid <- mapM parseJobId jid -- parseJobId accepts both string and numeric ids
+ let rargs = map fromJSString jargs
+ return $ QueryJobs rid rargs
+ ReqQueryInstances -> do
+ (names, fields, locking) <- fromJVal args
+ return $ QueryInstances names fields locking
+ ReqQueryNodes -> do
+ (names, fields, locking) <- fromJVal args
+ return $ QueryNodes names fields locking
+ ReqQueryGroups -> do
+ (names, fields, locking) <- fromJVal args
+ return $ QueryGroups names fields locking
+ ReqQueryClusterInfo ->
+ return QueryClusterInfo
+ ReqQuery -> do
+ (what, fields, qfilter) <- fromJVal args
+ return $ Query what fields qfilter
+ ReqQueryFields -> do
+ (what, fields) <- fromJVal args
+ fields' <- case fields of
+ JSNull -> return [] -- a null field list is treated as empty
+ _ -> fromJVal fields
+ return $ QueryFields what fields'
+ ReqSubmitJob -> do
+ [ops1] <- fromJVal args -- arguments must be a one-element list
+ ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
+ return $ SubmitJob ops2
+ ReqSubmitManyJobs -> do
+ [ops1] <- fromJVal args
+ ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
+ return $ SubmitManyJobs ops2
+ ReqWaitForJobChange -> do
+ (jid, fields, pinfo, pidx, wtmout) <-
+ -- No instance for 5-tuple, code copied from the
+ -- json sources and adapted
+ fromJResult "Parsing WaitForJobChange message" $
+ case args of
+ JSArray [a, b, c, d, e] ->
+ (,,,,) `fmap`
+ J.readJSON a `ap`
+ J.readJSON b `ap`
+ J.readJSON c `ap`
+ J.readJSON d `ap`
+ J.readJSON e
+ _ -> J.Error "Not enough values" -- NOTE(review): also reached for more than five values or a non-array
+ rid <- parseJobId jid
+ return $ WaitForJobChange rid fields pinfo pidx wtmout
+ ReqArchiveJob -> do
+ [jid] <- fromJVal args
+ rid <- parseJobId jid
+ return $ ArchiveJob rid
+ ReqAutoArchiveJobs -> do
+ (age, tmout) <- fromJVal args
+ return $ AutoArchiveJobs age tmout
+ ReqQueryExports -> do
+ (nodes, lock) <- fromJVal args
+ return $ QueryExports nodes lock
+ ReqQueryConfigValues -> do
+ [fields] <- fromJVal args
+ return $ QueryConfigValues fields
+ ReqQueryTags -> do
+ (kind, name) <- fromJVal args
+ return $ QueryTags kind name
+ ReqCancelJob -> do
+ [job] <- fromJVal args
+ rid <- parseJobId job
+ return $ CancelJob rid
+ ReqSetDrainFlag -> do
+ [flag] <- fromJVal args
+ return $ SetDrainFlag flag
+ ReqSetWatcherPause -> do
+ [duration] <- fromJVal args
+ return $ SetWatcherPause duration
-- | Check that luxi responses contain the required keys and that the
-- call was successful.
--
-- The input is rejected up front if it contains the UTF-8 replacement
-- character. Otherwise it must parse as a JSON object; when its
-- success entry is true the result entry is returned, and when it is
-- false the result entry is read and passed to 'fail'.
validateResult :: String -> Result JSValue
validateResult s = do
+ when (UTF8.replacement_char `elem` s) $
+ fail "Failed to decode UTF-8, detected replacement char after decoding"
oarr <- fromJResult "Parsing LUXI response"
(decodeStrict s)::Result (JSObject JSValue)
let arr = J.fromJSObject oarr
status <- fromObj arr (strOfKey Success)::Result Bool
let rkey = strOfKey Result
- (if status
- then fromObj arr rkey
- else fromObj arr rkey >>= fail)
+ if status
+ then fromObj arr rkey
+ else fromObj arr rkey >>= fail
-- | Generic luxi method call.
callMethod :: LuxiOp -> Client -> IO (Result JSValue)
let rval = validateResult result
return rval
+-- | Parses a job ID.
+parseJobId :: JSValue -> Result JobId
+parseJobId (JSString x) = tryRead "parsing job id" . fromJSString $ x -- string form: parsed with tryRead
+parseJobId (JSRational _ x) = -- numeric form: accepted only when the denominator is 1
+ if denominator x /= 1
+ then Bad $ "Got fractional job ID from master daemon?! Value:" ++ show x
+ -- FIXME: potential integer overflow here on 32-bit platforms
+ else Ok . fromIntegral . numerator $ x
+parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x
+
+-- | Parse job submission result.
+parseSubmitJobResult :: JSValue -> Result JobId
+parseSubmitJobResult (JSArray [JSBool True, v]) = parseJobId v -- success pair: the payload is the job id
+parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
+ Bad (fromJSString x) -- failure pair: the payload is the error text
+parseSubmitJobResult v = Bad $ "Unknown result from the master daemon: " ++
+ show v
+
-- | Specialized submitManyJobs call.
--
-- Issues a SubmitManyJobs request. A failed call is passed through
-- unchanged; when the reply is a JSON array, each element is converted
-- with 'parseSubmitJobResult' and the conversions are combined with
-- 'mapM' in the 'Result' monad; any other reply is reported as
-- unparseable.
-submitManyJobs :: Client -> [[OpCode]] -> IO (Result [String])
+submitManyJobs :: Client -> [[OpCode]] -> IO (Result [JobId])
submitManyJobs s jobs = do
rval <- callMethod (SubmitManyJobs jobs) s
-- map each result (status, payload) pair into a nice Result ADT
return $ case rval of
Bad x -> Bad x
- Ok (JSArray r) ->
- mapM (\v -> case v of
- JSArray [JSBool True, JSString x] ->
- Ok (fromJSString x)
- JSArray [JSBool False, JSString x] ->
- Bad (fromJSString x)
- _ -> Bad "Unknown result from the master daemon"
- ) r
+ Ok (JSArray r) -> mapM parseSubmitJobResult r
x -> Bad ("Cannot parse response from Ganeti: " ++ show x)
-- | Custom queryJobs call.
-queryJobsStatus :: Client -> [String] -> IO (Result [JobStatus])
+queryJobsStatus :: Client -> [JobId] -> IO (Result [JobStatus])
queryJobsStatus s jids = do
- rval <- callMethod (QueryJobs (map read jids) ["status"]) s
+ rval <- callMethod (QueryJobs jids ["status"]) s
return $ case rval of
Bad x -> Bad x
Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of