Revision e821050d htools/Ganeti/Luxi.hs
b/htools/Ganeti/Luxi.hs | ||
---|---|---|
45 | 45 |
|
46 | 46 |
import Data.IORef |
47 | 47 |
import Data.Ratio (numerator, denominator) |
48 |
import qualified Data.ByteString as B |
|
49 |
import qualified Data.ByteString.UTF8 as UTF8 |
|
50 |
import Data.Word (Word8) |
|
48 | 51 |
import Control.Monad |
49 | 52 |
import Text.JSON (encodeStrict, decodeStrict) |
50 | 53 |
import qualified Text.JSON as J |
51 | 54 |
import Text.JSON.Types |
55 |
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..)) |
|
52 | 56 |
import System.Timeout |
53 | 57 |
import qualified Network.Socket as S |
54 | 58 |
|
... | ... | |
181 | 185 |
checkRS RSOffline _ = fail "Ganeti reports resource as offline" |
182 | 186 |
|
183 | 187 |
-- | The end-of-message separator. |
184 |
eOM :: Char |
|
185 |
eOM = '\3' |
|
188 |
eOM :: Word8 |
|
189 |
eOM = 3 |
|
190 |
|
|
191 |
-- | The end-of-message encoded as a ByteString. |
|
192 |
bEOM :: B.ByteString |
|
193 |
bEOM = B.singleton eOM |
|
186 | 194 |
|
187 | 195 |
-- | Valid keys in the requests and responses. |
188 | 196 |
data MsgKeys = Method |
... | ... | |
194 | 202 |
$(genStrOfKey ''MsgKeys "strOfKey") |
195 | 203 |
|
196 | 204 |
-- | Luxi client encapsulation. |
197 |
data Client = Client { socket :: S.Socket -- ^ The socket of the client
|
|
198 |
, rbuf :: IORef String -- ^ Already received buffer |
|
205 |
data Client = Client { socket :: Handle -- ^ The socket of the client
|
|
206 |
, rbuf :: IORef B.ByteString -- ^ Already received buffer
|
|
199 | 207 |
} |
200 | 208 |
|
201 | 209 |
-- | Connects to the master daemon and returns a luxi Client. |
... | ... | |
204 | 212 |
s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol |
205 | 213 |
withTimeout connTimeout "creating luxi connection" $ |
206 | 214 |
S.connect s (S.SockAddrUnix path) |
207 |
rf <- newIORef "" |
|
208 |
return Client { socket=s, rbuf=rf} |
|
215 |
rf <- newIORef B.empty |
|
216 |
h <- S.socketToHandle s ReadWriteMode |
|
217 |
return Client { socket=h, rbuf=rf } |
|
209 | 218 |
|
210 | 219 |
-- | Closes the client socket. |
211 | 220 |
closeClient :: Client -> IO () |
212 |
closeClient = S.sClose . socket
|
|
221 |
closeClient = hClose . socket
|
|
213 | 222 |
|
214 | 223 |
-- | Sends a message over a luxi transport. |
215 | 224 |
sendMsg :: Client -> String -> IO () |
216 |
sendMsg s buf = |
|
217 |
let _send obuf = do |
|
218 |
sbytes <- withTimeout queryTimeout |
|
219 |
"sending luxi message" $ |
|
220 |
S.send (socket s) obuf |
|
221 |
unless (sbytes == length obuf) $ _send (drop sbytes obuf) |
|
222 |
in _send (buf ++ [eOM]) |
|
225 |
sendMsg s buf = withTimeout queryTimeout "sending luxi message" $ do |
|
226 |
let encoded = UTF8.fromString buf |
|
227 |
handle = socket s |
|
228 |
B.hPut handle encoded |
|
229 |
B.hPut handle bEOM |
|
230 |
hFlush handle |
|
231 |
|
|
232 |
-- | Given a current buffer and the handle, it will read from the |
|
233 |
-- network until we get a full message, and it will return that |
|
234 |
-- message and the leftover buffer contents. |
|
235 |
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString) |
|
236 |
recvUpdate handle obuf = do |
|
237 |
nbuf <- withTimeout queryTimeout "reading luxi response" $ do |
|
238 |
_ <- hWaitForInput handle (-1) |
|
239 |
B.hGetNonBlocking handle 4096 |
|
240 |
let (msg, remaining) = B.break (eOM ==) nbuf |
|
241 |
newbuf = B.append obuf msg |
|
242 |
if B.null remaining |
|
243 |
then recvUpdate handle newbuf |
|
244 |
else return (newbuf, B.tail remaining) |
|
223 | 245 |
|
224 | 246 |
-- | Waits for a message over a luxi transport. |
225 | 247 |
recvMsg :: Client -> IO String |
226 | 248 |
recvMsg s = do |
227 |
let _recv obuf = do |
|
228 |
nbuf <- withTimeout queryTimeout "reading luxi response" $ |
|
229 |
S.recv (socket s) 4096 |
|
230 |
let (msg, remaining) = break (eOM ==) nbuf |
|
231 |
if null remaining |
|
232 |
then _recv (obuf ++ msg) |
|
233 |
else return (obuf ++ msg, tail remaining) |
|
234 | 249 |
cbuf <- readIORef $ rbuf s |
235 |
let (imsg, ibuf) = break (eOM ==) cbuf |
|
250 |
let (imsg, ibuf) = B.break (eOM ==) cbuf
|
|
236 | 251 |
(msg, nbuf) <- |
237 |
if null ibuf -- if old buffer didn't contain a full message |
|
238 |
then _recv cbuf -- then we read from network
|
|
239 |
else return (imsg, tail ibuf) -- else we return data from our buffer
|
|
252 |
if B.null ibuf -- if old buffer didn't contain a full message
|
|
253 |
then recvUpdate (socket s) cbuf -- then we read from network
|
|
254 |
else return (imsg, B.tail ibuf) -- else we return data from our buffer
|
|
240 | 255 |
writeIORef (rbuf s) nbuf |
241 |
return msg |
|
256 |
return $ UTF8.toString msg
|
|
242 | 257 |
|
243 | 258 |
-- | Serialize a request to String. |
244 | 259 |
buildCall :: LuxiOp -- ^ The method |
... | ... | |
341 | 356 |
-- call was successful. |
342 | 357 |
validateResult :: String -> Result JSValue |
343 | 358 |
validateResult s = do |
359 |
when (UTF8.replacement_char `elem` s) $ |
|
360 |
fail "Failed to decode UTF-8, detected replacement char after decoding" |
|
344 | 361 |
oarr <- fromJResult "Parsing LUXI response" |
345 | 362 |
(decodeStrict s)::Result (JSObject JSValue) |
346 | 363 |
let arr = J.fromJSObject oarr |
Also available in: Unified diff