Statistics
| Branch: | Tag: | Revision:

root / htools / Ganeti / Luxi.hs @ 05ff7a00

History | View | Annotate | Download (8.3 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the Ganeti LUXI interface.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2009, 2010, 2011 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Luxi
29
    ( LuxiOp(..)
30
    , Client
31
    , getClient
32
    , closeClient
33
    , callMethod
34
    , submitManyJobs
35
    , queryJobsStatus
36
    ) where
37

    
38
import Data.IORef
39
import Control.Monad
40
import Text.JSON (encodeStrict, decodeStrict)
41
import qualified Text.JSON as J
42
import Text.JSON.Types
43
import System.Timeout
44
import qualified Network.Socket as S
45

    
46
import Ganeti.HTools.Utils
47
import Ganeti.HTools.Types
48

    
49
import Ganeti.Jobs (JobStatus)
50
import Ganeti.OpCodes (OpCode)
51
import Ganeti.THH
52

    
53
-- * Utility functions
54

    
55
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
56
withTimeout :: Int -> String -> IO a -> IO a
57
withTimeout secs descr action = do
58
    result <- timeout (secs * 1000000) action
59
    (case result of
60
       Nothing -> fail $ "Timeout in " ++ descr
61
       Just v -> return v)
62

    
63
-- * Generic protocol functionality
64

    
65
-- | Currently supported Luxi operations and JSON serialization.
66
$(genLuxiOp "LuxiOp"
67
    [ ("QueryNodes",
68
       [ ("names",  [t| [String] |], [| id |])
69
       , ("fields", [t| [String] |], [| id |])
70
       , ("lock",   [t| Bool     |], [| id |])
71
       ],
72
       [| J.showJSON |])
73
    , ("QueryGroups",
74
       [ ("names",  [t| [String] |], [| id |])
75
       , ("fields", [t| [String] |], [| id |])
76
       , ("lock",   [t| Bool     |], [| id |])
77
       ],
78
       [| J.showJSON |])
79
    , ("QueryInstances",
80
       [ ("names",  [t| [String] |], [| id |])
81
       , ("fields", [t| [String] |], [| id |])
82
       , ("lock",   [t| Bool     |], [| id |])
83
       ],
84
       [| J.showJSON |])
85
    , ("QueryJobs",
86
       [ ("ids",    [t| [Int]    |], [| map show |])
87
       , ("fields", [t| [String] |], [| id |])
88
       ],
89
       [| J.showJSON |])
90
    , ("QueryExports",
91
       [ ("nodes", [t| [String] |], [| id |])
92
       , ("lock",  [t| Bool     |], [| id |])
93
       ],
94
       [| J.showJSON |])
95
    , ("QueryConfigValues",
96
       [ ("fields", [t| [String] |], [| id |]) ],
97
       [| J.showJSON |])
98
    , ("QueryClusterInfo",
99
       [],
100
       [| J.showJSON |])
101
    , ("QueryTags",
102
       [ ("kind", [t| String |], [| id |])
103
       , ("name", [t| String |], [| id |])
104
       ],
105
       [| J.showJSON |])
106
    , ("SubmitJob",
107
       [ ("job", [t| [OpCode] |], [| id |]) ],
108
       [| J.showJSON |])
109
    , ("SubmitManyJobs",
110
       [ ("ops", [t| [[OpCode]] |], [| id |]) ],
111
       [| J.showJSON |])
112
    , ("WaitForJobChange",
113
       [ ("job",      [t| Int     |], [| J.showJSON |])
114
       , ("fields",   [t| [String]|], [| J.showJSON |])
115
       , ("prev_job", [t| JSValue |], [| J.showJSON |])
116
       , ("prev_log", [t| JSValue |], [| J.showJSON |])
117
       , ("tmout",    [t| Int     |], [| J.showJSON |])
118
       ],
119
       [| \(j, f, pj, pl, t) -> JSArray [j, f, pj, pl, t] |])
120
    , ("ArchiveJob",
121
       [ ("job", [t| Int |], [| show |]) ],
122
       [| J.showJSON |])
123
    , ("AutoArchiveJobs",
124
       [ ("age",   [t| Int |], [| id |])
125
       , ("tmout", [t| Int |], [| id |])
126
       ],
127
       [| J.showJSON |])
128
    , ("CancelJob",
129
       [("job", [t| Int |], [| show |]) ],
130
       [| J.showJSON |])
131
    , ("SetDrainFlag",
132
       [ ("flag", [t| Bool |], [| id |]) ],
133
       [| J.showJSON |])
134
    , ("SetWatcherPause",
135
       [ ("duration", [t| Double |], [| (: []) |]) ],
136
       [| J.showJSON |])
137
  ])
138

    
139
-- | The serialisation of LuxiOps into strings in messages.
140
$(genStrOfOp ''LuxiOp "strOfOp")
141

    
142
-- | The end-of-message separator.
143
eOM :: Char
144
eOM = '\3'
145

    
146
-- | Valid keys in the requests and responses.
147
data MsgKeys = Method
148
             | Args
149
             | Success
150
             | Result
151

    
152
-- | The serialisation of MsgKeys into strings in messages.
153
$(genStrOfKey ''MsgKeys "strOfKey")
154

    
155
-- | Luxi client encapsulation.
156
data Client = Client { socket :: S.Socket   -- ^ The socket of the client
157
                     , rbuf :: IORef String -- ^ Already received buffer
158
                     }
159

    
160
-- | Connects to the master daemon and returns a luxi Client.
161
getClient :: String -> IO Client
162
getClient path = do
163
    s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
164
    withTimeout connTimeout "creating luxi connection" $
165
                S.connect s (S.SockAddrUnix path)
166
    rf <- newIORef ""
167
    return Client { socket=s, rbuf=rf}
168

    
169
-- | Closes the client socket.
170
closeClient :: Client -> IO ()
171
closeClient = S.sClose . socket
172

    
173
-- | Sends a message over a luxi transport.
174
sendMsg :: Client -> String -> IO ()
175
sendMsg s buf =
176
    let _send obuf = do
177
          sbytes <- withTimeout queryTimeout
178
                    "sending luxi message" $
179
                    S.send (socket s) obuf
180
          unless (sbytes == length obuf) $ _send (drop sbytes obuf)
181
    in _send (buf ++ [eOM])
182

    
183
-- | Waits for a message over a luxi transport.
184
recvMsg :: Client -> IO String
185
recvMsg s = do
186
  let _recv obuf = do
187
              nbuf <- withTimeout queryTimeout "reading luxi response" $
188
                      S.recv (socket s) 4096
189
              let (msg, remaining) = break (eOM ==) nbuf
190
              (if null remaining
191
               then _recv (obuf ++ msg)
192
               else return (obuf ++ msg, tail remaining))
193
  cbuf <- readIORef $ rbuf s
194
  let (imsg, ibuf) = break (eOM ==) cbuf
195
  (msg, nbuf) <-
196
      (if null ibuf      -- if old buffer didn't contain a full message
197
       then _recv cbuf   -- then we read from network
198
       else return (imsg, tail ibuf)) -- else we return data from our buffer
199
  writeIORef (rbuf s) nbuf
200
  return msg
201

    
202
-- | Serialize a request to String.
203
buildCall :: LuxiOp  -- ^ The method
204
          -> String  -- ^ The serialized form
205
buildCall lo =
206
    let ja = [ (strOfKey Method, JSString $ toJSString $ strOfOp lo::JSValue)
207
             , (strOfKey Args, opToArgs lo::JSValue)
208
             ]
209
        jo = toJSObject ja
210
    in encodeStrict jo
211

    
212
-- | Check that luxi responses contain the required keys and that the
213
-- call was successful.
214
validateResult :: String -> Result JSValue
215
validateResult s = do
216
  oarr <- fromJResult "Parsing LUXI response"
217
          (decodeStrict s)::Result (JSObject JSValue)
218
  let arr = J.fromJSObject oarr
219
  status <- fromObj arr (strOfKey Success)::Result Bool
220
  let rkey = strOfKey Result
221
  (if status
222
   then fromObj arr rkey
223
   else fromObj arr rkey >>= fail)
224

    
225
-- | Generic luxi method call.
226
callMethod :: LuxiOp -> Client -> IO (Result JSValue)
227
callMethod method s = do
228
  sendMsg s $ buildCall method
229
  result <- recvMsg s
230
  let rval = validateResult result
231
  return rval
232

    
233
-- | Specialized submitManyJobs call.
234
submitManyJobs :: Client -> [[OpCode]] -> IO (Result [String])
235
submitManyJobs s jobs = do
236
  rval <- callMethod (SubmitManyJobs jobs) s
237
  -- map each result (status, payload) pair into a nice Result ADT
238
  return $ case rval of
239
             Bad x -> Bad x
240
             Ok (JSArray r) ->
241
                 mapM (\v -> case v of
242
                               JSArray [JSBool True, JSString x] ->
243
                                   Ok (fromJSString x)
244
                               JSArray [JSBool False, JSString x] ->
245
                                   Bad (fromJSString x)
246
                               _ -> Bad "Unknown result from the master daemon"
247
                      ) r
248
             x -> Bad ("Cannot parse response from Ganeti: " ++ show x)
249

    
250
-- | Custom queryJobs call.
251
queryJobsStatus :: Client -> [String] -> IO (Result [JobStatus])
252
queryJobsStatus s jids = do
253
  rval <- callMethod (QueryJobs (map read jids) ["status"]) s
254
  return $ case rval of
255
             Bad x -> Bad x
256
             Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
257
                       J.Ok vals -> if any null vals
258
                                    then Bad "Missing job status field"
259
                                    else Ok (map head vals)
260
                       J.Error x -> Bad x