X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/260d0bda2674c313a34c1019703e3ed31b673542..3329f4dea673d11088966e0f991e3a40f9506206:/htools/Ganeti/Luxi.hs diff --git a/htools/Ganeti/Luxi.hs b/htools/Ganeti/Luxi.hs index ade6c22..16fb27d 100644 --- a/htools/Ganeti/Luxi.hs +++ b/htools/Ganeti/Luxi.hs @@ -6,7 +6,7 @@ {- -Copyright (C) 2009, 2010, 2011 Google Inc. +Copyright (C) 2009, 2010, 2011, 2012 Google Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -26,32 +26,55 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA -} module Ganeti.Luxi - ( LuxiOp(..) - , QrViaLuxi(..) - , ResultStatus(..) - , Client - , checkRS - , getClient - , closeClient - , callMethod - , submitManyJobs - , queryJobsStatus - ) where - + ( LuxiOp(..) + , LuxiReq(..) + , Client + , JobId + , RecvResult(..) + , TagObject(..) + , strOfOp + , getClient + , getServer + , acceptClient + , closeClient + , closeServer + , callMethod + , submitManyJobs + , queryJobsStatus + , buildCall + , buildResponse + , validateCall + , decodeCall + , recvMsg + , recvMsgExt + , sendMsg + ) where + +import Control.Exception (catch) import Data.IORef +import Data.Ratio (numerator, denominator) +import qualified Data.ByteString as B +import qualified Data.ByteString.UTF8 as UTF8 +import Data.Word (Word8) import Control.Monad +import Prelude hiding (catch) import Text.JSON (encodeStrict, decodeStrict) import qualified Text.JSON as J import Text.JSON.Types +import System.Directory (removeFile) +import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..)) +import System.IO.Error (isEOFError) import System.Timeout import qualified Network.Socket as S -import Ganeti.HTools.Utils +import Ganeti.JSON import Ganeti.HTools.Types +import Ganeti.HTools.Utils import Ganeti.Constants import Ganeti.Jobs (JobStatus) import Ganeti.OpCodes (OpCode) +import qualified Ganeti.Query.Language as Qlang import Ganeti.THH -- * Utility functions @@ -59,114 +82,119 @@ import Ganeti.THH -- | Wrapper over System.Timeout.timeout that fails in the IO monad. withTimeout :: Int -> String -> IO a -> IO a withTimeout secs descr action = do - result <- timeout (secs * 1000000) action - (case result of - Nothing -> fail $ "Timeout in " ++ descr - Just v -> return v) + result <- timeout (secs * 1000000) action + case result of + Nothing -> fail $ "Timeout in " ++ descr + Just v -> return v -- * Generic protocol functionality -$(declareSADT "QrViaLuxi" - [ ("QRLock", 'qrLock) - , ("QRInstance", 'qrInstance) - , ("QRNode", 'qrNode) - , ("QRGroup", 'qrGroup) - , ("QROs", 'qrOs) - ]) -$(makeJSONInstance ''QrViaLuxi) +-- | Result of receiving a message from the socket. +data RecvResult = RecvConnClosed -- ^ Connection closed + | RecvError String -- ^ Any other error + | RecvOk String -- ^ Successfull receive + deriving (Show, Read, Eq) + +-- | The Ganeti job type. +type JobId = Int + +-- | Data type representing what items do the tag operations apply to. +$(declareSADT "TagObject" + [ ("TagInstance", 'tagInstance) + , ("TagNode", 'tagNode) + , ("TagGroup", 'tagNodegroup) + , ("TagCluster", 'tagCluster) + ]) +$(makeJSONInstance ''TagObject) -- | Currently supported Luxi operations and JSON serialization. $(genLuxiOp "LuxiOp" - [("Query" , - [ ("what", [t| QrViaLuxi |], [| id |]) - , ("fields", [t| [String] |], [| id |]) - , ("qfilter", [t| () |], [| const JSNull |]) - ]) - , ("QueryNodes", - [ ("names", [t| [String] |], [| id |]) - , ("fields", [t| [String] |], [| id |]) - , ("lock", [t| Bool |], [| id |]) - ]) - , ("QueryGroups", - [ ("names", [t| [String] |], [| id |]) - , ("fields", [t| [String] |], [| id |]) - , ("lock", [t| Bool |], [| id |]) - ]) - , ("QueryInstances", - [ ("names", [t| [String] |], [| id |]) - , ("fields", [t| [String] |], [| id |]) - , ("lock", [t| Bool |], [| id |]) - ]) - , ("QueryJobs", - [ ("ids", [t| [Int] |], [| map show |]) - , ("fields", [t| [String] |], [| id |]) - ]) - , ("QueryExports", - [ ("nodes", [t| [String] |], [| id |]) - , ("lock", [t| Bool |], [| id |]) - ]) - , ("QueryConfigValues", - [ ("fields", [t| [String] |], [| id |]) ] - ) - , ("QueryClusterInfo", []) - , ("QueryTags", - [ ("kind", [t| String |], [| id |]) - , ("name", [t| String |], [| id |]) - ]) - , ("SubmitJob", - [ ("job", [t| [OpCode] |], [| id |]) ] - ) - , ("SubmitManyJobs", - [ ("ops", [t| [[OpCode]] |], [| id |]) ] - ) - , ("WaitForJobChange", - [ ("job", [t| Int |], [| id |]) - , ("fields", [t| [String]|], [| id |]) - , ("prev_job", [t| JSValue |], [| id |]) - , ("prev_log", [t| JSValue |], [| id |]) - , ("tmout", [t| Int |], [| id |]) - ]) - , ("ArchiveJob", - [ ("job", [t| Int |], [| show |]) ] - ) - , ("AutoArchiveJobs", - [ ("age", [t| Int |], [| id |]) - , ("tmout", [t| Int |], [| id |]) - ]) - , ("CancelJob", - [ ("job", [t| Int |], [| show |]) ] - ) - , ("SetDrainFlag", - [ ("flag", [t| Bool |], [| id |]) ] - ) - , ("SetWatcherPause", - [ ("duration", [t| Double |], [| id |]) ] - ) + [ (luxiReqQuery, + [ ("what", [t| Qlang.ItemType |]) + , ("fields", [t| [String] |]) + , ("qfilter", [t| Qlang.Filter Qlang.FilterField |]) + ]) + , (luxiReqQueryFields, + [ ("what", [t| Qlang.ItemType |]) + , ("fields", [t| [String] |]) + ]) + , (luxiReqQueryNodes, + [ ("names", [t| [String] |]) + , ("fields", [t| [String] |]) + , ("lock", [t| Bool |]) + ]) + , (luxiReqQueryGroups, + [ ("names", [t| [String] |]) + , ("fields", [t| [String] |]) + , ("lock", [t| Bool |]) + ]) + , (luxiReqQueryInstances, + [ ("names", [t| [String] |]) + , ("fields", [t| [String] |]) + , ("lock", [t| Bool |]) + ]) + , (luxiReqQueryJobs, + [ ("ids", [t| [Int] |]) + , ("fields", [t| [String] |]) + ]) + , (luxiReqQueryExports, + [ ("nodes", [t| [String] |]) + , ("lock", [t| Bool |]) + ]) + , (luxiReqQueryConfigValues, + [ ("fields", [t| [String] |]) ] + ) + , (luxiReqQueryClusterInfo, []) + , (luxiReqQueryTags, + [ ("kind", [t| TagObject |]) + , ("name", [t| String |]) + ]) + , (luxiReqSubmitJob, + [ ("job", [t| [OpCode] |]) ] + ) + , (luxiReqSubmitManyJobs, + [ ("ops", [t| [[OpCode]] |]) ] + ) + , (luxiReqWaitForJobChange, + [ ("job", [t| Int |]) + , ("fields", [t| [String]|]) + , ("prev_job", [t| JSValue |]) + , ("prev_log", [t| JSValue |]) + , ("tmout", [t| Int |]) + ]) + , (luxiReqArchiveJob, + [ ("job", [t| Int |]) ] + ) + , (luxiReqAutoArchiveJobs, + [ ("age", [t| Int |]) + , ("tmout", [t| Int |]) + ]) + , (luxiReqCancelJob, + [ ("job", [t| Int |]) ] + ) + , (luxiReqSetDrainFlag, + [ ("flag", [t| Bool |]) ] + ) + , (luxiReqSetWatcherPause, + [ ("duration", [t| Double |]) ] + ) ]) +$(makeJSONInstance ''LuxiReq) + -- | The serialisation of LuxiOps into strings in messages. $(genStrOfOp ''LuxiOp "strOfOp") -$(declareIADT "ResultStatus" - [ ("RSNormal", 'rsNormal) - , ("RSUnknown", 'rsUnknown) - , ("RSNoData", 'rsNodata) - , ("RSUnavailable", 'rsUnavail) - , ("RSOffline", 'rsOffline) - ]) -$(makeJSONInstanceInt ''ResultStatus) - --- | Check that ResultStatus is success or fail with descriptive message. -checkRS :: (Monad m) => ResultStatus -> a -> m a -checkRS RSNormal val = return val -checkRS RSUnknown _ = fail "Unknown field" -checkRS RSNoData _ = fail "No data for a field" -checkRS RSUnavailable _ = fail "Ganeti reports unavailable data" -checkRS RSOffline _ = fail "Ganeti reports resource as offline" +-- | Type holding the initial (unparsed) Luxi call. +data LuxiCall = LuxiCall LuxiReq JSValue -- | The end-of-message separator. -eOM :: Char -eOM = '\3' +eOM :: Word8 +eOM = 3 + +-- | The end-of-message encoded as a ByteString. +bEOM :: B.ByteString +bEOM = B.singleton eOM -- | Valid keys in the requests and responses. data MsgKeys = Method @@ -178,74 +206,218 @@ data MsgKeys = Method $(genStrOfKey ''MsgKeys "strOfKey") -- | Luxi client encapsulation. -data Client = Client { socket :: S.Socket -- ^ The socket of the client - , rbuf :: IORef String -- ^ Already received buffer +data Client = Client { socket :: Handle -- ^ The socket of the client + , rbuf :: IORef B.ByteString -- ^ Already received buffer } -- | Connects to the master daemon and returns a luxi Client. getClient :: String -> IO Client getClient path = do - s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol - withTimeout connTimeout "creating luxi connection" $ - S.connect s (S.SockAddrUnix path) - rf <- newIORef "" - return Client { socket=s, rbuf=rf} + s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol + withTimeout connTimeout "creating luxi connection" $ + S.connect s (S.SockAddrUnix path) + rf <- newIORef B.empty + h <- S.socketToHandle s ReadWriteMode + return Client { socket=h, rbuf=rf } + +-- | Creates and returns a server endpoint. +getServer :: FilePath -> IO S.Socket +getServer path = do + s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol + S.bindSocket s (S.SockAddrUnix path) + S.listen s 5 -- 5 is the max backlog + return s + +-- | Closes a server endpoint. +-- FIXME: this should be encapsulated into a nicer type. +closeServer :: FilePath -> S.Socket -> IO () +closeServer path sock = do + S.sClose sock + removeFile path + +-- | Accepts a client +acceptClient :: S.Socket -> IO Client +acceptClient s = do + -- second return is the address of the client, which we ignore here + (client_socket, _) <- S.accept s + new_buffer <- newIORef B.empty + handle <- S.socketToHandle client_socket ReadWriteMode + return Client { socket=handle, rbuf=new_buffer } -- | Closes the client socket. closeClient :: Client -> IO () -closeClient = S.sClose . socket +closeClient = hClose . socket -- | Sends a message over a luxi transport. sendMsg :: Client -> String -> IO () -sendMsg s buf = - let _send obuf = do - sbytes <- withTimeout queryTimeout - "sending luxi message" $ - S.send (socket s) obuf - unless (sbytes == length obuf) $ _send (drop sbytes obuf) - in _send (buf ++ [eOM]) +sendMsg s buf = withTimeout queryTimeout "sending luxi message" $ do + let encoded = UTF8.fromString buf + handle = socket s + B.hPut handle encoded + B.hPut handle bEOM + hFlush handle + +-- | Given a current buffer and the handle, it will read from the +-- network until we get a full message, and it will return that +-- message and the leftover buffer contents. +recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString) +recvUpdate handle obuf = do + nbuf <- withTimeout queryTimeout "reading luxi response" $ do + _ <- hWaitForInput handle (-1) + B.hGetNonBlocking handle 4096 + let (msg, remaining) = B.break (eOM ==) nbuf + newbuf = B.append obuf msg + if B.null remaining + then recvUpdate handle newbuf + else return (newbuf, B.tail remaining) -- | Waits for a message over a luxi transport. recvMsg :: Client -> IO String recvMsg s = do - let _recv obuf = do - nbuf <- withTimeout queryTimeout "reading luxi response" $ - S.recv (socket s) 4096 - let (msg, remaining) = break (eOM ==) nbuf - (if null remaining - then _recv (obuf ++ msg) - else return (obuf ++ msg, tail remaining)) cbuf <- readIORef $ rbuf s - let (imsg, ibuf) = break (eOM ==) cbuf + let (imsg, ibuf) = B.break (eOM ==) cbuf (msg, nbuf) <- - (if null ibuf -- if old buffer didn't contain a full message - then _recv cbuf -- then we read from network - else return (imsg, tail ibuf)) -- else we return data from our buffer + if B.null ibuf -- if old buffer didn't contain a full message + then recvUpdate (socket s) cbuf -- then we read from network + else return (imsg, B.tail ibuf) -- else we return data from our buffer writeIORef (rbuf s) nbuf - return msg + return $ UTF8.toString msg + +-- | Extended wrapper over recvMsg. +recvMsgExt :: Client -> IO RecvResult +recvMsgExt s = + catch (liftM RecvOk (recvMsg s)) $ \e -> + if isEOFError e + then return RecvConnClosed + else return $ RecvError (show e) -- | Serialize a request to String. buildCall :: LuxiOp -- ^ The method -> String -- ^ The serialized form buildCall lo = - let ja = [ (strOfKey Method, JSString $ toJSString $ strOfOp lo::JSValue) - , (strOfKey Args, opToArgs lo::JSValue) - ] - jo = toJSObject ja - in encodeStrict jo + let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo) + , (strOfKey Args, opToArgs lo) + ] + jo = toJSObject ja + in encodeStrict jo + +-- | Serialize the response to String. +buildResponse :: Bool -- ^ Success + -> JSValue -- ^ The arguments + -> String -- ^ The serialized form +buildResponse success args = + let ja = [ (strOfKey Success, JSBool success) + , (strOfKey Result, args)] + jo = toJSObject ja + in encodeStrict jo + +-- | Check that luxi request contains the required keys and parse it. +validateCall :: String -> Result LuxiCall +validateCall s = do + arr <- fromJResult "parsing top-level luxi message" $ + decodeStrict s::Result (JSObject JSValue) + let aobj = fromJSObject arr + call <- fromObj aobj (strOfKey Method)::Result LuxiReq + args <- fromObj aobj (strOfKey Args) + return (LuxiCall call args) + +-- | Converts Luxi call arguments into a 'LuxiOp' data structure. +-- +-- This is currently hand-coded until we make it more uniform so that +-- it can be generated using TH. +decodeCall :: LuxiCall -> Result LuxiOp +decodeCall (LuxiCall call args) = + case call of + ReqQueryJobs -> do + (jid, jargs) <- fromJVal args + rid <- mapM parseJobId jid + let rargs = map fromJSString jargs + return $ QueryJobs rid rargs + ReqQueryInstances -> do + (names, fields, locking) <- fromJVal args + return $ QueryInstances names fields locking + ReqQueryNodes -> do + (names, fields, locking) <- fromJVal args + return $ QueryNodes names fields locking + ReqQueryGroups -> do + (names, fields, locking) <- fromJVal args + return $ QueryGroups names fields locking + ReqQueryClusterInfo -> + return QueryClusterInfo + ReqQuery -> do + (what, fields, qfilter) <- fromJVal args + return $ Query what fields qfilter + ReqQueryFields -> do + (what, fields) <- fromJVal args + fields' <- case fields of + JSNull -> return [] + _ -> fromJVal fields + return $ QueryFields what fields' + ReqSubmitJob -> do + [ops1] <- fromJVal args + ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1 + return $ SubmitJob ops2 + ReqSubmitManyJobs -> do + [ops1] <- fromJVal args + ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1 + return $ SubmitManyJobs ops2 + ReqWaitForJobChange -> do + (jid, fields, pinfo, pidx, wtmout) <- + -- No instance for 5-tuple, code copied from the + -- json sources and adapted + fromJResult "Parsing WaitForJobChange message" $ + case args of + JSArray [a, b, c, d, e] -> + (,,,,) `fmap` + J.readJSON a `ap` + J.readJSON b `ap` + J.readJSON c `ap` + J.readJSON d `ap` + J.readJSON e + _ -> J.Error "Not enough values" + rid <- parseJobId jid + return $ WaitForJobChange rid fields pinfo pidx wtmout + ReqArchiveJob -> do + [jid] <- fromJVal args + rid <- parseJobId jid + return $ ArchiveJob rid + ReqAutoArchiveJobs -> do + (age, tmout) <- fromJVal args + return $ AutoArchiveJobs age tmout + ReqQueryExports -> do + (nodes, lock) <- fromJVal args + return $ QueryExports nodes lock + ReqQueryConfigValues -> do + [fields] <- fromJVal args + return $ QueryConfigValues fields + ReqQueryTags -> do + (kind, name) <- fromJVal args + return $ QueryTags kind name + ReqCancelJob -> do + [job] <- fromJVal args + rid <- parseJobId job + return $ CancelJob rid + ReqSetDrainFlag -> do + [flag] <- fromJVal args + return $ SetDrainFlag flag + ReqSetWatcherPause -> do + [duration] <- fromJVal args + return $ SetWatcherPause duration -- | Check that luxi responses contain the required keys and that the -- call was successful. validateResult :: String -> Result JSValue validateResult s = do + when (UTF8.replacement_char `elem` s) $ + fail "Failed to decode UTF-8, detected replacement char after decoding" oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)::Result (JSObject JSValue) let arr = J.fromJSObject oarr status <- fromObj arr (strOfKey Success)::Result Bool let rkey = strOfKey Result - (if status - then fromObj arr rkey - else fromObj arr rkey >>= fail) + if status + then fromObj arr rkey + else fromObj arr rkey >>= fail -- | Generic luxi method call. callMethod :: LuxiOp -> Client -> IO (Result JSValue) @@ -255,27 +427,38 @@ callMethod method s = do let rval = validateResult result return rval +-- | Parses a job ID. +parseJobId :: JSValue -> Result JobId +parseJobId (JSString x) = tryRead "parsing job id" . fromJSString $ x +parseJobId (JSRational _ x) = + if denominator x /= 1 + then Bad $ "Got fractional job ID from master daemon?! Value:" ++ show x + -- FIXME: potential integer overflow here on 32-bit platforms + else Ok . fromIntegral . numerator $ x +parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x + +-- | Parse job submission result. +parseSubmitJobResult :: JSValue -> Result JobId +parseSubmitJobResult (JSArray [JSBool True, v]) = parseJobId v +parseSubmitJobResult (JSArray [JSBool False, JSString x]) = + Bad (fromJSString x) +parseSubmitJobResult v = Bad $ "Unknown result from the master daemon" ++ + show v + -- | Specialized submitManyJobs call. -submitManyJobs :: Client -> [[OpCode]] -> IO (Result [String]) +submitManyJobs :: Client -> [[OpCode]] -> IO (Result [JobId]) submitManyJobs s jobs = do rval <- callMethod (SubmitManyJobs jobs) s -- map each result (status, payload) pair into a nice Result ADT return $ case rval of Bad x -> Bad x - Ok (JSArray r) -> - mapM (\v -> case v of - JSArray [JSBool True, JSString x] -> - Ok (fromJSString x) - JSArray [JSBool False, JSString x] -> - Bad (fromJSString x) - _ -> Bad "Unknown result from the master daemon" - ) r + Ok (JSArray r) -> mapM parseSubmitJobResult r x -> Bad ("Cannot parse response from Ganeti: " ++ show x) -- | Custom queryJobs call. -queryJobsStatus :: Client -> [String] -> IO (Result [JobStatus]) +queryJobsStatus :: Client -> [JobId] -> IO (Result [JobStatus]) queryJobsStatus s jids = do - rval <- callMethod (QueryJobs (map read jids) ["status"]) s + rval <- callMethod (QueryJobs jids ["status"]) s return $ case rval of Bad x -> Bad x Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of