X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/76b620280a38d782da0c277974cce6cd459546bf..3329f4dea673d11088966e0f991e3a40f9506206:/htools/Ganeti/Luxi.hs?ds=sidebyside diff --git a/htools/Ganeti/Luxi.hs b/htools/Ganeti/Luxi.hs index d8c243c..16fb27d 100644 --- a/htools/Ganeti/Luxi.hs +++ b/htools/Ganeti/Luxi.hs @@ -27,38 +27,54 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA module Ganeti.Luxi ( LuxiOp(..) - , QrViaLuxi(..) - , ResultStatus(..) , LuxiReq(..) , Client , JobId - , checkRS + , RecvResult(..) + , TagObject(..) + , strOfOp , getClient + , getServer + , acceptClient , closeClient + , closeServer , callMethod , submitManyJobs , queryJobsStatus , buildCall + , buildResponse , validateCall , decodeCall + , recvMsg + , recvMsgExt + , sendMsg ) where +import Control.Exception (catch) import Data.IORef import Data.Ratio (numerator, denominator) +import qualified Data.ByteString as B +import qualified Data.ByteString.UTF8 as UTF8 +import Data.Word (Word8) import Control.Monad +import Prelude hiding (catch) import Text.JSON (encodeStrict, decodeStrict) import qualified Text.JSON as J import Text.JSON.Types +import System.Directory (removeFile) +import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..)) +import System.IO.Error (isEOFError) import System.Timeout import qualified Network.Socket as S -import Ganeti.HTools.JSON +import Ganeti.JSON import Ganeti.HTools.Types import Ganeti.HTools.Utils import Ganeti.Constants import Ganeti.Jobs (JobStatus) import Ganeti.OpCodes (OpCode) +import qualified Ganeti.Query.Language as Qlang import Ganeti.THH -- * Utility functions @@ -73,84 +89,94 @@ withTimeout secs descr action = do -- * Generic protocol functionality +-- | Result of receiving a message from the socket. +data RecvResult = RecvConnClosed -- ^ Connection closed + | RecvError String -- ^ Any other error + | RecvOk String -- ^ Successfull receive + deriving (Show, Read, Eq) + -- | The Ganeti job type. type JobId = Int -$(declareSADT "QrViaLuxi" - [ ("QRLock", 'qrLock) - , ("QRInstance", 'qrInstance) - , ("QRNode", 'qrNode) - , ("QRGroup", 'qrGroup) - , ("QROs", 'qrOs) +-- | Data type representing what items do the tag operations apply to. +$(declareSADT "TagObject" + [ ("TagInstance", 'tagInstance) + , ("TagNode", 'tagNode) + , ("TagGroup", 'tagNodegroup) + , ("TagCluster", 'tagCluster) ]) -$(makeJSONInstance ''QrViaLuxi) +$(makeJSONInstance ''TagObject) -- | Currently supported Luxi operations and JSON serialization. $(genLuxiOp "LuxiOp" - [(luxiReqQuery, - [ ("what", [t| QrViaLuxi |], [| id |]) - , ("fields", [t| [String] |], [| id |]) - , ("qfilter", [t| () |], [| const JSNull |]) + [ (luxiReqQuery, + [ ("what", [t| Qlang.ItemType |]) + , ("fields", [t| [String] |]) + , ("qfilter", [t| Qlang.Filter Qlang.FilterField |]) + ]) + , (luxiReqQueryFields, + [ ("what", [t| Qlang.ItemType |]) + , ("fields", [t| [String] |]) ]) , (luxiReqQueryNodes, - [ ("names", [t| [String] |], [| id |]) - , ("fields", [t| [String] |], [| id |]) - , ("lock", [t| Bool |], [| id |]) + [ ("names", [t| [String] |]) + , ("fields", [t| [String] |]) + , ("lock", [t| Bool |]) ]) , (luxiReqQueryGroups, - [ ("names", [t| [String] |], [| id |]) - , ("fields", [t| [String] |], [| id |]) - , ("lock", [t| Bool |], [| id |]) + [ ("names", [t| [String] |]) + , ("fields", [t| [String] |]) + , ("lock", [t| Bool |]) ]) , (luxiReqQueryInstances, - [ ("names", [t| [String] |], [| id |]) - , ("fields", [t| [String] |], [| id |]) - , ("lock", [t| Bool |], [| id |]) + [ ("names", [t| [String] |]) + , ("fields", [t| [String] |]) + , ("lock", [t| Bool |]) ]) , (luxiReqQueryJobs, - [ ("ids", [t| [Int] |], [| id |]) - , ("fields", [t| [String] |], [| id |]) + [ ("ids", [t| [Int] |]) + , ("fields", [t| [String] |]) ]) , (luxiReqQueryExports, - [ ("nodes", [t| [String] |], [| id |]) - , ("lock", [t| Bool |], [| id |]) + [ ("nodes", [t| [String] |]) + , ("lock", [t| Bool |]) ]) , (luxiReqQueryConfigValues, - [ ("fields", [t| [String] |], [| id |]) ] + [ ("fields", [t| [String] |]) ] ) , (luxiReqQueryClusterInfo, []) , (luxiReqQueryTags, - [ ("kind", [t| String |], [| id |]) - , ("name", [t| String |], [| id |]) + [ ("kind", [t| TagObject |]) + , ("name", [t| String |]) ]) , (luxiReqSubmitJob, - [ ("job", [t| [OpCode] |], [| id |]) ] + [ ("job", [t| [OpCode] |]) ] ) , (luxiReqSubmitManyJobs, - [ ("ops", [t| [[OpCode]] |], [| id |]) ] + [ ("ops", [t| [[OpCode]] |]) ] ) , (luxiReqWaitForJobChange, - [ ("job", [t| Int |], [| id |]) - , ("fields", [t| [String]|], [| id |]) - , ("prev_job", [t| JSValue |], [| id |]) - , ("prev_log", [t| JSValue |], [| id |]) - , ("tmout", [t| Int |], [| id |]) + [ ("job", [t| Int |]) + , ("fields", [t| [String]|]) + , ("prev_job", [t| JSValue |]) + , ("prev_log", [t| JSValue |]) + , ("tmout", [t| Int |]) ]) , (luxiReqArchiveJob, - [ ("job", [t| Int |], [| id |]) ] + [ ("job", [t| Int |]) ] ) , (luxiReqAutoArchiveJobs, - [ ("age", [t| Int |], [| id |]) - , ("tmout", [t| Int |], [| id |]) + [ ("age", [t| Int |]) + , ("tmout", [t| Int |]) ]) , (luxiReqCancelJob, - [ ("job", [t| Int |], [| id |]) ] + [ ("job", [t| Int |]) ] ) , (luxiReqSetDrainFlag, - [ ("flag", [t| Bool |], [| id |]) ] + [ ("flag", [t| Bool |]) ] ) , (luxiReqSetWatcherPause, - [ ("duration", [t| Double |], [| id |]) ] + [ ("duration", [t| Double |]) ] ) ]) @@ -159,30 +185,16 @@ $(makeJSONInstance ''LuxiReq) -- | The serialisation of LuxiOps into strings in messages. $(genStrOfOp ''LuxiOp "strOfOp") -$(declareIADT "ResultStatus" - [ ("RSNormal", 'rsNormal) - , ("RSUnknown", 'rsUnknown) - , ("RSNoData", 'rsNodata) - , ("RSUnavailable", 'rsUnavail) - , ("RSOffline", 'rsOffline) - ]) - -$(makeJSONInstance ''ResultStatus) - -- | Type holding the initial (unparsed) Luxi call. data LuxiCall = LuxiCall LuxiReq JSValue --- | Check that ResultStatus is success or fail with descriptive message. -checkRS :: (Monad m) => ResultStatus -> a -> m a -checkRS RSNormal val = return val -checkRS RSUnknown _ = fail "Unknown field" -checkRS RSNoData _ = fail "No data for a field" -checkRS RSUnavailable _ = fail "Ganeti reports unavailable data" -checkRS RSOffline _ = fail "Ganeti reports resource as offline" - -- | The end-of-message separator. -eOM :: Char -eOM = '\3' +eOM :: Word8 +eOM = 3 + +-- | The end-of-message encoded as a ByteString. +bEOM :: B.ByteString +bEOM = B.singleton eOM -- | Valid keys in the requests and responses. data MsgKeys = Method @@ -194,8 +206,8 @@ data MsgKeys = Method $(genStrOfKey ''MsgKeys "strOfKey") -- | Luxi client encapsulation. -data Client = Client { socket :: S.Socket -- ^ The socket of the client - , rbuf :: IORef String -- ^ Already received buffer +data Client = Client { socket :: Handle -- ^ The socket of the client + , rbuf :: IORef B.ByteString -- ^ Already received buffer } -- | Connects to the master daemon and returns a luxi Client. @@ -204,56 +216,106 @@ getClient path = do s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol withTimeout connTimeout "creating luxi connection" $ S.connect s (S.SockAddrUnix path) - rf <- newIORef "" - return Client { socket=s, rbuf=rf} + rf <- newIORef B.empty + h <- S.socketToHandle s ReadWriteMode + return Client { socket=h, rbuf=rf } + +-- | Creates and returns a server endpoint. +getServer :: FilePath -> IO S.Socket +getServer path = do + s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol + S.bindSocket s (S.SockAddrUnix path) + S.listen s 5 -- 5 is the max backlog + return s + +-- | Closes a server endpoint. +-- FIXME: this should be encapsulated into a nicer type. +closeServer :: FilePath -> S.Socket -> IO () +closeServer path sock = do + S.sClose sock + removeFile path + +-- | Accepts a client +acceptClient :: S.Socket -> IO Client +acceptClient s = do + -- second return is the address of the client, which we ignore here + (client_socket, _) <- S.accept s + new_buffer <- newIORef B.empty + handle <- S.socketToHandle client_socket ReadWriteMode + return Client { socket=handle, rbuf=new_buffer } -- | Closes the client socket. closeClient :: Client -> IO () -closeClient = S.sClose . socket +closeClient = hClose . socket -- | Sends a message over a luxi transport. sendMsg :: Client -> String -> IO () -sendMsg s buf = - let _send obuf = do - sbytes <- withTimeout queryTimeout - "sending luxi message" $ - S.send (socket s) obuf - unless (sbytes == length obuf) $ _send (drop sbytes obuf) - in _send (buf ++ [eOM]) +sendMsg s buf = withTimeout queryTimeout "sending luxi message" $ do + let encoded = UTF8.fromString buf + handle = socket s + B.hPut handle encoded + B.hPut handle bEOM + hFlush handle + +-- | Given a current buffer and the handle, it will read from the +-- network until we get a full message, and it will return that +-- message and the leftover buffer contents. +recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString) +recvUpdate handle obuf = do + nbuf <- withTimeout queryTimeout "reading luxi response" $ do + _ <- hWaitForInput handle (-1) + B.hGetNonBlocking handle 4096 + let (msg, remaining) = B.break (eOM ==) nbuf + newbuf = B.append obuf msg + if B.null remaining + then recvUpdate handle newbuf + else return (newbuf, B.tail remaining) -- | Waits for a message over a luxi transport. recvMsg :: Client -> IO String recvMsg s = do - let _recv obuf = do - nbuf <- withTimeout queryTimeout "reading luxi response" $ - S.recv (socket s) 4096 - let (msg, remaining) = break (eOM ==) nbuf - if null remaining - then _recv (obuf ++ msg) - else return (obuf ++ msg, tail remaining) cbuf <- readIORef $ rbuf s - let (imsg, ibuf) = break (eOM ==) cbuf + let (imsg, ibuf) = B.break (eOM ==) cbuf (msg, nbuf) <- - if null ibuf -- if old buffer didn't contain a full message - then _recv cbuf -- then we read from network - else return (imsg, tail ibuf) -- else we return data from our buffer + if B.null ibuf -- if old buffer didn't contain a full message + then recvUpdate (socket s) cbuf -- then we read from network + else return (imsg, B.tail ibuf) -- else we return data from our buffer writeIORef (rbuf s) nbuf - return msg + return $ UTF8.toString msg + +-- | Extended wrapper over recvMsg. +recvMsgExt :: Client -> IO RecvResult +recvMsgExt s = + catch (liftM RecvOk (recvMsg s)) $ \e -> + if isEOFError e + then return RecvConnClosed + else return $ RecvError (show e) -- | Serialize a request to String. buildCall :: LuxiOp -- ^ The method -> String -- ^ The serialized form buildCall lo = - let ja = [ (strOfKey Method, JSString $ toJSString $ strOfOp lo::JSValue) - , (strOfKey Args, opToArgs lo::JSValue) + let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo) + , (strOfKey Args, opToArgs lo) ] jo = toJSObject ja in encodeStrict jo +-- | Serialize the response to String. +buildResponse :: Bool -- ^ Success + -> JSValue -- ^ The arguments + -> String -- ^ The serialized form +buildResponse success args = + let ja = [ (strOfKey Success, JSBool success) + , (strOfKey Result, args)] + jo = toJSObject ja + in encodeStrict jo + -- | Check that luxi request contains the required keys and parse it. validateCall :: String -> Result LuxiCall validateCall s = do - arr <- fromJResult "luxi call" $ decodeStrict s::Result (JSObject JSValue) + arr <- fromJResult "parsing top-level luxi message" $ + decodeStrict s::Result (JSObject JSValue) let aobj = fromJSObject arr call <- fromObj aobj (strOfKey Method)::Result LuxiReq args <- fromObj aobj (strOfKey Args) @@ -280,12 +342,17 @@ decodeCall (LuxiCall call args) = ReqQueryGroups -> do (names, fields, locking) <- fromJVal args return $ QueryGroups names fields locking - ReqQueryClusterInfo -> do + ReqQueryClusterInfo -> return QueryClusterInfo ReqQuery -> do - (what, fields, _) <- - fromJVal args::Result (QrViaLuxi, [String], JSValue) - return $ Query what fields () + (what, fields, qfilter) <- fromJVal args + return $ Query what fields qfilter + ReqQueryFields -> do + (what, fields) <- fromJVal args + fields' <- case fields of + JSNull -> return [] + _ -> fromJVal fields + return $ QueryFields what fields' ReqSubmitJob -> do [ops1] <- fromJVal args ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1 @@ -341,6 +408,8 @@ decodeCall (LuxiCall call args) = -- call was successful. validateResult :: String -> Result JSValue validateResult s = do + when (UTF8.replacement_char `elem` s) $ + fail "Failed to decode UTF-8, detected replacement char after decoding" oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)::Result (JSObject JSValue) let arr = J.fromJSObject oarr