45 |
45 |
|
46 |
46 |
import Data.IORef
|
47 |
47 |
import Data.Ratio (numerator, denominator)
|
|
48 |
import qualified Data.ByteString as B
|
|
49 |
import qualified Data.ByteString.UTF8 as UTF8
|
|
50 |
import Data.Word (Word8)
|
48 |
51 |
import Control.Monad
|
49 |
52 |
import Text.JSON (encodeStrict, decodeStrict)
|
50 |
53 |
import qualified Text.JSON as J
|
51 |
54 |
import Text.JSON.Types
|
|
55 |
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
|
52 |
56 |
import System.Timeout
|
53 |
57 |
import qualified Network.Socket as S
|
54 |
58 |
|
... | ... | |
181 |
185 |
checkRS RSOffline _ = fail "Ganeti reports resource as offline"
|
182 |
186 |
|
183 |
187 |
-- | The end-of-message separator.
|
184 |
|
eOM :: Char
|
185 |
|
eOM = '\3'
|
|
188 |
eOM :: Word8
|
|
189 |
eOM = 3
|
|
190 |
|
|
191 |
-- | The end-of-message encoded as a ByteString.
|
|
192 |
bEOM :: B.ByteString
|
|
193 |
bEOM = B.singleton eOM
|
186 |
194 |
|
187 |
195 |
-- | Valid keys in the requests and responses.
|
188 |
196 |
data MsgKeys = Method
|
... | ... | |
194 |
202 |
$(genStrOfKey ''MsgKeys "strOfKey")
|
195 |
203 |
|
196 |
204 |
-- | Luxi client encapsulation.
|
197 |
|
data Client = Client { socket :: S.Socket -- ^ The socket of the client
|
198 |
|
, rbuf :: IORef String -- ^ Already received buffer
|
|
205 |
data Client = Client { socket :: Handle -- ^ The socket of the client
|
|
206 |
, rbuf :: IORef B.ByteString -- ^ Already received buffer
|
199 |
207 |
}
|
200 |
208 |
|
201 |
209 |
-- | Connects to the master daemon and returns a luxi Client.
|
... | ... | |
204 |
212 |
s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
|
205 |
213 |
withTimeout connTimeout "creating luxi connection" $
|
206 |
214 |
S.connect s (S.SockAddrUnix path)
|
207 |
|
rf <- newIORef ""
|
208 |
|
return Client { socket=s, rbuf=rf}
|
|
215 |
rf <- newIORef B.empty
|
|
216 |
h <- S.socketToHandle s ReadWriteMode
|
|
217 |
return Client { socket=h, rbuf=rf }
|
209 |
218 |
|
210 |
219 |
-- | Closes the client socket.
|
211 |
220 |
closeClient :: Client -> IO ()
|
212 |
|
closeClient = S.sClose . socket
|
|
221 |
closeClient = hClose . socket
|
213 |
222 |
|
214 |
223 |
-- | Sends a message over a luxi transport.
|
215 |
224 |
sendMsg :: Client -> String -> IO ()
|
216 |
|
sendMsg s buf =
|
217 |
|
let _send obuf = do
|
218 |
|
sbytes <- withTimeout queryTimeout
|
219 |
|
"sending luxi message" $
|
220 |
|
S.send (socket s) obuf
|
221 |
|
unless (sbytes == length obuf) $ _send (drop sbytes obuf)
|
222 |
|
in _send (buf ++ [eOM])
|
|
225 |
sendMsg s buf = withTimeout queryTimeout "sending luxi message" $ do
|
|
226 |
let encoded = UTF8.fromString buf
|
|
227 |
handle = socket s
|
|
228 |
B.hPut handle encoded
|
|
229 |
B.hPut handle bEOM
|
|
230 |
hFlush handle
|
|
231 |
|
|
232 |
-- | Given a current buffer and the handle, it will read from the
|
|
233 |
-- network until we get a full message, and it will return that
|
|
234 |
-- message and the leftover buffer contents.
|
|
235 |
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
|
|
236 |
recvUpdate handle obuf = do
|
|
237 |
nbuf <- withTimeout queryTimeout "reading luxi response" $ do
|
|
238 |
_ <- hWaitForInput handle (-1)
|
|
239 |
B.hGetNonBlocking handle 4096
|
|
240 |
let (msg, remaining) = B.break (eOM ==) nbuf
|
|
241 |
newbuf = B.append obuf msg
|
|
242 |
if B.null remaining
|
|
243 |
then recvUpdate handle newbuf
|
|
244 |
else return (newbuf, B.tail remaining)
|
223 |
245 |
|
224 |
246 |
-- | Waits for a message over a luxi transport.
|
225 |
247 |
recvMsg :: Client -> IO String
|
226 |
248 |
recvMsg s = do
|
227 |
|
let _recv obuf = do
|
228 |
|
nbuf <- withTimeout queryTimeout "reading luxi response" $
|
229 |
|
S.recv (socket s) 4096
|
230 |
|
let (msg, remaining) = break (eOM ==) nbuf
|
231 |
|
if null remaining
|
232 |
|
then _recv (obuf ++ msg)
|
233 |
|
else return (obuf ++ msg, tail remaining)
|
234 |
249 |
cbuf <- readIORef $ rbuf s
|
235 |
|
let (imsg, ibuf) = break (eOM ==) cbuf
|
|
250 |
let (imsg, ibuf) = B.break (eOM ==) cbuf
|
236 |
251 |
(msg, nbuf) <-
|
237 |
|
if null ibuf -- if old buffer didn't contain a full message
|
238 |
|
then _recv cbuf -- then we read from network
|
239 |
|
else return (imsg, tail ibuf) -- else we return data from our buffer
|
|
252 |
if B.null ibuf -- if old buffer didn't contain a full message
|
|
253 |
then recvUpdate (socket s) cbuf -- then we read from network
|
|
254 |
else return (imsg, B.tail ibuf) -- else we return data from our buffer
|
240 |
255 |
writeIORef (rbuf s) nbuf
|
241 |
|
return msg
|
|
256 |
return $ UTF8.toString msg
|
242 |
257 |
|
243 |
258 |
-- | Serialize a request to String.
|
244 |
259 |
buildCall :: LuxiOp -- ^ The method
|
... | ... | |
341 |
356 |
-- call was successful.
|
342 |
357 |
validateResult :: String -> Result JSValue
|
343 |
358 |
validateResult s = do
|
|
359 |
when (UTF8.replacement_char `elem` s) $
|
|
360 |
fail "Failed to decode UTF-8, detected replacement char after decoding"
|
344 |
361 |
oarr <- fromJResult "Parsing LUXI response"
|
345 |
362 |
(decodeStrict s)::Result (JSObject JSValue)
|
346 |
363 |
let arr = J.fromJSObject oarr
|