Statistics
| Branch: | Tag: | Revision:

root / htools / Ganeti / Luxi.hs @ e821050d

History | View | Annotate | Download (13.6 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the Ganeti LUXI interface.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Luxi
29
  ( LuxiOp(..)
30
  , QrViaLuxi(..)
31
  , ResultStatus(..)
32
  , LuxiReq(..)
33
  , Client
34
  , JobId
35
  , checkRS
36
  , getClient
37
  , closeClient
38
  , callMethod
39
  , submitManyJobs
40
  , queryJobsStatus
41
  , buildCall
42
  , validateCall
43
  , decodeCall
44
  ) where
45

    
46
import Data.IORef
47
import Data.Ratio (numerator, denominator)
48
import qualified Data.ByteString as B
49
import qualified Data.ByteString.UTF8 as UTF8
50
import Data.Word (Word8)
51
import Control.Monad
52
import Text.JSON (encodeStrict, decodeStrict)
53
import qualified Text.JSON as J
54
import Text.JSON.Types
55
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
56
import System.Timeout
57
import qualified Network.Socket as S
58

    
59
import Ganeti.HTools.JSON
60
import Ganeti.HTools.Types
61
import Ganeti.HTools.Utils
62

    
63
import Ganeti.Constants
64
import Ganeti.Jobs (JobStatus)
65
import Ganeti.OpCodes (OpCode)
66
import Ganeti.THH
67

    
68
-- * Utility functions
69

    
70
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
71
withTimeout :: Int -> String -> IO a -> IO a
72
withTimeout secs descr action = do
73
  result <- timeout (secs * 1000000) action
74
  case result of
75
    Nothing -> fail $ "Timeout in " ++ descr
76
    Just v -> return v
77

    
78
-- * Generic protocol functionality
79

    
80
-- | The Ganeti job type.
81
type JobId = Int
82

    
83
$(declareSADT "QrViaLuxi"
84
  [ ("QRLock", 'qrLock)
85
  , ("QRInstance", 'qrInstance)
86
  , ("QRNode", 'qrNode)
87
  , ("QRGroup", 'qrGroup)
88
  , ("QROs", 'qrOs)
89
  ])
90
$(makeJSONInstance ''QrViaLuxi)
91

    
92
-- | Currently supported Luxi operations and JSON serialization.
93
$(genLuxiOp "LuxiOp"
94
  [(luxiReqQuery,
95
    [ ("what",    [t| QrViaLuxi |], [| id |])
96
    , ("fields",  [t| [String]  |], [| id |])
97
    , ("qfilter", [t| ()        |], [| const JSNull |])
98
    ])
99
  , (luxiReqQueryNodes,
100
     [ ("names",  [t| [String] |], [| id |])
101
     , ("fields", [t| [String] |], [| id |])
102
     , ("lock",   [t| Bool     |], [| id |])
103
     ])
104
  , (luxiReqQueryGroups,
105
     [ ("names",  [t| [String] |], [| id |])
106
     , ("fields", [t| [String] |], [| id |])
107
     , ("lock",   [t| Bool     |], [| id |])
108
     ])
109
  , (luxiReqQueryInstances,
110
     [ ("names",  [t| [String] |], [| id |])
111
     , ("fields", [t| [String] |], [| id |])
112
     , ("lock",   [t| Bool     |], [| id |])
113
     ])
114
  , (luxiReqQueryJobs,
115
     [ ("ids",    [t| [Int]    |], [| id |])
116
     , ("fields", [t| [String] |], [| id |])
117
     ])
118
  , (luxiReqQueryExports,
119
     [ ("nodes", [t| [String] |], [| id |])
120
     , ("lock",  [t| Bool     |], [| id |])
121
     ])
122
  , (luxiReqQueryConfigValues,
123
     [ ("fields", [t| [String] |], [| id |]) ]
124
    )
125
  , (luxiReqQueryClusterInfo, [])
126
  , (luxiReqQueryTags,
127
     [ ("kind", [t| String |], [| id |])
128
     , ("name", [t| String |], [| id |])
129
     ])
130
  , (luxiReqSubmitJob,
131
     [ ("job", [t| [OpCode] |], [| id |]) ]
132
    )
133
  , (luxiReqSubmitManyJobs,
134
     [ ("ops", [t| [[OpCode]] |], [| id |]) ]
135
    )
136
  , (luxiReqWaitForJobChange,
137
     [ ("job",      [t| Int     |], [| id |])
138
     , ("fields",   [t| [String]|], [| id |])
139
     , ("prev_job", [t| JSValue |], [| id |])
140
     , ("prev_log", [t| JSValue |], [| id |])
141
     , ("tmout",    [t| Int     |], [| id |])
142
     ])
143
  , (luxiReqArchiveJob,
144
     [ ("job", [t| Int |], [| id |]) ]
145
    )
146
  , (luxiReqAutoArchiveJobs,
147
     [ ("age",   [t| Int |], [| id |])
148
     , ("tmout", [t| Int |], [| id |])
149
     ])
150
  , (luxiReqCancelJob,
151
     [ ("job", [t| Int |], [| id |]) ]
152
    )
153
  , (luxiReqSetDrainFlag,
154
     [ ("flag", [t| Bool |], [| id |]) ]
155
    )
156
  , (luxiReqSetWatcherPause,
157
     [ ("duration", [t| Double |], [| id |]) ]
158
    )
159
  ])
160

    
161
$(makeJSONInstance ''LuxiReq)
162

    
163
-- | The serialisation of LuxiOps into strings in messages.
164
$(genStrOfOp ''LuxiOp "strOfOp")
165

    
166
$(declareIADT "ResultStatus"
167
  [ ("RSNormal", 'rsNormal)
168
  , ("RSUnknown", 'rsUnknown)
169
  , ("RSNoData", 'rsNodata)
170
  , ("RSUnavailable", 'rsUnavail)
171
  , ("RSOffline", 'rsOffline)
172
  ])
173

    
174
$(makeJSONInstance ''ResultStatus)
175

    
176
-- | Type holding the initial (unparsed) Luxi call.
177
data LuxiCall = LuxiCall LuxiReq JSValue
178

    
179
-- | Check that ResultStatus is success or fail with descriptive message.
180
checkRS :: (Monad m) => ResultStatus -> a -> m a
181
checkRS RSNormal val    = return val
182
checkRS RSUnknown _     = fail "Unknown field"
183
checkRS RSNoData _      = fail "No data for a field"
184
checkRS RSUnavailable _ = fail "Ganeti reports unavailable data"
185
checkRS RSOffline _     = fail "Ganeti reports resource as offline"
186

    
187
-- | The end-of-message separator.
188
eOM :: Word8
189
eOM = 3
190

    
191
-- | The end-of-message encoded as a ByteString.
192
bEOM :: B.ByteString
193
bEOM = B.singleton eOM
194

    
195
-- | Valid keys in the requests and responses.
196
data MsgKeys = Method
197
             | Args
198
             | Success
199
             | Result
200

    
201
-- | The serialisation of MsgKeys into strings in messages.
202
$(genStrOfKey ''MsgKeys "strOfKey")
203

    
204
-- | Luxi client encapsulation.
205
data Client = Client { socket :: Handle           -- ^ The socket of the client
206
                     , rbuf :: IORef B.ByteString -- ^ Already received buffer
207
                     }
208

    
209
-- | Connects to the master daemon and returns a luxi Client.
210
getClient :: String -> IO Client
211
getClient path = do
212
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
213
  withTimeout connTimeout "creating luxi connection" $
214
              S.connect s (S.SockAddrUnix path)
215
  rf <- newIORef B.empty
216
  h <- S.socketToHandle s ReadWriteMode
217
  return Client { socket=h, rbuf=rf }
218

    
219
-- | Closes the client socket.
220
closeClient :: Client -> IO ()
221
closeClient = hClose . socket
222

    
223
-- | Sends a message over a luxi transport.
224
sendMsg :: Client -> String -> IO ()
225
sendMsg s buf = withTimeout queryTimeout "sending luxi message" $ do
226
  let encoded = UTF8.fromString buf
227
      handle = socket s
228
  B.hPut handle encoded
229
  B.hPut handle bEOM
230
  hFlush handle
231

    
232
-- | Given a current buffer and the handle, it will read from the
233
-- network until we get a full message, and it will return that
234
-- message and the leftover buffer contents.
235
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
236
recvUpdate handle obuf = do
237
  nbuf <- withTimeout queryTimeout "reading luxi response" $ do
238
            _ <- hWaitForInput handle (-1)
239
            B.hGetNonBlocking handle 4096
240
  let (msg, remaining) = B.break (eOM ==) nbuf
241
      newbuf = B.append obuf msg
242
  if B.null remaining
243
    then recvUpdate handle newbuf
244
    else return (newbuf, B.tail remaining)
245

    
246
-- | Waits for a message over a luxi transport.
247
recvMsg :: Client -> IO String
248
recvMsg s = do
249
  cbuf <- readIORef $ rbuf s
250
  let (imsg, ibuf) = B.break (eOM ==) cbuf
251
  (msg, nbuf) <-
252
    if B.null ibuf      -- if old buffer didn't contain a full message
253
      then recvUpdate (socket s) cbuf   -- then we read from network
254
      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
255
  writeIORef (rbuf s) nbuf
256
  return $ UTF8.toString msg
257

    
258
-- | Serialize a request to String.
259
buildCall :: LuxiOp  -- ^ The method
260
          -> String  -- ^ The serialized form
261
buildCall lo =
262
  let ja = [ (strOfKey Method, JSString $ toJSString $ strOfOp lo::JSValue)
263
           , (strOfKey Args, opToArgs lo::JSValue)
264
           ]
265
      jo = toJSObject ja
266
  in encodeStrict jo
267

    
268
-- | Check that luxi request contains the required keys and parse it.
269
validateCall :: String -> Result LuxiCall
270
validateCall s = do
271
  arr <- fromJResult "luxi call" $ decodeStrict s::Result (JSObject JSValue)
272
  let aobj = fromJSObject arr
273
  call <- fromObj aobj (strOfKey Method)::Result LuxiReq
274
  args <- fromObj aobj (strOfKey Args)
275
  return (LuxiCall call args)
276

    
277
-- | Converts Luxi call arguments into a 'LuxiOp' data structure.
278
--
279
-- This is currently hand-coded until we make it more uniform so that
280
-- it can be generated using TH.
281
decodeCall :: LuxiCall -> Result LuxiOp
282
decodeCall (LuxiCall call args) =
283
  case call of
284
    ReqQueryJobs -> do
285
              (jid, jargs) <- fromJVal args
286
              rid <- mapM parseJobId jid
287
              let rargs = map fromJSString jargs
288
              return $ QueryJobs rid rargs
289
    ReqQueryInstances -> do
290
              (names, fields, locking) <- fromJVal args
291
              return $ QueryInstances names fields locking
292
    ReqQueryNodes -> do
293
              (names, fields, locking) <- fromJVal args
294
              return $ QueryNodes names fields locking
295
    ReqQueryGroups -> do
296
              (names, fields, locking) <- fromJVal args
297
              return $ QueryGroups names fields locking
298
    ReqQueryClusterInfo -> do
299
              return QueryClusterInfo
300
    ReqQuery -> do
301
              (what, fields, _) <-
302
                fromJVal args::Result (QrViaLuxi, [String], JSValue)
303
              return $ Query what fields ()
304
    ReqSubmitJob -> do
305
              [ops1] <- fromJVal args
306
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
307
              return $ SubmitJob ops2
308
    ReqSubmitManyJobs -> do
309
              [ops1] <- fromJVal args
310
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
311
              return $ SubmitManyJobs ops2
312
    ReqWaitForJobChange -> do
313
              (jid, fields, pinfo, pidx, wtmout) <-
314
                -- No instance for 5-tuple, code copied from the
315
                -- json sources and adapted
316
                fromJResult "Parsing WaitForJobChange message" $
317
                case args of
318
                  JSArray [a, b, c, d, e] ->
319
                    (,,,,) `fmap`
320
                    J.readJSON a `ap`
321
                    J.readJSON b `ap`
322
                    J.readJSON c `ap`
323
                    J.readJSON d `ap`
324
                    J.readJSON e
325
                  _ -> J.Error "Not enough values"
326
              rid <- parseJobId jid
327
              return $ WaitForJobChange rid fields pinfo pidx wtmout
328
    ReqArchiveJob -> do
329
              [jid] <- fromJVal args
330
              rid <- parseJobId jid
331
              return $ ArchiveJob rid
332
    ReqAutoArchiveJobs -> do
333
              (age, tmout) <- fromJVal args
334
              return $ AutoArchiveJobs age tmout
335
    ReqQueryExports -> do
336
              (nodes, lock) <- fromJVal args
337
              return $ QueryExports nodes lock
338
    ReqQueryConfigValues -> do
339
              [fields] <- fromJVal args
340
              return $ QueryConfigValues fields
341
    ReqQueryTags -> do
342
              (kind, name) <- fromJVal args
343
              return $ QueryTags kind name
344
    ReqCancelJob -> do
345
              [job] <- fromJVal args
346
              rid <- parseJobId job
347
              return $ CancelJob rid
348
    ReqSetDrainFlag -> do
349
              [flag] <- fromJVal args
350
              return $ SetDrainFlag flag
351
    ReqSetWatcherPause -> do
352
              [duration] <- fromJVal args
353
              return $ SetWatcherPause duration
354

    
355
-- | Check that luxi responses contain the required keys and that the
356
-- call was successful.
357
validateResult :: String -> Result JSValue
358
validateResult s = do
359
  when (UTF8.replacement_char `elem` s) $
360
       fail "Failed to decode UTF-8, detected replacement char after decoding"
361
  oarr <- fromJResult "Parsing LUXI response"
362
          (decodeStrict s)::Result (JSObject JSValue)
363
  let arr = J.fromJSObject oarr
364
  status <- fromObj arr (strOfKey Success)::Result Bool
365
  let rkey = strOfKey Result
366
  if status
367
    then fromObj arr rkey
368
    else fromObj arr rkey >>= fail
369

    
370
-- | Generic luxi method call.
371
callMethod :: LuxiOp -> Client -> IO (Result JSValue)
372
callMethod method s = do
373
  sendMsg s $ buildCall method
374
  result <- recvMsg s
375
  let rval = validateResult result
376
  return rval
377

    
378
-- | Parses a job ID.
379
parseJobId :: JSValue -> Result JobId
380
parseJobId (JSString x) = tryRead "parsing job id" . fromJSString $ x
381
parseJobId (JSRational _ x) =
382
  if denominator x /= 1
383
    then Bad $ "Got fractional job ID from master daemon?! Value:" ++ show x
384
    -- FIXME: potential integer overflow here on 32-bit platforms
385
    else Ok . fromIntegral . numerator $ x
386
parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x
387

    
388
-- | Parse job submission result.
389
parseSubmitJobResult :: JSValue -> Result JobId
390
parseSubmitJobResult (JSArray [JSBool True, v]) = parseJobId v
391
parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
392
  Bad (fromJSString x)
393
parseSubmitJobResult v = Bad $ "Unknown result from the master daemon" ++
394
                         show v
395

    
396
-- | Specialized submitManyJobs call.
397
submitManyJobs :: Client -> [[OpCode]] -> IO (Result [JobId])
398
submitManyJobs s jobs = do
399
  rval <- callMethod (SubmitManyJobs jobs) s
400
  -- map each result (status, payload) pair into a nice Result ADT
401
  return $ case rval of
402
             Bad x -> Bad x
403
             Ok (JSArray r) -> mapM parseSubmitJobResult r
404
             x -> Bad ("Cannot parse response from Ganeti: " ++ show x)
405

    
406
-- | Custom queryJobs call.
407
queryJobsStatus :: Client -> [JobId] -> IO (Result [JobStatus])
408
queryJobsStatus s jids = do
409
  rval <- callMethod (QueryJobs jids ["status"]) s
410
  return $ case rval of
411
             Bad x -> Bad x
412
             Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
413
                       J.Ok vals -> if any null vals
414
                                    then Bad "Missing job status field"
415
                                    else Ok (map head vals)
416
                       J.Error x -> Bad x