Revision e821050d htools/Ganeti/Luxi.hs

b/htools/Ganeti/Luxi.hs
45 45

  
46 46
import Data.IORef
47 47
import Data.Ratio (numerator, denominator)
48
import qualified Data.ByteString as B
49
import qualified Data.ByteString.UTF8 as UTF8
50
import Data.Word (Word8)
48 51
import Control.Monad
49 52
import Text.JSON (encodeStrict, decodeStrict)
50 53
import qualified Text.JSON as J
51 54
import Text.JSON.Types
55
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
52 56
import System.Timeout
53 57
import qualified Network.Socket as S
54 58

  
......
181 185
checkRS RSOffline _     = fail "Ganeti reports resource as offline"
182 186

  
183 187
-- | The end-of-message separator.
184
eOM :: Char
185
eOM = '\3'
188
eOM :: Word8
189
eOM = 3
190

  
191
-- | The end-of-message encoded as a ByteString.
192
bEOM :: B.ByteString
193
bEOM = B.singleton eOM
186 194

  
187 195
-- | Valid keys in the requests and responses.
188 196
data MsgKeys = Method
......
194 202
$(genStrOfKey ''MsgKeys "strOfKey")
195 203

  
196 204
-- | Luxi client encapsulation.
197
data Client = Client { socket :: S.Socket   -- ^ The socket of the client
198
                     , rbuf :: IORef String -- ^ Already received buffer
205
data Client = Client { socket :: Handle           -- ^ The socket of the client
206
                     , rbuf :: IORef B.ByteString -- ^ Already received buffer
199 207
                     }
200 208

  
201 209
-- | Connects to the master daemon and returns a luxi Client.
......
204 212
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
205 213
  withTimeout connTimeout "creating luxi connection" $
206 214
              S.connect s (S.SockAddrUnix path)
207
  rf <- newIORef ""
208
  return Client { socket=s, rbuf=rf}
215
  rf <- newIORef B.empty
216
  h <- S.socketToHandle s ReadWriteMode
217
  return Client { socket=h, rbuf=rf }
209 218

  
210 219
-- | Closes the client socket.
211 220
closeClient :: Client -> IO ()
212
closeClient = S.sClose . socket
221
closeClient = hClose . socket
213 222

  
214 223
-- | Sends a message over a luxi transport.
215 224
sendMsg :: Client -> String -> IO ()
216
sendMsg s buf =
217
  let _send obuf = do
218
        sbytes <- withTimeout queryTimeout
219
                  "sending luxi message" $
220
                  S.send (socket s) obuf
221
        unless (sbytes == length obuf) $ _send (drop sbytes obuf)
222
  in _send (buf ++ [eOM])
225
sendMsg s buf = withTimeout queryTimeout "sending luxi message" $ do
226
  let encoded = UTF8.fromString buf
227
      handle = socket s
228
  B.hPut handle encoded
229
  B.hPut handle bEOM
230
  hFlush handle
231

  
232
-- | Given a current buffer and the handle, it will read from the
233
-- network until we get a full message, and it will return that
234
-- message and the leftover buffer contents.
235
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
236
recvUpdate handle obuf = do
237
  nbuf <- withTimeout queryTimeout "reading luxi response" $ do
238
            _ <- hWaitForInput handle (-1)
239
            B.hGetNonBlocking handle 4096
240
  let (msg, remaining) = B.break (eOM ==) nbuf
241
      newbuf = B.append obuf msg
242
  if B.null remaining
243
    then recvUpdate handle newbuf
244
    else return (newbuf, B.tail remaining)
223 245

  
224 246
-- | Waits for a message over a luxi transport.
225 247
recvMsg :: Client -> IO String
226 248
recvMsg s = do
227
  let _recv obuf = do
228
              nbuf <- withTimeout queryTimeout "reading luxi response" $
229
                      S.recv (socket s) 4096
230
              let (msg, remaining) = break (eOM ==) nbuf
231
              if null remaining
232
                then _recv (obuf ++ msg)
233
                else return (obuf ++ msg, tail remaining)
234 249
  cbuf <- readIORef $ rbuf s
235
  let (imsg, ibuf) = break (eOM ==) cbuf
250
  let (imsg, ibuf) = B.break (eOM ==) cbuf
236 251
  (msg, nbuf) <-
237
    if null ibuf      -- if old buffer didn't contain a full message
238
      then _recv cbuf   -- then we read from network
239
      else return (imsg, tail ibuf) -- else we return data from our buffer
252
    if B.null ibuf      -- if old buffer didn't contain a full message
253
      then recvUpdate (socket s) cbuf   -- then we read from network
254
      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
240 255
  writeIORef (rbuf s) nbuf
241
  return msg
256
  return $ UTF8.toString msg
242 257

  
243 258
-- | Serialize a request to String.
244 259
buildCall :: LuxiOp  -- ^ The method
......
341 356
-- call was successful.
342 357
validateResult :: String -> Result JSValue
343 358
validateResult s = do
359
  when (UTF8.replacement_char `elem` s) $
360
       fail "Failed to decode UTF-8, detected replacement char after decoding"
344 361
  oarr <- fromJResult "Parsing LUXI response"
345 362
          (decodeStrict s)::Result (JSObject JSValue)
346 363
  let arr = J.fromJSObject oarr

Also available in: Unified diff