Statistics
| Branch: | Tag: | Revision:

root / htools / Ganeti / Luxi.hs @ 7eda951b

History | View | Annotate | Download (8.3 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the Ganeti LUXI interface.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2009, 2010, 2011 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Luxi
29
    ( LuxiOp(..)
30
    , QrViaLuxi(..)
31
    , Client
32
    , getClient
33
    , closeClient
34
    , callMethod
35
    , submitManyJobs
36
    , queryJobsStatus
37
    ) where
38

    
39
import Data.IORef
40
import Control.Monad
41
import Text.JSON (encodeStrict, decodeStrict)
42
import qualified Text.JSON as J
43
import Text.JSON.Types
44
import System.Timeout
45
import qualified Network.Socket as S
46

    
47
import Ganeti.HTools.Utils
48
import Ganeti.HTools.Types
49

    
50
import Ganeti.Constants
51
import Ganeti.Jobs (JobStatus)
52
import Ganeti.OpCodes (OpCode)
53
import Ganeti.THH
54

    
55
-- * Utility functions
56

    
57
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
58
withTimeout :: Int -> String -> IO a -> IO a
59
withTimeout secs descr action = do
60
    result <- timeout (secs * 1000000) action
61
    (case result of
62
       Nothing -> fail $ "Timeout in " ++ descr
63
       Just v -> return v)
64

    
65
-- * Generic protocol functionality
66

    
67
$(declareSADT "QrViaLuxi"
68
     [ ("QRLock", 'qrLock)
69
     , ("QRInstance", 'qrInstance)
70
     , ("QRNode", 'qrNode)
71
     , ("QRGroup", 'qrGroup)
72
     , ("QROs", 'qrOs)
73
     ])
74
$(makeJSONInstance ''QrViaLuxi)
75

    
76
-- | Currently supported Luxi operations and JSON serialization.
77
$(genLuxiOp "LuxiOp"
78
    [("Query" ,
79
       [ ("what",    [t| QrViaLuxi |], [| id |])
80
       , ("fields",  [t| [String]  |], [| id |])
81
       , ("qfilter", [t| ()        |], [| const JSNull |])
82
       ])
83
     , ("QueryNodes",
84
       [ ("names",  [t| [String] |], [| id |])
85
       , ("fields", [t| [String] |], [| id |])
86
       , ("lock",   [t| Bool     |], [| id |])
87
       ])
88
    , ("QueryGroups",
89
       [ ("names",  [t| [String] |], [| id |])
90
       , ("fields", [t| [String] |], [| id |])
91
       , ("lock",   [t| Bool     |], [| id |])
92
       ])
93
    , ("QueryInstances",
94
       [ ("names",  [t| [String] |], [| id |])
95
       , ("fields", [t| [String] |], [| id |])
96
       , ("lock",   [t| Bool     |], [| id |])
97
       ])
98
    , ("QueryJobs",
99
       [ ("ids",    [t| [Int]    |], [| map show |])
100
       , ("fields", [t| [String] |], [| id |])
101
       ])
102
    , ("QueryExports",
103
       [ ("nodes", [t| [String] |], [| id |])
104
       , ("lock",  [t| Bool     |], [| id |])
105
       ])
106
    , ("QueryConfigValues",
107
       [ ("fields", [t| [String] |], [| id |]) ]
108
      )
109
    , ("QueryClusterInfo", [])
110
    , ("QueryTags",
111
       [ ("kind", [t| String |], [| id |])
112
       , ("name", [t| String |], [| id |])
113
       ])
114
    , ("SubmitJob",
115
       [ ("job", [t| [OpCode] |], [| id |]) ]
116
      )
117
    , ("SubmitManyJobs",
118
       [ ("ops", [t| [[OpCode]] |], [| id |]) ]
119
      )
120
    , ("WaitForJobChange",
121
       [ ("job",      [t| Int     |], [| id |])
122
       , ("fields",   [t| [String]|], [| id |])
123
       , ("prev_job", [t| JSValue |], [| id |])
124
       , ("prev_log", [t| JSValue |], [| id |])
125
       , ("tmout",    [t| Int     |], [| id |])
126
       ])
127
    , ("ArchiveJob",
128
       [ ("job", [t| Int |], [| show |]) ]
129
      )
130
    , ("AutoArchiveJobs",
131
       [ ("age",   [t| Int |], [| id |])
132
       , ("tmout", [t| Int |], [| id |])
133
       ])
134
    , ("CancelJob",
135
       [ ("job", [t| Int |], [| show |]) ]
136
      )
137
    , ("SetDrainFlag",
138
       [ ("flag", [t| Bool |], [| id |]) ]
139
      )
140
    , ("SetWatcherPause",
141
       [ ("duration", [t| Double |], [| id |]) ]
142
      )
143
  ])
144

    
145
-- | The serialisation of LuxiOps into strings in messages.
146
$(genStrOfOp ''LuxiOp "strOfOp")
147

    
148
-- | The end-of-message separator.
149
eOM :: Char
150
eOM = '\3'
151

    
152
-- | Valid keys in the requests and responses.
153
data MsgKeys = Method
154
             | Args
155
             | Success
156
             | Result
157

    
158
-- | The serialisation of MsgKeys into strings in messages.
159
$(genStrOfKey ''MsgKeys "strOfKey")
160

    
161
-- | Luxi client encapsulation.
162
data Client = Client { socket :: S.Socket   -- ^ The socket of the client
163
                     , rbuf :: IORef String -- ^ Already received buffer
164
                     }
165

    
166
-- | Connects to the master daemon and returns a luxi Client.
167
getClient :: String -> IO Client
168
getClient path = do
169
    s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
170
    withTimeout connTimeout "creating luxi connection" $
171
                S.connect s (S.SockAddrUnix path)
172
    rf <- newIORef ""
173
    return Client { socket=s, rbuf=rf}
174

    
175
-- | Closes the client socket.
176
closeClient :: Client -> IO ()
177
closeClient = S.sClose . socket
178

    
179
-- | Sends a message over a luxi transport.
180
sendMsg :: Client -> String -> IO ()
181
sendMsg s buf =
182
    let _send obuf = do
183
          sbytes <- withTimeout queryTimeout
184
                    "sending luxi message" $
185
                    S.send (socket s) obuf
186
          unless (sbytes == length obuf) $ _send (drop sbytes obuf)
187
    in _send (buf ++ [eOM])
188

    
189
-- | Waits for a message over a luxi transport.
190
recvMsg :: Client -> IO String
191
recvMsg s = do
192
  let _recv obuf = do
193
              nbuf <- withTimeout queryTimeout "reading luxi response" $
194
                      S.recv (socket s) 4096
195
              let (msg, remaining) = break (eOM ==) nbuf
196
              (if null remaining
197
               then _recv (obuf ++ msg)
198
               else return (obuf ++ msg, tail remaining))
199
  cbuf <- readIORef $ rbuf s
200
  let (imsg, ibuf) = break (eOM ==) cbuf
201
  (msg, nbuf) <-
202
      (if null ibuf      -- if old buffer didn't contain a full message
203
       then _recv cbuf   -- then we read from network
204
       else return (imsg, tail ibuf)) -- else we return data from our buffer
205
  writeIORef (rbuf s) nbuf
206
  return msg
207

    
208
-- | Serialize a request to String.
209
buildCall :: LuxiOp  -- ^ The method
210
          -> String  -- ^ The serialized form
211
buildCall lo =
212
    let ja = [ (strOfKey Method, JSString $ toJSString $ strOfOp lo::JSValue)
213
             , (strOfKey Args, opToArgs lo::JSValue)
214
             ]
215
        jo = toJSObject ja
216
    in encodeStrict jo
217

    
218
-- | Check that luxi responses contain the required keys and that the
219
-- call was successful.
220
validateResult :: String -> Result JSValue
221
validateResult s = do
222
  oarr <- fromJResult "Parsing LUXI response"
223
          (decodeStrict s)::Result (JSObject JSValue)
224
  let arr = J.fromJSObject oarr
225
  status <- fromObj arr (strOfKey Success)::Result Bool
226
  let rkey = strOfKey Result
227
  (if status
228
   then fromObj arr rkey
229
   else fromObj arr rkey >>= fail)
230

    
231
-- | Generic luxi method call.
232
callMethod :: LuxiOp -> Client -> IO (Result JSValue)
233
callMethod method s = do
234
  sendMsg s $ buildCall method
235
  result <- recvMsg s
236
  let rval = validateResult result
237
  return rval
238

    
239
-- | Specialized submitManyJobs call.
240
submitManyJobs :: Client -> [[OpCode]] -> IO (Result [String])
241
submitManyJobs s jobs = do
242
  rval <- callMethod (SubmitManyJobs jobs) s
243
  -- map each result (status, payload) pair into a nice Result ADT
244
  return $ case rval of
245
             Bad x -> Bad x
246
             Ok (JSArray r) ->
247
                 mapM (\v -> case v of
248
                               JSArray [JSBool True, JSString x] ->
249
                                   Ok (fromJSString x)
250
                               JSArray [JSBool False, JSString x] ->
251
                                   Bad (fromJSString x)
252
                               _ -> Bad "Unknown result from the master daemon"
253
                      ) r
254
             x -> Bad ("Cannot parse response from Ganeti: " ++ show x)
255

    
256
-- | Custom queryJobs call.
257
queryJobsStatus :: Client -> [String] -> IO (Result [JobStatus])
258
queryJobsStatus s jids = do
259
  rval <- callMethod (QueryJobs (map read jids) ["status"]) s
260
  return $ case rval of
261
             Bad x -> Bad x
262
             Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
263
                       J.Ok vals -> if any null vals
264
                                    then Bad "Missing job status field"
265
                                    else Ok (map head vals)
266
                       J.Error x -> Bad x