Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / UDSServer.hs @ 71a4c605

History | View | Annotate | Download (6.1 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the Ganeti Unix Domain Socket JSON server interface.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2013 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.UDSServer
29
  ( Client
30
  , RecvResult(..)
31
  , MsgKeys(..)
32
  , strOfKey
33
  , getClient
34
  , getServer
35
  , acceptClient
36
  , closeClient
37
  , closeServer
38
  , buildResponse
39
  , recvMsg
40
  , recvMsgExt
41
  , sendMsg
42
  ) where
43

    
44
import Control.Exception (catch)
45
import Data.IORef
46
import qualified Data.ByteString as B
47
import qualified Data.ByteString.Lazy as BL
48
import qualified Data.ByteString.UTF8 as UTF8
49
import qualified Data.ByteString.Lazy.UTF8 as UTF8L
50
import Data.Word (Word8)
51
import Control.Monad
52
import qualified Network.Socket as S
53
import System.Directory (removeFile)
54
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
55
import System.IO.Error (isEOFError)
56
import System.Timeout
57
import Text.JSON (encodeStrict)
58
import Text.JSON.Types
59

    
60
import Ganeti.Constants
61
import Ganeti.Runtime (GanetiDaemon(..), MiscGroup(..), GanetiGroup(..))
62
import Ganeti.THH
63
import Ganeti.Utils
64

    
65

    
66
-- * Utility functions
67

    
68
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
69
withTimeout :: Int -> String -> IO a -> IO a
70
withTimeout secs descr action = do
71
  result <- timeout (secs * 1000000) action
72
  case result of
73
    Nothing -> fail $ "Timeout in " ++ descr
74
    Just v -> return v
75

    
76

    
77
-- * Generic protocol functionality
78

    
79
-- | Result of receiving a message from the socket.
80
data RecvResult = RecvConnClosed    -- ^ Connection closed
81
                | RecvError String  -- ^ Any other error
82
                | RecvOk String     -- ^ Successfull receive
83
                  deriving (Show, Eq)
84

    
85

    
86
-- | The end-of-message separator.
87
eOM :: Word8
88
eOM = 3
89

    
90
-- | The end-of-message encoded as a ByteString.
91
bEOM :: B.ByteString
92
bEOM = B.singleton eOM
93

    
94
-- | Valid keys in the requests and responses.
95
data MsgKeys = Method
96
             | Args
97
             | Success
98
             | Result
99

    
100
-- | The serialisation of MsgKeys into strings in messages.
101
$(genStrOfKey ''MsgKeys "strOfKey")
102

    
103

    
104
-- | Luxi client encapsulation.
105
data Client = Client { socket :: Handle           -- ^ The socket of the client
106
                     , rbuf :: IORef B.ByteString -- ^ Already received buffer
107
                     }
108

    
109
-- | Connects to the master daemon and returns a luxi Client.
110
getClient :: String -> IO Client
111
getClient path = do
112
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
113
  withTimeout luxiDefCtmo "creating luxi connection" $
114
              S.connect s (S.SockAddrUnix path)
115
  rf <- newIORef B.empty
116
  h <- S.socketToHandle s ReadWriteMode
117
  return Client { socket=h, rbuf=rf }
118

    
119
-- | Creates and returns a server endpoint.
120
getServer :: Bool -> FilePath -> IO S.Socket
121
getServer setOwner path = do
122
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
123
  S.bindSocket s (S.SockAddrUnix path)
124
  when setOwner . setOwnerAndGroupFromNames path GanetiLuxid $
125
    ExtraGroup DaemonsGroup
126
  S.listen s 5 -- 5 is the max backlog
127
  return s
128

    
129
-- | Closes a server endpoint.
130
-- FIXME: this should be encapsulated into a nicer type.
131
closeServer :: FilePath -> S.Socket -> IO ()
132
closeServer path sock = do
133
  S.sClose sock
134
  removeFile path
135

    
136
-- | Accepts a client
137
acceptClient :: S.Socket -> IO Client
138
acceptClient s = do
139
  -- second return is the address of the client, which we ignore here
140
  (client_socket, _) <- S.accept s
141
  new_buffer <- newIORef B.empty
142
  handle <- S.socketToHandle client_socket ReadWriteMode
143
  return Client { socket=handle, rbuf=new_buffer }
144

    
145
-- | Closes the client socket.
146
closeClient :: Client -> IO ()
147
closeClient = hClose . socket
148

    
149
-- | Sends a message over a luxi transport.
150
sendMsg :: Client -> String -> IO ()
151
sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
152
  let encoded = UTF8L.fromString buf
153
      handle = socket s
154
  BL.hPut handle encoded
155
  B.hPut handle bEOM
156
  hFlush handle
157

    
158
-- | Given a current buffer and the handle, it will read from the
159
-- network until we get a full message, and it will return that
160
-- message and the leftover buffer contents.
161
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
162
recvUpdate handle obuf = do
163
  nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
164
            _ <- hWaitForInput handle (-1)
165
            B.hGetNonBlocking handle 4096
166
  let (msg, remaining) = B.break (eOM ==) nbuf
167
      newbuf = B.append obuf msg
168
  if B.null remaining
169
    then recvUpdate handle newbuf
170
    else return (newbuf, B.tail remaining)
171

    
172
-- | Waits for a message over a luxi transport.
173
recvMsg :: Client -> IO String
174
recvMsg s = do
175
  cbuf <- readIORef $ rbuf s
176
  let (imsg, ibuf) = B.break (eOM ==) cbuf
177
  (msg, nbuf) <-
178
    if B.null ibuf      -- if old buffer didn't contain a full message
179
      then recvUpdate (socket s) cbuf   -- then we read from network
180
      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
181
  writeIORef (rbuf s) nbuf
182
  return $ UTF8.toString msg
183

    
184
-- | Extended wrapper over recvMsg.
185
recvMsgExt :: Client -> IO RecvResult
186
recvMsgExt s =
187
  Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
188
    return $ if isEOFError e
189
               then RecvConnClosed
190
               else RecvError (show e)
191

    
192

    
193
-- | Serialize the response to String.
194
buildResponse :: Bool    -- ^ Success
195
              -> JSValue -- ^ The arguments
196
              -> String  -- ^ The serialized form
197
buildResponse success args =
198
  let ja = [ (strOfKey Success, JSBool success)
199
           , (strOfKey Result, args)]
200
      jo = toJSObject ja
201
  in encodeStrict jo