module Ganeti.Luxi
( LuxiOp(..)
- , QrViaLuxi(..)
- , ResultStatus(..)
, LuxiReq(..)
, Client
, JobId
- , checkRS
+ , RecvResult(..)
+ , TagObject(..)
+ , strOfOp
, getClient
+ , getServer
+ , acceptClient
, closeClient
+ , closeServer
, callMethod
, submitManyJobs
, queryJobsStatus
, buildCall
+ , buildResponse
, validateCall
, decodeCall
+ , recvMsg
+ , recvMsgExt
+ , sendMsg
) where
+import Control.Exception (catch)
import Data.IORef
import Data.Ratio (numerator, denominator)
+import qualified Data.ByteString as B
+import qualified Data.ByteString.UTF8 as UTF8
+import Data.Word (Word8)
import Control.Monad
+import Prelude hiding (catch)
import Text.JSON (encodeStrict, decodeStrict)
import qualified Text.JSON as J
import Text.JSON.Types
+import System.Directory (removeFile)
+import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
+import System.IO.Error (isEOFError)
import System.Timeout
import qualified Network.Socket as S
-import Ganeti.HTools.JSON
+import Ganeti.JSON
import Ganeti.HTools.Types
import Ganeti.HTools.Utils
import Ganeti.Constants
import Ganeti.Jobs (JobStatus)
import Ganeti.OpCodes (OpCode)
+import qualified Ganeti.Query.Language as Qlang
import Ganeti.THH
-- * Utility functions
-- * Generic protocol functionality
+-- | Result of receiving a message from the socket.
+data RecvResult = RecvConnClosed -- ^ Connection closed
+ | RecvError String -- ^ Any other error, with its description
+ | RecvOk String -- ^ Successful receive, with the message
+ deriving (Show, Read, Eq)
+
-- | The Ganeti job identifier type; job IDs are plain 'Int' values.
type JobId = Int
-$(declareSADT "QrViaLuxi"
- [ ("QRLock", 'qrLock)
- , ("QRInstance", 'qrInstance)
- , ("QRNode", 'qrNode)
- , ("QRGroup", 'qrGroup)
- , ("QROs", 'qrOs)
+-- | Data type representing which kinds of objects the tag operations apply to.
+$(declareSADT "TagObject"
+ [ ("TagInstance", 'tagInstance)
+ , ("TagNode", 'tagNode)
+ , ("TagGroup", 'tagNodegroup)
+ , ("TagCluster", 'tagCluster)
])
-$(makeJSONInstance ''QrViaLuxi)
+$(makeJSONInstance ''TagObject)
-- | Currently supported Luxi operations and JSON serialization.
--
-- Each entry pairs a request constant with the list of its arguments,
-- every argument being given as a field name and its type.
$(genLuxiOp "LuxiOp"
- [(luxiReqQuery,
- [ ("what", [t| QrViaLuxi |], [| id |])
- , ("fields", [t| [String] |], [| id |])
- , ("qfilter", [t| () |], [| const JSNull |])
+ [ (luxiReqQuery,
+ [ ("what", [t| Qlang.ItemType |])
+ , ("fields", [t| [String] |])
+ , ("qfilter", [t| Qlang.Filter Qlang.FilterField |])
+ ])
+ , (luxiReqQueryFields,
+ [ ("what", [t| Qlang.ItemType |])
+ , ("fields", [t| [String] |])
])
, (luxiReqQueryNodes,
- [ ("names", [t| [String] |], [| id |])
- , ("fields", [t| [String] |], [| id |])
- , ("lock", [t| Bool |], [| id |])
+ [ ("names", [t| [String] |])
+ , ("fields", [t| [String] |])
+ , ("lock", [t| Bool |])
])
, (luxiReqQueryGroups,
- [ ("names", [t| [String] |], [| id |])
- , ("fields", [t| [String] |], [| id |])
- , ("lock", [t| Bool |], [| id |])
+ [ ("names", [t| [String] |])
+ , ("fields", [t| [String] |])
+ , ("lock", [t| Bool |])
])
, (luxiReqQueryInstances,
- [ ("names", [t| [String] |], [| id |])
- , ("fields", [t| [String] |], [| id |])
- , ("lock", [t| Bool |], [| id |])
+ [ ("names", [t| [String] |])
+ , ("fields", [t| [String] |])
+ , ("lock", [t| Bool |])
])
, (luxiReqQueryJobs,
- [ ("ids", [t| [Int] |], [| id |])
- , ("fields", [t| [String] |], [| id |])
+ [ ("ids", [t| [Int] |])
+ , ("fields", [t| [String] |])
])
, (luxiReqQueryExports,
- [ ("nodes", [t| [String] |], [| id |])
- , ("lock", [t| Bool |], [| id |])
+ [ ("nodes", [t| [String] |])
+ , ("lock", [t| Bool |])
])
, (luxiReqQueryConfigValues,
- [ ("fields", [t| [String] |], [| id |]) ]
+ [ ("fields", [t| [String] |]) ]
)
, (luxiReqQueryClusterInfo, [])
, (luxiReqQueryTags,
- [ ("kind", [t| String |], [| id |])
- , ("name", [t| String |], [| id |])
+ [ ("kind", [t| TagObject |])
+ , ("name", [t| String |])
])
, (luxiReqSubmitJob,
- [ ("job", [t| [OpCode] |], [| id |]) ]
+ [ ("job", [t| [OpCode] |]) ]
)
, (luxiReqSubmitManyJobs,
- [ ("ops", [t| [[OpCode]] |], [| id |]) ]
+ [ ("ops", [t| [[OpCode]] |]) ]
)
, (luxiReqWaitForJobChange,
- [ ("job", [t| Int |], [| id |])
- , ("fields", [t| [String]|], [| id |])
- , ("prev_job", [t| JSValue |], [| id |])
- , ("prev_log", [t| JSValue |], [| id |])
- , ("tmout", [t| Int |], [| id |])
+ [ ("job", [t| Int |])
+ , ("fields", [t| [String]|])
+ , ("prev_job", [t| JSValue |])
+ , ("prev_log", [t| JSValue |])
+ , ("tmout", [t| Int |])
])
, (luxiReqArchiveJob,
- [ ("job", [t| Int |], [| id |]) ]
+ [ ("job", [t| Int |]) ]
)
, (luxiReqAutoArchiveJobs,
- [ ("age", [t| Int |], [| id |])
- , ("tmout", [t| Int |], [| id |])
+ [ ("age", [t| Int |])
+ , ("tmout", [t| Int |])
])
, (luxiReqCancelJob,
- [ ("job", [t| Int |], [| id |]) ]
+ [ ("job", [t| Int |]) ]
)
, (luxiReqSetDrainFlag,
- [ ("flag", [t| Bool |], [| id |]) ]
+ [ ("flag", [t| Bool |]) ]
)
, (luxiReqSetWatcherPause,
- [ ("duration", [t| Double |], [| id |]) ]
+ [ ("duration", [t| Double |]) ]
)
])
-- | The serialisation of LuxiOps into strings in messages.
-- The resulting @strOfOp@ is what 'buildCall' applies to an operation to
-- obtain the value stored under the method key of a request.
$(genStrOfOp ''LuxiOp "strOfOp")
-$(declareIADT "ResultStatus"
- [ ("RSNormal", 'rsNormal)
- , ("RSUnknown", 'rsUnknown)
- , ("RSNoData", 'rsNodata)
- , ("RSUnavailable", 'rsUnavail)
- , ("RSOffline", 'rsOffline)
- ])
-
-$(makeJSONInstance ''ResultStatus)
-
-- | Type holding the initial (unparsed) Luxi call: the request type
-- together with its arguments, kept as a not-yet-decoded JSON value.
data LuxiCall = LuxiCall LuxiReq JSValue
--- | Check that ResultStatus is success or fail with descriptive message.
-checkRS :: (Monad m) => ResultStatus -> a -> m a
-checkRS RSNormal val = return val
-checkRS RSUnknown _ = fail "Unknown field"
-checkRS RSNoData _ = fail "No data for a field"
-checkRS RSUnavailable _ = fail "Ganeti reports unavailable data"
-checkRS RSOffline _ = fail "Ganeti reports resource as offline"
-
-- | The end-of-message separator (byte value 3, i.e. ASCII ETX).
-eOM :: Char
-eOM = '\3'
+eOM :: Word8
+eOM = 3
+
+-- | The end-of-message marker as a one-byte ByteString.
+bEOM :: B.ByteString
+bEOM = B.pack [eOM]
-- | Valid keys in the requests and responses.
data MsgKeys = Method
$(genStrOfKey ''MsgKeys "strOfKey")
-- | Luxi client encapsulation.
-data Client = Client { socket :: S.Socket -- ^ The socket of the client
- , rbuf :: IORef String -- ^ Already received buffer
+data Client = Client { socket :: Handle -- ^ The handle wrapping the client's socket
+ , rbuf :: IORef B.ByteString -- ^ Bytes already received but not yet consumed
}
-- | Connects to the master daemon and returns a luxi Client.
s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
withTimeout connTimeout "creating luxi connection" $
S.connect s (S.SockAddrUnix path)
- rf <- newIORef ""
- return Client { socket=s, rbuf=rf}
+ rf <- newIORef B.empty
+ h <- S.socketToHandle s ReadWriteMode
+ return Client { socket=h, rbuf=rf }
+
+-- | Creates and returns a server endpoint: a Unix-domain stream socket
+-- bound to the given path and put into listening mode.
+--
+-- If binding or listening fails with an I/O error, the freshly created
+-- socket is closed before the error is re-raised, so that the file
+-- descriptor is not leaked.
+getServer :: FilePath -> IO S.Socket
+getServer path = do
+  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
+  let setup = do
+        S.bindSocket s (S.SockAddrUnix path)
+        S.listen s 5 -- 5 is the max backlog
+  -- 'ioError' pins the handler to IOError and re-raises it unchanged
+  setup `catch` \e -> S.sClose s >> ioError e
+  return s
+
+-- | Closes a server endpoint: the listening socket is closed first,
+-- then the socket file at the given path is removed.
+-- FIXME: this should be encapsulated into a nicer type.
+closeServer :: FilePath -> S.Socket -> IO ()
+closeServer path sock = S.sClose sock >> removeFile path
+
+-- | Accepts a pending connection on the server socket and wraps it
+-- into a 'Client' with an empty receive buffer.
+acceptClient :: S.Socket -> IO Client
+acceptClient s = do
+  -- the peer address returned by accept is of no use to us
+  (conn, _) <- S.accept s
+  connHandle <- S.socketToHandle conn ReadWriteMode
+  emptyBuf <- newIORef B.empty
+  return Client { socket = connHandle, rbuf = emptyBuf }
-- | Closes the handle through which we talk to the client.
closeClient :: Client -> IO ()
-closeClient = S.sClose . socket
+closeClient client = hClose (socket client)
-- | Sends a message over a luxi transport.
sendMsg :: Client -> String -> IO ()
-sendMsg s buf =
- let _send obuf = do
- sbytes <- withTimeout queryTimeout
- "sending luxi message" $
- S.send (socket s) obuf
- unless (sbytes == length obuf) $ _send (drop sbytes obuf)
- in _send (buf ++ [eOM])
+sendMsg s buf =
+  withTimeout queryTimeout "sending luxi message" $
+    -- UTF-8 payload first, then the end-of-message byte, then flush
+    mapM_ (B.hPut out) [UTF8.fromString buf, bEOM] >> hFlush out
+  where out = socket s
+
+-- | Reads from the handle, in chunks of at most 4096 bytes, until an
+-- end-of-message byte shows up. Returns the accumulated message (the
+-- initial buffer plus everything read before the separator) and the
+-- bytes that followed the separator in the last chunk.
+recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
+recvUpdate handle obuf = do
+  chunk <- withTimeout queryTimeout "reading luxi response" $
+             hWaitForInput handle (-1) >> B.hGetNonBlocking handle 4096
+  let (before, after) = B.break (== eOM) chunk
+      acc = obuf `B.append` before
+  if B.null after
+    then recvUpdate handle acc -- no separator yet, keep reading
+    else return (acc, B.tail after) -- drop the separator itself
-- | Waits for a message over a luxi transport.
--
-- A complete message already sitting in the client's receive buffer is
-- returned without touching the network; otherwise more data is read until
-- an end-of-message byte arrives. Bytes following that separator are stored
-- back into the buffer, and the message itself is decoded from UTF-8.
recvMsg :: Client -> IO String
recvMsg s = do
- let _recv obuf = do
- nbuf <- withTimeout queryTimeout "reading luxi response" $
- S.recv (socket s) 4096
- let (msg, remaining) = break (eOM ==) nbuf
- if null remaining
- then _recv (obuf ++ msg)
- else return (obuf ++ msg, tail remaining)
cbuf <- readIORef $ rbuf s
- let (imsg, ibuf) = break (eOM ==) cbuf
+ let (imsg, ibuf) = B.break (eOM ==) cbuf
(msg, nbuf) <-
- if null ibuf -- if old buffer didn't contain a full message
- then _recv cbuf -- then we read from network
- else return (imsg, tail ibuf) -- else we return data from our buffer
+ if B.null ibuf -- if old buffer didn't contain a full message
+ then recvUpdate (socket s) cbuf -- then we read from network
+ else return (imsg, B.tail ibuf) -- else we return data from our buffer
writeIORef (rbuf s) nbuf
- return msg
+ return $ UTF8.toString msg
+
+-- | Extended wrapper over 'recvMsg': I/O errors are not propagated but
+-- reported through the returned 'RecvResult'.
+recvMsgExt :: Client -> IO RecvResult
+recvMsgExt s = liftM RecvOk (recvMsg s) `catch` onIOError
+  where
+    -- an end-of-file error is reported as a closed connection
+    onIOError e
+      | isEOFError e = return RecvConnClosed
+      | otherwise = return $ RecvError (show e)
-- | Serialize a request to String.
--
-- The request is encoded as a JSON object with two entries: the method
-- name (from 'strOfOp') and the operation's arguments (from 'opToArgs').
buildCall :: LuxiOp -- ^ The operation to serialize
-> String -- ^ The serialized form
buildCall lo =
- let ja = [ (strOfKey Method, JSString $ toJSString $ strOfOp lo::JSValue)
- , (strOfKey Args, opToArgs lo::JSValue)
+ let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
+ , (strOfKey Args, opToArgs lo)
]
jo = toJSObject ja
in encodeStrict jo
+-- | Serialize the response to String, as a JSON object carrying the
+-- success flag and the result payload.
+buildResponse :: Bool -- ^ Success
+              -> JSValue -- ^ The arguments
+              -> String -- ^ The serialized form
+buildResponse success args =
+  encodeStrict $ toJSObject
+    [ (strOfKey Success, JSBool success)
+    , (strOfKey Result, args)
+    ]
+
-- | Check that luxi request contains the required keys and parse it.
validateCall :: String -> Result LuxiCall
validateCall s = do
- arr <- fromJResult "luxi call" $ decodeStrict s::Result (JSObject JSValue)
+ arr <- fromJResult "parsing top-level luxi message" $
+ decodeStrict s::Result (JSObject JSValue)
let aobj = fromJSObject arr
call <- fromObj aobj (strOfKey Method)::Result LuxiReq
args <- fromObj aobj (strOfKey Args)
ReqQueryGroups -> do
(names, fields, locking) <- fromJVal args
return $ QueryGroups names fields locking
- ReqQueryClusterInfo -> do
+ ReqQueryClusterInfo ->
return QueryClusterInfo
ReqQuery -> do
- (what, fields, _) <-
- fromJVal args::Result (QrViaLuxi, [String], JSValue)
- return $ Query what fields ()
+ (what, fields, qfilter) <- fromJVal args
+ return $ Query what fields qfilter
+ ReqQueryFields -> do
+ (what, fields) <- fromJVal args
+ fields' <- case fields of
+ JSNull -> return []
+ _ -> fromJVal fields
+ return $ QueryFields what fields'
ReqSubmitJob -> do
[ops1] <- fromJVal args
ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
-- call was successful.
validateResult :: String -> Result JSValue
validateResult s = do
+ when (UTF8.replacement_char `elem` s) $
+ fail "Failed to decode UTF-8, detected replacement char after decoding"
oarr <- fromJResult "Parsing LUXI response"
(decodeStrict s)::Result (JSObject JSValue)
let arr = J.fromJSObject oarr