import Data.IORef
import Data.Ratio (numerator, denominator)
+import qualified Data.ByteString as B
+import qualified Data.ByteString.UTF8 as UTF8
+import Data.Word (Word8)
import Control.Monad
import Text.JSON (encodeStrict, decodeStrict)
import qualified Text.JSON as J
import Text.JSON.Types
+import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
import System.Timeout
import qualified Network.Socket as S
checkRS RSOffline _ = fail "Ganeti reports resource as offline"
-- | The end-of-message separator.
-eOM :: Char
-eOM = '\3'
+eOM :: Word8
+eOM = 3
+
+-- | The end-of-message encoded as a ByteString.
+bEOM :: B.ByteString
+bEOM = B.singleton eOM
-- | Valid keys in the requests and responses.
data MsgKeys = Method
$(genStrOfKey ''MsgKeys "strOfKey")
-- | Luxi client encapsulation.
-data Client = Client { socket :: S.Socket -- ^ The socket of the client
- , rbuf :: IORef String -- ^ Already received buffer
+data Client = Client { socket :: Handle -- ^ The socket of the client
+ , rbuf :: IORef B.ByteString -- ^ Already received buffer
}
-- | Connects to the master daemon and returns a luxi Client.
s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
withTimeout connTimeout "creating luxi connection" $
S.connect s (S.SockAddrUnix path)
- rf <- newIORef ""
- return Client { socket=s, rbuf=rf}
+ rf <- newIORef B.empty
+ h <- S.socketToHandle s ReadWriteMode
+ return Client { socket=h, rbuf=rf }
-- | Closes the client socket.
closeClient :: Client -> IO ()
-closeClient = S.sClose . socket
+closeClient = hClose . socket
-- | Sends a message over a luxi transport.
sendMsg :: Client -> String -> IO ()
-sendMsg s buf =
- let _send obuf = do
- sbytes <- withTimeout queryTimeout
- "sending luxi message" $
- S.send (socket s) obuf
- unless (sbytes == length obuf) $ _send (drop sbytes obuf)
- in _send (buf ++ [eOM])
+sendMsg s buf = withTimeout queryTimeout "sending luxi message" $ do
+ let encoded = UTF8.fromString buf
+ handle = socket s
+ B.hPut handle encoded
+ B.hPut handle bEOM
+ hFlush handle
+
+-- | Given a current buffer and the handle, it will read from the
+-- network until we get a full message, and it will return that
+-- message and the leftover buffer contents.
+recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
+recvUpdate handle obuf = do
+ nbuf <- withTimeout queryTimeout "reading luxi response" $ do
+ _ <- hWaitForInput handle (-1)
+ B.hGetNonBlocking handle 4096
+ let (msg, remaining) = B.break (eOM ==) nbuf
+ newbuf = B.append obuf msg
+ if B.null remaining
+ then recvUpdate handle newbuf
+ else return (newbuf, B.tail remaining)
-- | Waits for a message over a luxi transport.
recvMsg :: Client -> IO String
recvMsg s = do
- let _recv obuf = do
- nbuf <- withTimeout queryTimeout "reading luxi response" $
- S.recv (socket s) 4096
- let (msg, remaining) = break (eOM ==) nbuf
- if null remaining
- then _recv (obuf ++ msg)
- else return (obuf ++ msg, tail remaining)
cbuf <- readIORef $ rbuf s
- let (imsg, ibuf) = break (eOM ==) cbuf
+ let (imsg, ibuf) = B.break (eOM ==) cbuf
(msg, nbuf) <-
- if null ibuf -- if old buffer didn't contain a full message
- then _recv cbuf -- then we read from network
- else return (imsg, tail ibuf) -- else we return data from our buffer
+ if B.null ibuf -- if old buffer didn't contain a full message
+ then recvUpdate (socket s) cbuf -- then we read from network
+ else return (imsg, B.tail ibuf) -- else we return data from our buffer
writeIORef (rbuf s) nbuf
- return msg
+ return $ UTF8.toString msg
-- | Serialize a request to String.
buildCall :: LuxiOp -- ^ The method
-- call was successful.
validateResult :: String -> Result JSValue
validateResult s = do
+ when (UTF8.replacement_char `elem` s) $
+ fail "Failed to decode UTF-8, detected replacement char after decoding"
oarr <- fromJResult "Parsing LUXI response"
(decodeStrict s)::Result (JSObject JSValue)
let arr = J.fromJSObject oarr