Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / UDSServer.hs @ c92b4671

History | View | Annotate | Download (12.1 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the Ganeti Unix Domain Socket JSON server interface.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2013 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.UDSServer
29
  ( ConnectConfig(..)
30
  , Client
31
  , Server
32
  , RecvResult(..)
33
  , MsgKeys(..)
34
  , strOfKey
35
  -- * Unix sockets
36
  , openClientSocket
37
  , closeClientSocket
38
  , openServerSocket
39
  , closeServerSocket
40
  , acceptSocket
41
  -- * Client and server
42
  , connectClient
43
  , connectServer
44
  , acceptClient
45
  , closeClient
46
  , closeServer
47
  , buildResponse
48
  , parseCall
49
  , recvMsg
50
  , recvMsgExt
51
  , sendMsg
52
  -- * Client handler
53
  , Handler(..)
54
  , HandlerResult
55
  , listener
56
  ) where
57

    
58
import Control.Applicative
59
import Control.Concurrent (forkIO)
60
import Control.Exception (catch)
61
import Data.IORef
62
import qualified Data.ByteString as B
63
import qualified Data.ByteString.Lazy as BL
64
import qualified Data.ByteString.UTF8 as UTF8
65
import qualified Data.ByteString.Lazy.UTF8 as UTF8L
66
import Data.Word (Word8)
67
import Control.Monad
68
import qualified Network.Socket as S
69
import System.Directory (removeFile)
70
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
71
import System.IO.Error (isEOFError)
72
import System.Timeout
73
import Text.JSON (encodeStrict, decodeStrict)
74
import qualified Text.JSON as J
75
import Text.JSON.Types
76

    
77
import Ganeti.BasicTypes
78
import Ganeti.Errors (GanetiException)
79
import Ganeti.JSON
80
import Ganeti.Logging
81
import Ganeti.Runtime (GanetiDaemon(..), MiscGroup(..), GanetiGroup(..))
82
import Ganeti.THH
83
import Ganeti.Utils
84

    
85

    
86
-- * Utility functions
87

    
88
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
89
withTimeout :: Int -> String -> IO a -> IO a
90
withTimeout secs descr action = do
91
  result <- timeout (secs * 1000000) action
92
  case result of
93
    Nothing -> fail $ "Timeout in " ++ descr
94
    Just v -> return v
95

    
96

    
97
-- * Generic protocol functionality
98

    
99
-- | Result of receiving a message from the socket.
100
data RecvResult = RecvConnClosed    -- ^ Connection closed
101
                | RecvError String  -- ^ Any other error
102
                | RecvOk String     -- ^ Successfull receive
103
                  deriving (Show, Eq)
104

    
105

    
106
-- | The end-of-message separator.
107
eOM :: Word8
108
eOM = 3
109

    
110
-- | The end-of-message encoded as a ByteString.
111
bEOM :: B.ByteString
112
bEOM = B.singleton eOM
113

    
114
-- | Valid keys in the requests and responses.
115
data MsgKeys = Method
116
             | Args
117
             | Success
118
             | Result
119

    
120
-- | The serialisation of MsgKeys into strings in messages.
121
$(genStrOfKey ''MsgKeys "strOfKey")
122

    
123

    
124
data ConnectConfig = ConnectConfig
125
                     { connDaemon :: GanetiDaemon
126
                     , recvTmo :: Int
127
                     , sendTmo :: Int
128
                     }
129

    
130
-- | A client encapsulation.
131
data Client = Client { socket :: Handle           -- ^ The socket of the client
132
                     , rbuf :: IORef B.ByteString -- ^ Already received buffer
133
                     , clientConfig :: ConnectConfig
134
                     }
135

    
136
-- | A server encapsulation.
137
data Server = Server { sSocket :: S.Socket        -- ^ The bound server socket
138
                     , sPath :: FilePath          -- ^ The scoket's path
139
                     , serverConfig :: ConnectConfig
140
                     }
141

    
142
-- * Unix sockets
143

    
144
-- | Creates a Unix socket and connects it to the specified @path@,
145
-- where @timeout@ specifies the connection timeout.
146
openClientSocket
147
  :: Int              -- ^ connection timeout
148
  -> FilePath         -- ^ socket path
149
  -> IO Handle
150
openClientSocket tmo path = do
151
  sock <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
152
  withTimeout tmo "creating a connection" $
153
              S.connect sock (S.SockAddrUnix path)
154
  S.socketToHandle sock ReadWriteMode
155

    
156
closeClientSocket :: Handle -> IO ()
157
closeClientSocket = hClose
158

    
159
-- | Creates a Unix socket and binds it to the specified @path@.
160
openServerSocket :: FilePath -> IO S.Socket
161
openServerSocket path = do
162
  sock <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
163
  S.bindSocket sock (S.SockAddrUnix path)
164
  return sock
165

    
166
closeServerSocket :: S.Socket -> FilePath -> IO ()
167
closeServerSocket sock path = do
168
  S.sClose sock
169
  removeFile path
170

    
171
acceptSocket :: S.Socket -> IO Handle
172
acceptSocket sock = do
173
  -- ignore client socket address
174
  (clientSock, _) <- S.accept sock
175
  S.socketToHandle clientSock ReadWriteMode
176

    
177
-- * Client and server
178

    
179
-- | Connects to the master daemon and returns a Client.
180
connectClient
181
  :: ConnectConfig    -- ^ configuration for the client
182
  -> Int              -- ^ connection timeout
183
  -> FilePath         -- ^ socket path
184
  -> IO Client
185
connectClient conf tmo path = do
186
  h <- openClientSocket tmo path
187
  rf <- newIORef B.empty
188
  return Client { socket=h, rbuf=rf, clientConfig=conf }
189

    
190
-- | Creates and returns a server endpoint.
191
connectServer :: ConnectConfig -> Bool -> FilePath -> IO Server
192
connectServer conf setOwner path = do
193
  s <- openServerSocket path
194
  when setOwner . setOwnerAndGroupFromNames path (connDaemon conf) $
195
    ExtraGroup DaemonsGroup
196
  S.listen s 5 -- 5 is the max backlog
197
  return Server { sSocket=s, sPath=path, serverConfig=conf }
198

    
199
-- | Closes a server endpoint.
200
closeServer :: Server -> IO ()
201
closeServer server =
202
  closeServerSocket (sSocket server) (sPath server)
203

    
204
-- | Accepts a client
205
acceptClient :: Server -> IO Client
206
acceptClient s = do
207
  handle <- acceptSocket (sSocket s)
208
  new_buffer <- newIORef B.empty
209
  return Client { socket=handle
210
                , rbuf=new_buffer
211
                , clientConfig=serverConfig s
212
                }
213

    
214
-- | Closes the client socket.
215
closeClient :: Client -> IO ()
216
closeClient = closeClientSocket . socket
217

    
218
-- | Sends a message over a transport.
219
sendMsg :: Client -> String -> IO ()
220
sendMsg s buf = withTimeout (sendTmo $ clientConfig s) "sending a message" $ do
221
  let encoded = UTF8L.fromString buf
222
      handle = socket s
223
  BL.hPut handle encoded
224
  B.hPut handle bEOM
225
  hFlush handle
226

    
227
-- | Given a current buffer and the handle, it will read from the
228
-- network until we get a full message, and it will return that
229
-- message and the leftover buffer contents.
230
recvUpdate :: ConnectConfig -> Handle -> B.ByteString
231
           -> IO (B.ByteString, B.ByteString)
232
recvUpdate conf handle obuf = do
233
  nbuf <- withTimeout (recvTmo conf) "reading a response" $ do
234
            _ <- hWaitForInput handle (-1)
235
            B.hGetNonBlocking handle 4096
236
  let (msg, remaining) = B.break (eOM ==) nbuf
237
      newbuf = B.append obuf msg
238
  if B.null remaining
239
    then recvUpdate conf handle newbuf
240
    else return (newbuf, B.tail remaining)
241

    
242
-- | Waits for a message over a transport.
243
recvMsg :: Client -> IO String
244
recvMsg s = do
245
  cbuf <- readIORef $ rbuf s
246
  let (imsg, ibuf) = B.break (eOM ==) cbuf
247
  (msg, nbuf) <-
248
    if B.null ibuf      -- if old buffer didn't contain a full message
249
                        -- then we read from network:
250
      then recvUpdate (clientConfig s) (socket s) cbuf
251
      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
252
  writeIORef (rbuf s) nbuf
253
  return $ UTF8.toString msg
254

    
255
-- | Extended wrapper over recvMsg.
256
recvMsgExt :: Client -> IO RecvResult
257
recvMsgExt s =
258
  Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
259
    return $ if isEOFError e
260
               then RecvConnClosed
261
               else RecvError (show e)
262

    
263

    
264
-- | Parse the required keys out of a call.
265
parseCall :: (J.JSON mth, J.JSON args) => String -> Result (mth, args)
266
parseCall s = do
267
  arr <- fromJResult "parsing top-level JSON message" $
268
           decodeStrict s :: Result (JSObject JSValue)
269
  let keyFromObj :: (J.JSON a) => MsgKeys -> Result a
270
      keyFromObj = fromObj (fromJSObject arr) . strOfKey
271
  (,) <$> keyFromObj Method <*> keyFromObj Args
272

    
273

    
274
-- | Serialize the response to String.
275
buildResponse :: Bool    -- ^ Success
276
              -> JSValue -- ^ The arguments
277
              -> String  -- ^ The serialized form
278
buildResponse success args =
279
  let ja = [ (strOfKey Success, JSBool success)
280
           , (strOfKey Result, args)]
281
      jo = toJSObject ja
282
  in encodeStrict jo
283

    
284
-- | Logs an outgoing message.
285
logMsg
286
    :: (Show e, J.JSON e, MonadLog m)
287
    => Handler i o
288
    -> i                          -- ^ the received request (used for logging)
289
    -> GenericResult e J.JSValue  -- ^ A message to be sent
290
    -> m ()
291
logMsg handler req (Bad err) =
292
  logWarning $ "Failed to execute request " ++ hInputLogLong handler req ++ ": "
293
               ++ show err
294
logMsg handler req (Ok result) = do
295
  -- only log the first 2,000 chars of the result
296
  logDebug $ "Result (truncated): " ++ take 2000 (J.encode result)
297
  logInfo $ "Successfully handled " ++ hInputLogShort handler req
298

    
299
-- | Prepares an outgoing message.
300
prepareMsg
301
    :: (J.JSON e)
302
    => GenericResult e J.JSValue  -- ^ A message to be sent
303
    -> (Bool, J.JSValue)
304
prepareMsg (Bad err)   = (False, J.showJSON err)
305
prepareMsg (Ok result) = (True, result)
306

    
307

    
308
-- * Processing client requests
309

    
310
type HandlerResult o = IO (Bool, GenericResult GanetiException o)
311

    
312
data Handler i o = Handler
313
  { hParse         :: J.JSValue -> J.JSValue -> Result i
314
    -- ^ parses method and its arguments into the input type
315
  , hInputLogShort :: i -> String
316
    -- ^ short description of an input, for the INFO logging level
317
  , hInputLogLong  :: i -> String
318
    -- ^ long description of an input, for the DEBUG logging level
319
  , hExec          :: i -> HandlerResult o
320
    -- ^ executes the handler on an input
321
  }
322

    
323

    
324
handleJsonMessage
325
    :: (J.JSON o)
326
    => Handler i o              -- ^ handler
327
    -> i                        -- ^ parsed input
328
    -> HandlerResult J.JSValue
329
handleJsonMessage handler req = do
330
  (close, call_result) <- hExec handler req
331
  return (close, fmap J.showJSON call_result)
332

    
333
-- | Takes a request as a 'String', parses it, passes it to a handler and
334
-- formats its response.
335
handleRawMessage
336
    :: (J.JSON o)
337
    => Handler i o              -- ^ handler
338
    -> String                   -- ^ raw unparsed input
339
    -> IO (Bool, String)
340
handleRawMessage handler payload =
341
  case parseCall payload >>= uncurry (hParse handler) of
342
    Bad err -> do
343
         let errmsg = "Failed to parse request: " ++ err
344
         logWarning errmsg
345
         return (False, buildResponse False (J.showJSON errmsg))
346
    Ok req -> do
347
        logDebug $ "Request: " ++ hInputLogLong handler req
348
        (close, call_result_json) <- handleJsonMessage handler req
349
        logMsg handler req call_result_json
350
        let (status, response) = prepareMsg call_result_json
351
        return (close, buildResponse status response)
352

    
353
-- | Reads a request, passes it to a handler and sends a response back to the
354
-- client.
355
handleClient
356
    :: (J.JSON o)
357
    => Handler i o
358
    -> Client
359
    -> IO Bool
360
handleClient handler client = do
361
  msg <- recvMsgExt client
362
  logDebug $ "Received message: " ++ show msg
363
  case msg of
364
    RecvConnClosed -> logDebug "Connection closed" >>
365
                      return False
366
    RecvError err -> logWarning ("Error during message receiving: " ++ err) >>
367
                     return False
368
    RecvOk payload -> do
369
      (close, outMsg) <- handleRawMessage handler payload
370
      sendMsg client outMsg
371
      return close
372

    
373
-- | Main client loop: runs one loop of 'handleClient', and if that
374
-- doesn't report a finished (closed) connection, restarts itself.
375
clientLoop
376
    :: (J.JSON o)
377
    => Handler i o
378
    -> Client
379
    -> IO ()
380
clientLoop handler client = do
381
  result <- handleClient handler client
382
  if result
383
    then clientLoop handler client
384
    else closeClient client
385

    
386
-- | Main listener loop: accepts clients, forks an I/O thread to handle
387
-- that client.
388
listener
389
    :: (J.JSON o)
390
    => Handler i o
391
    -> Server
392
    -> IO ()
393
listener handler server = do
394
  client <- acceptClient server
395
  _ <- forkIO $ clientLoop handler client
396
  return ()