Revision 0fbc8447

b/src/Ganeti/Luxi.hs
29 29
  ( LuxiOp(..)
30 30
  , LuxiReq(..)
31 31
  , Client
32
  , Server
32 33
  , JobId
33 34
  , fromJobId
34 35
  , makeJobId
......
67 68
import Ganeti.OpParams (pTagsObject)
68 69
import Ganeti.OpCodes
69 70
import qualified Ganeti.Query.Language as Qlang
71
import Ganeti.Runtime (GanetiDaemon(..))
70 72
import Ganeti.THH
71 73
import Ganeti.Types
72 74

  
......
170 172
-- | Type holding the initial (unparsed) Luxi call.
171 173
data LuxiCall = LuxiCall LuxiReq JSValue
172 174

  
175
luxiConnectConfig :: ConnectConfig
176
luxiConnectConfig = ConnectConfig { connDaemon = GanetiLuxid
177
                                  , recvTmo    = luxiDefRwto
178
                                  , sendTmo    = luxiDefRwto
179
                                  }
180

  
181
-- | Connects to the master daemon and returns a luxi Client.
182
getLuxiClient :: String -> IO Client
183
getLuxiClient = connectClient luxiConnectConfig luxiDefCtmo
184

  
185
-- | Creates and returns a server endpoint.
186
getLuxiServer :: Bool -> FilePath -> IO Server
187
getLuxiServer = connectServer luxiConnectConfig
188

  
189

  
173 190
-- | Serialize a request to String.
174 191
buildCall :: LuxiOp  -- ^ The method
175 192
          -> String  -- ^ The serialized form
b/src/Ganeti/Query/Server.hs
38 38
import Data.Bits (bitSize)
39 39
import qualified Data.Set as Set (toList)
40 40
import Data.IORef
41
import qualified Network.Socket as S
42 41
import qualified Text.JSON as J
43 42
import Text.JSON (encode, showJSON, JSValue(..))
44 43
import System.Info (arch)
......
350 349

  
351 350
-- | Main listener loop: accepts clients, forks an I/O thread to handle
352 351
-- that client.
353
listener :: MVar () -> JQStatus -> ConfigReader -> S.Socket -> IO ()
352
listener :: MVar () -> JQStatus -> ConfigReader -> Server -> IO ()
354 353
listener qlock qstat creader socket = do
355 354
  client <- acceptClient socket
356 355
  _ <- forkIO $ clientLoop qlock qstat client creader
357 356
  return ()
358 357

  
359 358
-- | Type alias for prepMain results
360
type PrepResult = (FilePath, S.Socket, IORef (Result ConfigData), JQStatus)
359
type PrepResult = (FilePath, Server, IORef (Result ConfigData), JQStatus)
361 360

  
362 361
-- | Check function for luxid.
363 362
checkMain :: CheckFn ()
b/src/Ganeti/UDSServer.hs
26 26
-}
27 27

  
28 28
module Ganeti.UDSServer
29
  ( Client
29
  ( ConnectConfig(..)
30
  , Client
31
  , Server
30 32
  , RecvResult(..)
31 33
  , MsgKeys(..)
32 34
  , strOfKey
33
  , getLuxiClient
34
  , getLuxiServer
35
  , connectClient
36
  , connectServer
35 37
  , acceptClient
36 38
  , closeClient
37 39
  , closeServer
......
57 59
import Text.JSON (encodeStrict)
58 60
import Text.JSON.Types
59 61

  
60
import Ganeti.Constants
61 62
import Ganeti.Runtime (GanetiDaemon(..), MiscGroup(..), GanetiGroup(..))
62 63
import Ganeti.THH
63 64
import Ganeti.Utils
......
101 102
$(genStrOfKey ''MsgKeys "strOfKey")
102 103

  
103 104

  
104
-- | Luxi client encapsulation.
105
data ConnectConfig = ConnectConfig
106
                     { connDaemon :: GanetiDaemon
107
                     , recvTmo :: Int
108
                     , sendTmo :: Int
109
                     }
110

  
111
-- | A client encapsulation.
105 112
data Client = Client { socket :: Handle           -- ^ The socket of the client
106 113
                     , rbuf :: IORef B.ByteString -- ^ Already received buffer
114
                     , clientConfig :: ConnectConfig
107 115
                     }
108 116

  
109
-- | Connects to the master daemon and returns a luxi Client.
110
getLuxiClient :: String -> IO Client
111
getLuxiClient path = do
117
-- | A server encapsulation.
118
data Server = Server { sSocket :: S.Socket        -- ^ The bound server socket
119
                     , serverConfig :: ConnectConfig
120
                     }
121

  
122

  
123
-- | Connects to the master daemon and returns a Client.
124
connectClient
125
  :: ConnectConfig    -- ^ configuration for the client
126
  -> Int              -- ^ connection timeout
127
  -> FilePath         -- ^ socket path
128
  -> IO Client
129
connectClient conf tmo path = do
112 130
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
113
  withTimeout luxiDefCtmo "creating luxi connection" $
131
  withTimeout tmo "creating a connection" $
114 132
              S.connect s (S.SockAddrUnix path)
115 133
  rf <- newIORef B.empty
116 134
  h <- S.socketToHandle s ReadWriteMode
117
  return Client { socket=h, rbuf=rf }
135
  return Client { socket=h, rbuf=rf, clientConfig=conf }
118 136

  
119 137
-- | Creates and returns a server endpoint.
120
getLuxiServer :: Bool -> FilePath -> IO S.Socket
121
getLuxiServer setOwner path = do
138
connectServer :: ConnectConfig -> Bool -> FilePath -> IO Server
139
connectServer conf setOwner path = do
122 140
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
123 141
  S.bindSocket s (S.SockAddrUnix path)
124
  when setOwner . setOwnerAndGroupFromNames path GanetiLuxid $
142
  when setOwner . setOwnerAndGroupFromNames path (connDaemon conf) $
125 143
    ExtraGroup DaemonsGroup
126 144
  S.listen s 5 -- 5 is the max backlog
127
  return s
145
  return Server { sSocket=s, serverConfig=conf }
128 146

  
129 147
-- | Closes a server endpoint.
130 148
-- FIXME: this should be encapsulated into a nicer type.
131
closeServer :: FilePath -> S.Socket -> IO ()
132
closeServer path sock = do
133
  S.sClose sock
149
closeServer :: FilePath -> Server -> IO ()
150
closeServer path server = do
151
  S.sClose (sSocket server)
134 152
  removeFile path
135 153

  
136 154
-- | Accepts a client
137
acceptClient :: S.Socket -> IO Client
155
acceptClient :: Server -> IO Client
138 156
acceptClient s = do
139 157
  -- second return is the address of the client, which we ignore here
140
  (client_socket, _) <- S.accept s
158
  (client_socket, _) <- S.accept (sSocket s)
141 159
  new_buffer <- newIORef B.empty
142 160
  handle <- S.socketToHandle client_socket ReadWriteMode
143
  return Client { socket=handle, rbuf=new_buffer }
161
  return Client { socket=handle
162
                , rbuf=new_buffer
163
                , clientConfig=serverConfig s
164
                }
144 165

  
145 166
-- | Closes the client socket.
146 167
closeClient :: Client -> IO ()
147 168
closeClient = hClose . socket
148 169

  
149
-- | Sends a message over a luxi transport.
170
-- | Sends a message over a transport.
150 171
sendMsg :: Client -> String -> IO ()
151
sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
172
sendMsg s buf = withTimeout (sendTmo $ clientConfig s) "sending a message" $ do
152 173
  let encoded = UTF8L.fromString buf
153 174
      handle = socket s
154 175
  BL.hPut handle encoded
......
158 179
-- | Given a current buffer and the handle, it will read from the
159 180
-- network until we get a full message, and it will return that
160 181
-- message and the leftover buffer contents.
161
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
162
recvUpdate handle obuf = do
163
  nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
182
recvUpdate :: ConnectConfig -> Handle -> B.ByteString
183
           -> IO (B.ByteString, B.ByteString)
184
recvUpdate conf handle obuf = do
185
  nbuf <- withTimeout (recvTmo conf) "reading a response" $ do
164 186
            _ <- hWaitForInput handle (-1)
165 187
            B.hGetNonBlocking handle 4096
166 188
  let (msg, remaining) = B.break (eOM ==) nbuf
167 189
      newbuf = B.append obuf msg
168 190
  if B.null remaining
169
    then recvUpdate handle newbuf
191
    then recvUpdate conf handle newbuf
170 192
    else return (newbuf, B.tail remaining)
171 193

  
172
-- | Waits for a message over a luxi transport.
194
-- | Waits for a message over a transport.
173 195
recvMsg :: Client -> IO String
174 196
recvMsg s = do
175 197
  cbuf <- readIORef $ rbuf s
176 198
  let (imsg, ibuf) = B.break (eOM ==) cbuf
177 199
  (msg, nbuf) <-
178 200
    if B.null ibuf      -- if old buffer didn't contain a full message
179
      then recvUpdate (socket s) cbuf   -- then we read from network
201
                        -- then we read from network:
202
      then recvUpdate (clientConfig s) (socket s) cbuf
180 203
      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
181 204
  writeIORef (rbuf s) nbuf
182 205
  return $ UTF8.toString msg

Also available in: Unified diff