X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/619e89c8313793d8770966181fd6c5eaed1ef349..d2970809857fb9b2204fd44e17ada88fdc660ff5:/htools/Ganeti/Luxi.hs diff --git a/htools/Ganeti/Luxi.hs b/htools/Ganeti/Luxi.hs index 7206c9b..ff39892 100644 --- a/htools/Ganeti/Luxi.hs +++ b/htools/Ganeti/Luxi.hs @@ -27,38 +27,56 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA module Ganeti.Luxi ( LuxiOp(..) - , QrViaLuxi(..) - , ResultStatus(..) , LuxiReq(..) , Client , JobId - , checkRS + , fromJobId + , makeJobId + , RecvResult(..) + , strOfOp , getClient + , getServer + , acceptClient , closeClient + , closeServer , callMethod , submitManyJobs , queryJobsStatus , buildCall + , buildResponse , validateCall , decodeCall + , recvMsg + , recvMsgExt + , sendMsg + , allLuxiCalls ) where +import Control.Exception (catch) import Data.IORef +import qualified Data.ByteString as B +import qualified Data.ByteString.UTF8 as UTF8 +import Data.Word (Word8) import Control.Monad import Text.JSON (encodeStrict, decodeStrict) import qualified Text.JSON as J +import Text.JSON.Pretty (pp_value) import Text.JSON.Types +import System.Directory (removeFile) +import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..)) +import System.IO.Error (isEOFError) import System.Timeout import qualified Network.Socket as S -import Ganeti.HTools.JSON -import Ganeti.HTools.Types -import Ganeti.HTools.Utils - +import Ganeti.BasicTypes import Ganeti.Constants -import Ganeti.Jobs (JobStatus) -import Ganeti.OpCodes (OpCode) +import Ganeti.Errors +import Ganeti.JSON +import Ganeti.OpParams (pTagsObject) +import Ganeti.OpCodes +import qualified Ganeti.Query.Language as Qlang import Ganeti.THH +import Ganeti.Types -- * Utility functions @@ -72,116 +90,105 @@ withTimeout secs descr action = do -- * Generic protocol functionality --- | The Ganeti job type. -type JobId = String - -$(declareSADT "QrViaLuxi" - [ ("QRLock", 'qrLock) - , ("QRInstance", 'qrInstance) - , ("QRNode", 'qrNode) - , ("QRGroup", 'qrGroup) - , ("QROs", 'qrOs) - ]) -$(makeJSONInstance ''QrViaLuxi) +-- | Result of receiving a message from the socket. +data RecvResult = RecvConnClosed -- ^ Connection closed + | RecvError String -- ^ Any other error + | RecvOk String -- ^ Successfull receive + deriving (Show, Eq) -- | Currently supported Luxi operations and JSON serialization. $(genLuxiOp "LuxiOp" - [(luxiReqQuery, - [ ("what", [t| QrViaLuxi |], [| id |]) - , ("fields", [t| [String] |], [| id |]) - , ("qfilter", [t| () |], [| const JSNull |]) + [ (luxiReqQuery, + [ simpleField "what" [t| Qlang.ItemType |] + , simpleField "fields" [t| [String] |] + , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |] + ]) + , (luxiReqQueryFields, + [ simpleField "what" [t| Qlang.ItemType |] + , simpleField "fields" [t| [String] |] ]) , (luxiReqQueryNodes, - [ ("names", [t| [String] |], [| id |]) - , ("fields", [t| [String] |], [| id |]) - , ("lock", [t| Bool |], [| id |]) + [ simpleField "names" [t| [String] |] + , simpleField "fields" [t| [String] |] + , simpleField "lock" [t| Bool |] ]) , (luxiReqQueryGroups, - [ ("names", [t| [String] |], [| id |]) - , ("fields", [t| [String] |], [| id |]) - , ("lock", [t| Bool |], [| id |]) + [ simpleField "names" [t| [String] |] + , simpleField "fields" [t| [String] |] + , simpleField "lock" [t| Bool |] ]) , (luxiReqQueryInstances, - [ ("names", [t| [String] |], [| id |]) - , ("fields", [t| [String] |], [| id |]) - , ("lock", [t| Bool |], [| id |]) + [ simpleField "names" [t| [String] |] + , simpleField "fields" [t| [String] |] + , simpleField "lock" [t| Bool |] ]) , (luxiReqQueryJobs, - [ ("ids", [t| [Int] |], [| map show |]) - , ("fields", [t| [String] |], [| id |]) + [ simpleField "ids" [t| [JobId] |] + , simpleField "fields" [t| [String] |] ]) , (luxiReqQueryExports, - [ ("nodes", [t| [String] |], [| id |]) - , ("lock", [t| Bool |], [| id |]) + [ simpleField "nodes" [t| [String] |] + , simpleField "lock" [t| Bool |] ]) , (luxiReqQueryConfigValues, - [ ("fields", [t| [String] |], [| id |]) ] + [ simpleField "fields" [t| [String] |] ] ) , (luxiReqQueryClusterInfo, []) , (luxiReqQueryTags, - [ ("kind", [t| String |], [| id |]) - , ("name", [t| String |], [| id |]) - ]) + [ pTagsObject ]) , (luxiReqSubmitJob, - [ ("job", [t| [OpCode] |], [| id |]) ] + [ simpleField "job" [t| [MetaOpCode] |] ] ) , (luxiReqSubmitManyJobs, - [ ("ops", [t| [[OpCode]] |], [| id |]) ] + [ simpleField "ops" [t| [[MetaOpCode]] |] ] ) , (luxiReqWaitForJobChange, - [ ("job", [t| Int |], [| show |]) - , ("fields", [t| [String]|], [| id |]) - , ("prev_job", [t| JSValue |], [| id |]) - , ("prev_log", [t| JSValue |], [| id |]) - , ("tmout", [t| Int |], [| id |]) + [ simpleField "job" [t| JobId |] + , simpleField "fields" [t| [String]|] + , simpleField "prev_job" [t| JSValue |] + , simpleField "prev_log" [t| JSValue |] + , simpleField "tmout" [t| Int |] ]) , (luxiReqArchiveJob, - [ ("job", [t| Int |], [| show |]) ] + [ simpleField "job" [t| JobId |] ] ) , (luxiReqAutoArchiveJobs, - [ ("age", [t| Int |], [| id |]) - , ("tmout", [t| Int |], [| id |]) + [ simpleField "age" [t| Int |] + , simpleField "tmout" [t| Int |] ]) , (luxiReqCancelJob, - [ ("job", [t| Int |], [| show |]) ] + [ simpleField "job" [t| JobId |] ] + ) + , (luxiReqChangeJobPriority, + [ simpleField "job" [t| JobId |] + , simpleField "priority" [t| Int |] ] ) , (luxiReqSetDrainFlag, - [ ("flag", [t| Bool |], [| id |]) ] + [ simpleField "flag" [t| Bool |] ] ) , (luxiReqSetWatcherPause, - [ ("duration", [t| Double |], [| id |]) ] + [ simpleField "duration" [t| Double |] ] ) ]) $(makeJSONInstance ''LuxiReq) +-- | List of all defined Luxi calls. +$(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls") + -- | The serialisation of LuxiOps into strings in messages. $(genStrOfOp ''LuxiOp "strOfOp") -$(declareIADT "ResultStatus" - [ ("RSNormal", 'rsNormal) - , ("RSUnknown", 'rsUnknown) - , ("RSNoData", 'rsNodata) - , ("RSUnavailable", 'rsUnavail) - , ("RSOffline", 'rsOffline) - ]) - -$(makeJSONInstance ''ResultStatus) - -- | Type holding the initial (unparsed) Luxi call. data LuxiCall = LuxiCall LuxiReq JSValue --- | Check that ResultStatus is success or fail with descriptive message. -checkRS :: (Monad m) => ResultStatus -> a -> m a -checkRS RSNormal val = return val -checkRS RSUnknown _ = fail "Unknown field" -checkRS RSNoData _ = fail "No data for a field" -checkRS RSUnavailable _ = fail "Ganeti reports unavailable data" -checkRS RSOffline _ = fail "Ganeti reports resource as offline" - -- | The end-of-message separator. -eOM :: Char -eOM = '\3' +eOM :: Word8 +eOM = 3 + +-- | The end-of-message encoded as a ByteString. +bEOM :: B.ByteString +bEOM = B.singleton eOM -- | Valid keys in the requests and responses. data MsgKeys = Method @@ -193,66 +200,116 @@ data MsgKeys = Method $(genStrOfKey ''MsgKeys "strOfKey") -- | Luxi client encapsulation. -data Client = Client { socket :: S.Socket -- ^ The socket of the client - , rbuf :: IORef String -- ^ Already received buffer +data Client = Client { socket :: Handle -- ^ The socket of the client + , rbuf :: IORef B.ByteString -- ^ Already received buffer } -- | Connects to the master daemon and returns a luxi Client. getClient :: String -> IO Client getClient path = do s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol - withTimeout connTimeout "creating luxi connection" $ + withTimeout luxiDefCtmo "creating luxi connection" $ S.connect s (S.SockAddrUnix path) - rf <- newIORef "" - return Client { socket=s, rbuf=rf} + rf <- newIORef B.empty + h <- S.socketToHandle s ReadWriteMode + return Client { socket=h, rbuf=rf } + +-- | Creates and returns a server endpoint. +getServer :: FilePath -> IO S.Socket +getServer path = do + s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol + S.bindSocket s (S.SockAddrUnix path) + S.listen s 5 -- 5 is the max backlog + return s + +-- | Closes a server endpoint. +-- FIXME: this should be encapsulated into a nicer type. +closeServer :: FilePath -> S.Socket -> IO () +closeServer path sock = do + S.sClose sock + removeFile path + +-- | Accepts a client +acceptClient :: S.Socket -> IO Client +acceptClient s = do + -- second return is the address of the client, which we ignore here + (client_socket, _) <- S.accept s + new_buffer <- newIORef B.empty + handle <- S.socketToHandle client_socket ReadWriteMode + return Client { socket=handle, rbuf=new_buffer } -- | Closes the client socket. closeClient :: Client -> IO () -closeClient = S.sClose . socket +closeClient = hClose . socket -- | Sends a message over a luxi transport. sendMsg :: Client -> String -> IO () -sendMsg s buf = - let _send obuf = do - sbytes <- withTimeout queryTimeout - "sending luxi message" $ - S.send (socket s) obuf - unless (sbytes == length obuf) $ _send (drop sbytes obuf) - in _send (buf ++ [eOM]) +sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do + let encoded = UTF8.fromString buf + handle = socket s + B.hPut handle encoded + B.hPut handle bEOM + hFlush handle + +-- | Given a current buffer and the handle, it will read from the +-- network until we get a full message, and it will return that +-- message and the leftover buffer contents. +recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString) +recvUpdate handle obuf = do + nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do + _ <- hWaitForInput handle (-1) + B.hGetNonBlocking handle 4096 + let (msg, remaining) = B.break (eOM ==) nbuf + newbuf = B.append obuf msg + if B.null remaining + then recvUpdate handle newbuf + else return (newbuf, B.tail remaining) -- | Waits for a message over a luxi transport. recvMsg :: Client -> IO String recvMsg s = do - let _recv obuf = do - nbuf <- withTimeout queryTimeout "reading luxi response" $ - S.recv (socket s) 4096 - let (msg, remaining) = break (eOM ==) nbuf - if null remaining - then _recv (obuf ++ msg) - else return (obuf ++ msg, tail remaining) cbuf <- readIORef $ rbuf s - let (imsg, ibuf) = break (eOM ==) cbuf + let (imsg, ibuf) = B.break (eOM ==) cbuf (msg, nbuf) <- - if null ibuf -- if old buffer didn't contain a full message - then _recv cbuf -- then we read from network - else return (imsg, tail ibuf) -- else we return data from our buffer + if B.null ibuf -- if old buffer didn't contain a full message + then recvUpdate (socket s) cbuf -- then we read from network + else return (imsg, B.tail ibuf) -- else we return data from our buffer writeIORef (rbuf s) nbuf - return msg + return $ UTF8.toString msg + +-- | Extended wrapper over recvMsg. +recvMsgExt :: Client -> IO RecvResult +recvMsgExt s = + Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e -> + return $ if isEOFError e + then RecvConnClosed + else RecvError (show e) -- | Serialize a request to String. buildCall :: LuxiOp -- ^ The method -> String -- ^ The serialized form buildCall lo = - let ja = [ (strOfKey Method, JSString $ toJSString $ strOfOp lo::JSValue) - , (strOfKey Args, opToArgs lo::JSValue) + let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo) + , (strOfKey Args, opToArgs lo) ] jo = toJSObject ja in encodeStrict jo +-- | Serialize the response to String. +buildResponse :: Bool -- ^ Success + -> JSValue -- ^ The arguments + -> String -- ^ The serialized form +buildResponse success args = + let ja = [ (strOfKey Success, JSBool success) + , (strOfKey Result, args)] + jo = toJSObject ja + in encodeStrict jo + -- | Check that luxi request contains the required keys and parse it. validateCall :: String -> Result LuxiCall validateCall s = do - arr <- fromJResult "luxi call" $ decodeStrict s::Result (JSObject JSValue) + arr <- fromJResult "parsing top-level luxi message" $ + decodeStrict s::Result (JSObject JSValue) let aobj = fromJSObject arr call <- fromObj aobj (strOfKey Method)::Result LuxiReq args <- fromObj aobj (strOfKey Args) @@ -266,10 +323,11 @@ decodeCall :: LuxiCall -> Result LuxiOp decodeCall (LuxiCall call args) = case call of ReqQueryJobs -> do - (jid, jargs) <- fromJVal args - rid <- mapM (tryRead "parsing job ID" . fromJSString) jid - let rargs = map fromJSString jargs - return $ QueryJobs rid rargs + (jids, jargs) <- fromJVal args + jids' <- case jids of + JSNull -> return [] + _ -> fromJVal jids + return $ QueryJobs jids' jargs ReqQueryInstances -> do (names, fields, locking) <- fromJVal args return $ QueryInstances names fields locking @@ -279,12 +337,17 @@ decodeCall (LuxiCall call args) = ReqQueryGroups -> do (names, fields, locking) <- fromJVal args return $ QueryGroups names fields locking - ReqQueryClusterInfo -> do + ReqQueryClusterInfo -> return QueryClusterInfo ReqQuery -> do - (what, fields, _) <- - fromJVal args::Result (QrViaLuxi, [String], JSValue) - return $ Query what fields () + (what, fields, qfilter) <- fromJVal args + return $ Query what fields qfilter + ReqQueryFields -> do + (what, fields) <- fromJVal args + fields' <- case fields of + JSNull -> return [] + _ -> fromJVal fields + return $ QueryFields what fields' ReqSubmitJob -> do [ops1] <- fromJVal args ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1 @@ -307,12 +370,10 @@ decodeCall (LuxiCall call args) = J.readJSON d `ap` J.readJSON e _ -> J.Error "Not enough values" - rid <- tryRead "parsing job ID" jid - return $ WaitForJobChange rid fields pinfo pidx wtmout + return $ WaitForJobChange jid fields pinfo pidx wtmout ReqArchiveJob -> do [jid] <- fromJVal args - rid <- tryRead "parsing job ID" jid - return $ ArchiveJob rid + return $ ArchiveJob jid ReqAutoArchiveJobs -> do (age, tmout) <- fromJVal args return $ AutoArchiveJobs age tmout @@ -324,11 +385,14 @@ decodeCall (LuxiCall call args) = return $ QueryConfigValues fields ReqQueryTags -> do (kind, name) <- fromJVal args - return $ QueryTags kind name + item <- tagObjectFrom kind name + return $ QueryTags item ReqCancelJob -> do - [job] <- fromJVal args - rid <- tryRead "parsing job ID" job - return $ CancelJob rid + [jid] <- fromJVal args + return $ CancelJob jid + ReqChangeJobPriority -> do + (jid, priority) <- fromJVal args + return $ ChangeJobPriority jid priority ReqSetDrainFlag -> do [flag] <- fromJVal args return $ SetDrainFlag flag @@ -338,56 +402,67 @@ decodeCall (LuxiCall call args) = -- | Check that luxi responses contain the required keys and that the -- call was successful. -validateResult :: String -> Result JSValue +validateResult :: String -> ErrorResult JSValue validateResult s = do - oarr <- fromJResult "Parsing LUXI response" - (decodeStrict s)::Result (JSObject JSValue) + when (UTF8.replacement_char `elem` s) $ + fail "Failed to decode UTF-8, detected replacement char after decoding" + oarr <- fromJResult "Parsing LUXI response" (decodeStrict s) let arr = J.fromJSObject oarr - status <- fromObj arr (strOfKey Success)::Result Bool - let rkey = strOfKey Result + status <- fromObj arr (strOfKey Success) + result <- fromObj arr (strOfKey Result) if status - then fromObj arr rkey - else fromObj arr rkey >>= fail + then return result + else decodeError result + +-- | Try to decode an error from the server response. This function +-- will always fail, since it's called only on the error path (when +-- status is False). +decodeError :: JSValue -> ErrorResult JSValue +decodeError val = + case fromJVal val of + Ok e -> Bad e + Bad msg -> Bad $ GenericError msg -- | Generic luxi method call. -callMethod :: LuxiOp -> Client -> IO (Result JSValue) +callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue) callMethod method s = do sendMsg s $ buildCall method result <- recvMsg s let rval = validateResult result return rval --- | Parses a job ID. -parseJobId :: JSValue -> Result JobId -parseJobId (JSString x) = Ok $ fromJSString x -parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x - -- | Parse job submission result. -parseSubmitJobResult :: JSValue -> Result JobId -parseSubmitJobResult (JSArray [JSBool True, v]) = parseJobId v +parseSubmitJobResult :: JSValue -> ErrorResult JobId +parseSubmitJobResult (JSArray [JSBool True, v]) = + case J.readJSON v of + J.Error msg -> Bad $ LuxiError msg + J.Ok v' -> Ok v' parseSubmitJobResult (JSArray [JSBool False, JSString x]) = - Bad (fromJSString x) -parseSubmitJobResult v = Bad $ "Unknown result from the master daemon" ++ - show v + Bad . LuxiError $ fromJSString x +parseSubmitJobResult v = + Bad . LuxiError $ "Unknown result from the master daemon: " ++ + show (pp_value v) -- | Specialized submitManyJobs call. -submitManyJobs :: Client -> [[OpCode]] -> IO (Result [JobId]) +submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId]) submitManyJobs s jobs = do rval <- callMethod (SubmitManyJobs jobs) s -- map each result (status, payload) pair into a nice Result ADT return $ case rval of Bad x -> Bad x Ok (JSArray r) -> mapM parseSubmitJobResult r - x -> Bad ("Cannot parse response from Ganeti: " ++ show x) + x -> Bad . LuxiError $ + "Cannot parse response from Ganeti: " ++ show x -- | Custom queryJobs call. -queryJobsStatus :: Client -> [JobId] -> IO (Result [JobStatus]) +queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus]) queryJobsStatus s jids = do - rval <- callMethod (QueryJobs (map read jids) ["status"]) s + rval <- callMethod (QueryJobs jids ["status"]) s return $ case rval of Bad x -> Bad x Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of J.Ok vals -> if any null vals - then Bad "Missing job status field" + then Bad $ + LuxiError "Missing job status field" else Ok (map head vals) - J.Error x -> Bad x + J.Error x -> Bad $ LuxiError x