Revision 0fbc8447 src/Ganeti/UDSServer.hs
b/src/Ganeti/UDSServer.hs | ||
---|---|---|
26 | 26 |
-} |
27 | 27 |
|
28 | 28 |
module Ganeti.UDSServer |
29 |
( Client |
|
29 |
( ConnectConfig(..) |
|
30 |
, Client |
|
31 |
, Server |
|
30 | 32 |
, RecvResult(..) |
31 | 33 |
, MsgKeys(..) |
32 | 34 |
, strOfKey |
33 |
, getLuxiClient
|
|
34 |
, getLuxiServer
|
|
35 |
, connectClient
|
|
36 |
, connectServer
|
|
35 | 37 |
, acceptClient |
36 | 38 |
, closeClient |
37 | 39 |
, closeServer |
... | ... | |
57 | 59 |
import Text.JSON (encodeStrict) |
58 | 60 |
import Text.JSON.Types |
59 | 61 |
|
60 |
import Ganeti.Constants |
|
61 | 62 |
import Ganeti.Runtime (GanetiDaemon(..), MiscGroup(..), GanetiGroup(..)) |
62 | 63 |
import Ganeti.THH |
63 | 64 |
import Ganeti.Utils |
... | ... | |
101 | 102 |
$(genStrOfKey ''MsgKeys "strOfKey") |
102 | 103 |
|
103 | 104 |
|
104 |
-- | Luxi client encapsulation. |
|
105 |
data ConnectConfig = ConnectConfig |
|
106 |
{ connDaemon :: GanetiDaemon |
|
107 |
, recvTmo :: Int |
|
108 |
, sendTmo :: Int |
|
109 |
} |
|
110 |
|
|
111 |
-- | A client encapsulation. |
|
105 | 112 |
data Client = Client { socket :: Handle -- ^ The socket of the client |
106 | 113 |
, rbuf :: IORef B.ByteString -- ^ Already received buffer |
114 |
, clientConfig :: ConnectConfig |
|
107 | 115 |
} |
108 | 116 |
|
109 |
-- | Connects to the master daemon and returns a luxi Client. |
|
110 |
getLuxiClient :: String -> IO Client |
|
111 |
getLuxiClient path = do |
|
117 |
-- | A server encapsulation. |
|
118 |
data Server = Server { sSocket :: S.Socket -- ^ The bound server socket |
|
119 |
, serverConfig :: ConnectConfig |
|
120 |
} |
|
121 |
|
|
122 |
|
|
123 |
-- | Connects to the master daemon and returns a Client. |
|
124 |
connectClient |
|
125 |
:: ConnectConfig -- ^ configuration for the client |
|
126 |
-> Int -- ^ connection timeout |
|
127 |
-> FilePath -- ^ socket path |
|
128 |
-> IO Client |
|
129 |
connectClient conf tmo path = do |
|
112 | 130 |
s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol |
113 |
withTimeout luxiDefCtmo "creating luxi connection" $
|
|
131 |
withTimeout tmo "creating a connection" $
|
|
114 | 132 |
S.connect s (S.SockAddrUnix path) |
115 | 133 |
rf <- newIORef B.empty |
116 | 134 |
h <- S.socketToHandle s ReadWriteMode |
117 |
return Client { socket=h, rbuf=rf } |
|
135 |
return Client { socket=h, rbuf=rf, clientConfig=conf }
|
|
118 | 136 |
|
119 | 137 |
-- | Creates and returns a server endpoint. |
120 |
getLuxiServer :: Bool -> FilePath -> IO S.Socket
|
|
121 |
getLuxiServer setOwner path = do
|
|
138 |
connectServer :: ConnectConfig -> Bool -> FilePath -> IO Server
|
|
139 |
connectServer conf setOwner path = do
|
|
122 | 140 |
s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol |
123 | 141 |
S.bindSocket s (S.SockAddrUnix path) |
124 |
when setOwner . setOwnerAndGroupFromNames path GanetiLuxid $
|
|
142 |
when setOwner . setOwnerAndGroupFromNames path (connDaemon conf) $
|
|
125 | 143 |
ExtraGroup DaemonsGroup |
126 | 144 |
S.listen s 5 -- 5 is the max backlog |
127 |
return s
|
|
145 |
return Server { sSocket=s, serverConfig=conf }
|
|
128 | 146 |
|
129 | 147 |
-- | Closes a server endpoint. |
130 | 148 |
-- FIXME: this should be encapsulated into a nicer type. |
131 |
closeServer :: FilePath -> S.Socket -> IO ()
|
|
132 |
closeServer path sock = do
|
|
133 |
S.sClose sock
|
|
149 |
closeServer :: FilePath -> Server -> IO ()
|
|
150 |
closeServer path server = do
|
|
151 |
S.sClose (sSocket server)
|
|
134 | 152 |
removeFile path |
135 | 153 |
|
136 | 154 |
-- | Accepts a client |
137 |
acceptClient :: S.Socket -> IO Client
|
|
155 |
acceptClient :: Server -> IO Client
|
|
138 | 156 |
acceptClient s = do |
139 | 157 |
-- second return is the address of the client, which we ignore here |
140 |
(client_socket, _) <- S.accept s
|
|
158 |
(client_socket, _) <- S.accept (sSocket s)
|
|
141 | 159 |
new_buffer <- newIORef B.empty |
142 | 160 |
handle <- S.socketToHandle client_socket ReadWriteMode |
143 |
return Client { socket=handle, rbuf=new_buffer } |
|
161 |
return Client { socket=handle |
|
162 |
, rbuf=new_buffer |
|
163 |
, clientConfig=serverConfig s |
|
164 |
} |
|
144 | 165 |
|
145 | 166 |
-- | Closes the client socket. |
146 | 167 |
closeClient :: Client -> IO () |
147 | 168 |
closeClient = hClose . socket |
148 | 169 |
|
149 |
-- | Sends a message over a luxi transport.
|
|
170 |
-- | Sends a message over a transport. |
|
150 | 171 |
sendMsg :: Client -> String -> IO () |
151 |
sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
|
|
172 |
sendMsg s buf = withTimeout (sendTmo $ clientConfig s) "sending a message" $ do
|
|
152 | 173 |
let encoded = UTF8L.fromString buf |
153 | 174 |
handle = socket s |
154 | 175 |
BL.hPut handle encoded |
... | ... | |
158 | 179 |
-- | Given a current buffer and the handle, it will read from the |
159 | 180 |
-- network until we get a full message, and it will return that |
160 | 181 |
-- message and the leftover buffer contents. |
161 |
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString) |
|
162 |
recvUpdate handle obuf = do |
|
163 |
nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do |
|
182 |
recvUpdate :: ConnectConfig -> Handle -> B.ByteString |
|
183 |
-> IO (B.ByteString, B.ByteString) |
|
184 |
recvUpdate conf handle obuf = do |
|
185 |
nbuf <- withTimeout (recvTmo conf) "reading a response" $ do |
|
164 | 186 |
_ <- hWaitForInput handle (-1) |
165 | 187 |
B.hGetNonBlocking handle 4096 |
166 | 188 |
let (msg, remaining) = B.break (eOM ==) nbuf |
167 | 189 |
newbuf = B.append obuf msg |
168 | 190 |
if B.null remaining |
169 |
then recvUpdate handle newbuf |
|
191 |
then recvUpdate conf handle newbuf
|
|
170 | 192 |
else return (newbuf, B.tail remaining) |
171 | 193 |
|
172 |
-- | Waits for a message over a luxi transport.
|
|
194 |
-- | Waits for a message over a transport. |
|
173 | 195 |
recvMsg :: Client -> IO String |
174 | 196 |
recvMsg s = do |
175 | 197 |
cbuf <- readIORef $ rbuf s |
176 | 198 |
let (imsg, ibuf) = B.break (eOM ==) cbuf |
177 | 199 |
(msg, nbuf) <- |
178 | 200 |
if B.null ibuf -- if old buffer didn't contain a full message |
179 |
then recvUpdate (socket s) cbuf -- then we read from network |
|
201 |
-- then we read from network: |
|
202 |
then recvUpdate (clientConfig s) (socket s) cbuf |
|
180 | 203 |
else return (imsg, B.tail ibuf) -- else we return data from our buffer |
181 | 204 |
writeIORef (rbuf s) nbuf |
182 | 205 |
return $ UTF8.toString msg |
Also available in: Unified diff