Statistics
| Branch: | Tag: | Revision:

root / htools / Ganeti / Luxi.hs @ c1c5aab1

History | View | Annotate | Download (14.2 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the Ganeti LUXI interface.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Luxi
29
  ( LuxiOp(..)
30
  , QrViaLuxi(..)
31
  , ResultStatus(..)
32
  , LuxiReq(..)
33
  , Client
34
  , JobId
35
  , checkRS
36
  , getClient
37
  , getServer
38
  , acceptClient
39
  , closeClient
40
  , callMethod
41
  , submitManyJobs
42
  , queryJobsStatus
43
  , buildCall
44
  , validateCall
45
  , decodeCall
46
  , recvMsg
47
  , sendMsg
48
  ) where
49

    
50
import Data.IORef
51
import Data.Ratio (numerator, denominator)
52
import qualified Data.ByteString as B
53
import qualified Data.ByteString.UTF8 as UTF8
54
import Data.Word (Word8)
55
import Control.Monad
56
import Text.JSON (encodeStrict, decodeStrict)
57
import qualified Text.JSON as J
58
import Text.JSON.Types
59
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
60
import System.Timeout
61
import qualified Network.Socket as S
62

    
63
import Ganeti.HTools.JSON
64
import Ganeti.HTools.Types
65
import Ganeti.HTools.Utils
66

    
67
import Ganeti.Constants
68
import Ganeti.Jobs (JobStatus)
69
import Ganeti.OpCodes (OpCode)
70
import Ganeti.THH
71

    
72
-- * Utility functions
73

    
74
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
75
withTimeout :: Int -> String -> IO a -> IO a
76
withTimeout secs descr action = do
77
  result <- timeout (secs * 1000000) action
78
  case result of
79
    Nothing -> fail $ "Timeout in " ++ descr
80
    Just v -> return v
81

    
82
-- * Generic protocol functionality
83

    
84
-- | The Ganeti job type.
85
type JobId = Int
86

    
87
$(declareSADT "QrViaLuxi"
88
  [ ("QRLock", 'qrLock)
89
  , ("QRInstance", 'qrInstance)
90
  , ("QRNode", 'qrNode)
91
  , ("QRGroup", 'qrGroup)
92
  , ("QROs", 'qrOs)
93
  ])
94
$(makeJSONInstance ''QrViaLuxi)
95

    
96
-- | Currently supported Luxi operations and JSON serialization.
97
$(genLuxiOp "LuxiOp"
98
  [(luxiReqQuery,
99
    [ ("what",    [t| QrViaLuxi |], [| id |])
100
    , ("fields",  [t| [String]  |], [| id |])
101
    , ("qfilter", [t| ()        |], [| const JSNull |])
102
    ])
103
  , (luxiReqQueryNodes,
104
     [ ("names",  [t| [String] |], [| id |])
105
     , ("fields", [t| [String] |], [| id |])
106
     , ("lock",   [t| Bool     |], [| id |])
107
     ])
108
  , (luxiReqQueryGroups,
109
     [ ("names",  [t| [String] |], [| id |])
110
     , ("fields", [t| [String] |], [| id |])
111
     , ("lock",   [t| Bool     |], [| id |])
112
     ])
113
  , (luxiReqQueryInstances,
114
     [ ("names",  [t| [String] |], [| id |])
115
     , ("fields", [t| [String] |], [| id |])
116
     , ("lock",   [t| Bool     |], [| id |])
117
     ])
118
  , (luxiReqQueryJobs,
119
     [ ("ids",    [t| [Int]    |], [| id |])
120
     , ("fields", [t| [String] |], [| id |])
121
     ])
122
  , (luxiReqQueryExports,
123
     [ ("nodes", [t| [String] |], [| id |])
124
     , ("lock",  [t| Bool     |], [| id |])
125
     ])
126
  , (luxiReqQueryConfigValues,
127
     [ ("fields", [t| [String] |], [| id |]) ]
128
    )
129
  , (luxiReqQueryClusterInfo, [])
130
  , (luxiReqQueryTags,
131
     [ ("kind", [t| String |], [| id |])
132
     , ("name", [t| String |], [| id |])
133
     ])
134
  , (luxiReqSubmitJob,
135
     [ ("job", [t| [OpCode] |], [| id |]) ]
136
    )
137
  , (luxiReqSubmitManyJobs,
138
     [ ("ops", [t| [[OpCode]] |], [| id |]) ]
139
    )
140
  , (luxiReqWaitForJobChange,
141
     [ ("job",      [t| Int     |], [| id |])
142
     , ("fields",   [t| [String]|], [| id |])
143
     , ("prev_job", [t| JSValue |], [| id |])
144
     , ("prev_log", [t| JSValue |], [| id |])
145
     , ("tmout",    [t| Int     |], [| id |])
146
     ])
147
  , (luxiReqArchiveJob,
148
     [ ("job", [t| Int |], [| id |]) ]
149
    )
150
  , (luxiReqAutoArchiveJobs,
151
     [ ("age",   [t| Int |], [| id |])
152
     , ("tmout", [t| Int |], [| id |])
153
     ])
154
  , (luxiReqCancelJob,
155
     [ ("job", [t| Int |], [| id |]) ]
156
    )
157
  , (luxiReqSetDrainFlag,
158
     [ ("flag", [t| Bool |], [| id |]) ]
159
    )
160
  , (luxiReqSetWatcherPause,
161
     [ ("duration", [t| Double |], [| id |]) ]
162
    )
163
  ])
164

    
165
$(makeJSONInstance ''LuxiReq)
166

    
167
-- | The serialisation of LuxiOps into strings in messages.
168
$(genStrOfOp ''LuxiOp "strOfOp")
169

    
170
$(declareIADT "ResultStatus"
171
  [ ("RSNormal", 'rsNormal)
172
  , ("RSUnknown", 'rsUnknown)
173
  , ("RSNoData", 'rsNodata)
174
  , ("RSUnavailable", 'rsUnavail)
175
  , ("RSOffline", 'rsOffline)
176
  ])
177

    
178
$(makeJSONInstance ''ResultStatus)
179

    
180
-- | Type holding the initial (unparsed) Luxi call.
181
data LuxiCall = LuxiCall LuxiReq JSValue
182

    
183
-- | Check that ResultStatus is success or fail with descriptive message.
184
checkRS :: (Monad m) => ResultStatus -> a -> m a
185
checkRS RSNormal val    = return val
186
checkRS RSUnknown _     = fail "Unknown field"
187
checkRS RSNoData _      = fail "No data for a field"
188
checkRS RSUnavailable _ = fail "Ganeti reports unavailable data"
189
checkRS RSOffline _     = fail "Ganeti reports resource as offline"
190

    
191
-- | The end-of-message separator.
192
eOM :: Word8
193
eOM = 3
194

    
195
-- | The end-of-message encoded as a ByteString.
196
bEOM :: B.ByteString
197
bEOM = B.singleton eOM
198

    
199
-- | Valid keys in the requests and responses.
200
data MsgKeys = Method
201
             | Args
202
             | Success
203
             | Result
204

    
205
-- | The serialisation of MsgKeys into strings in messages.
206
$(genStrOfKey ''MsgKeys "strOfKey")
207

    
208
-- | Luxi client encapsulation.
209
data Client = Client { socket :: Handle           -- ^ The socket of the client
210
                     , rbuf :: IORef B.ByteString -- ^ Already received buffer
211
                     }
212

    
213
-- | Connects to the master daemon and returns a luxi Client.
214
getClient :: String -> IO Client
215
getClient path = do
216
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
217
  withTimeout connTimeout "creating luxi connection" $
218
              S.connect s (S.SockAddrUnix path)
219
  rf <- newIORef B.empty
220
  h <- S.socketToHandle s ReadWriteMode
221
  return Client { socket=h, rbuf=rf }
222

    
223
-- | Creates and returns a server endpoint.
224
getServer :: FilePath -> IO S.Socket
225
getServer path = do
226
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
227
  S.bindSocket s (S.SockAddrUnix path)
228
  S.listen s 5 -- 5 is the max backlog
229
  return s
230

    
231
-- | Accepts a client
232
acceptClient :: S.Socket -> IO Client
233
acceptClient s = do
234
  -- second return is the address of the client, which we ignore here
235
  (client_socket, _) <- S.accept s
236
  new_buffer <- newIORef B.empty
237
  handle <- S.socketToHandle client_socket ReadWriteMode
238
  return Client { socket=handle, rbuf=new_buffer }
239

    
240
-- | Closes the client socket.
241
closeClient :: Client -> IO ()
242
closeClient = hClose . socket
243

    
244
-- | Sends a message over a luxi transport.
245
sendMsg :: Client -> String -> IO ()
246
sendMsg s buf = withTimeout queryTimeout "sending luxi message" $ do
247
  let encoded = UTF8.fromString buf
248
      handle = socket s
249
  B.hPut handle encoded
250
  B.hPut handle bEOM
251
  hFlush handle
252

    
253
-- | Given a current buffer and the handle, it will read from the
254
-- network until we get a full message, and it will return that
255
-- message and the leftover buffer contents.
256
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
257
recvUpdate handle obuf = do
258
  nbuf <- withTimeout queryTimeout "reading luxi response" $ do
259
            _ <- hWaitForInput handle (-1)
260
            B.hGetNonBlocking handle 4096
261
  let (msg, remaining) = B.break (eOM ==) nbuf
262
      newbuf = B.append obuf msg
263
  if B.null remaining
264
    then recvUpdate handle newbuf
265
    else return (newbuf, B.tail remaining)
266

    
267
-- | Waits for a message over a luxi transport.
268
recvMsg :: Client -> IO String
269
recvMsg s = do
270
  cbuf <- readIORef $ rbuf s
271
  let (imsg, ibuf) = B.break (eOM ==) cbuf
272
  (msg, nbuf) <-
273
    if B.null ibuf      -- if old buffer didn't contain a full message
274
      then recvUpdate (socket s) cbuf   -- then we read from network
275
      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
276
  writeIORef (rbuf s) nbuf
277
  return $ UTF8.toString msg
278

    
279
-- | Serialize a request to String.
280
buildCall :: LuxiOp  -- ^ The method
281
          -> String  -- ^ The serialized form
282
buildCall lo =
283
  let ja = [ (strOfKey Method, JSString $ toJSString $ strOfOp lo::JSValue)
284
           , (strOfKey Args, opToArgs lo::JSValue)
285
           ]
286
      jo = toJSObject ja
287
  in encodeStrict jo
288

    
289
-- | Check that luxi request contains the required keys and parse it.
290
validateCall :: String -> Result LuxiCall
291
validateCall s = do
292
  arr <- fromJResult "luxi call" $ decodeStrict s::Result (JSObject JSValue)
293
  let aobj = fromJSObject arr
294
  call <- fromObj aobj (strOfKey Method)::Result LuxiReq
295
  args <- fromObj aobj (strOfKey Args)
296
  return (LuxiCall call args)
297

    
298
-- | Converts Luxi call arguments into a 'LuxiOp' data structure.
299
--
300
-- This is currently hand-coded until we make it more uniform so that
301
-- it can be generated using TH.
302
decodeCall :: LuxiCall -> Result LuxiOp
303
decodeCall (LuxiCall call args) =
304
  case call of
305
    ReqQueryJobs -> do
306
              (jid, jargs) <- fromJVal args
307
              rid <- mapM parseJobId jid
308
              let rargs = map fromJSString jargs
309
              return $ QueryJobs rid rargs
310
    ReqQueryInstances -> do
311
              (names, fields, locking) <- fromJVal args
312
              return $ QueryInstances names fields locking
313
    ReqQueryNodes -> do
314
              (names, fields, locking) <- fromJVal args
315
              return $ QueryNodes names fields locking
316
    ReqQueryGroups -> do
317
              (names, fields, locking) <- fromJVal args
318
              return $ QueryGroups names fields locking
319
    ReqQueryClusterInfo -> do
320
              return QueryClusterInfo
321
    ReqQuery -> do
322
              (what, fields, _) <-
323
                fromJVal args::Result (QrViaLuxi, [String], JSValue)
324
              return $ Query what fields ()
325
    ReqSubmitJob -> do
326
              [ops1] <- fromJVal args
327
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
328
              return $ SubmitJob ops2
329
    ReqSubmitManyJobs -> do
330
              [ops1] <- fromJVal args
331
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
332
              return $ SubmitManyJobs ops2
333
    ReqWaitForJobChange -> do
334
              (jid, fields, pinfo, pidx, wtmout) <-
335
                -- No instance for 5-tuple, code copied from the
336
                -- json sources and adapted
337
                fromJResult "Parsing WaitForJobChange message" $
338
                case args of
339
                  JSArray [a, b, c, d, e] ->
340
                    (,,,,) `fmap`
341
                    J.readJSON a `ap`
342
                    J.readJSON b `ap`
343
                    J.readJSON c `ap`
344
                    J.readJSON d `ap`
345
                    J.readJSON e
346
                  _ -> J.Error "Not enough values"
347
              rid <- parseJobId jid
348
              return $ WaitForJobChange rid fields pinfo pidx wtmout
349
    ReqArchiveJob -> do
350
              [jid] <- fromJVal args
351
              rid <- parseJobId jid
352
              return $ ArchiveJob rid
353
    ReqAutoArchiveJobs -> do
354
              (age, tmout) <- fromJVal args
355
              return $ AutoArchiveJobs age tmout
356
    ReqQueryExports -> do
357
              (nodes, lock) <- fromJVal args
358
              return $ QueryExports nodes lock
359
    ReqQueryConfigValues -> do
360
              [fields] <- fromJVal args
361
              return $ QueryConfigValues fields
362
    ReqQueryTags -> do
363
              (kind, name) <- fromJVal args
364
              return $ QueryTags kind name
365
    ReqCancelJob -> do
366
              [job] <- fromJVal args
367
              rid <- parseJobId job
368
              return $ CancelJob rid
369
    ReqSetDrainFlag -> do
370
              [flag] <- fromJVal args
371
              return $ SetDrainFlag flag
372
    ReqSetWatcherPause -> do
373
              [duration] <- fromJVal args
374
              return $ SetWatcherPause duration
375

    
376
-- | Check that luxi responses contain the required keys and that the
377
-- call was successful.
378
validateResult :: String -> Result JSValue
379
validateResult s = do
380
  when (UTF8.replacement_char `elem` s) $
381
       fail "Failed to decode UTF-8, detected replacement char after decoding"
382
  oarr <- fromJResult "Parsing LUXI response"
383
          (decodeStrict s)::Result (JSObject JSValue)
384
  let arr = J.fromJSObject oarr
385
  status <- fromObj arr (strOfKey Success)::Result Bool
386
  let rkey = strOfKey Result
387
  if status
388
    then fromObj arr rkey
389
    else fromObj arr rkey >>= fail
390

    
391
-- | Generic luxi method call.
392
callMethod :: LuxiOp -> Client -> IO (Result JSValue)
393
callMethod method s = do
394
  sendMsg s $ buildCall method
395
  result <- recvMsg s
396
  let rval = validateResult result
397
  return rval
398

    
399
-- | Parses a job ID.
400
parseJobId :: JSValue -> Result JobId
401
parseJobId (JSString x) = tryRead "parsing job id" . fromJSString $ x
402
parseJobId (JSRational _ x) =
403
  if denominator x /= 1
404
    then Bad $ "Got fractional job ID from master daemon?! Value:" ++ show x
405
    -- FIXME: potential integer overflow here on 32-bit platforms
406
    else Ok . fromIntegral . numerator $ x
407
parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x
408

    
409
-- | Parse job submission result.
410
parseSubmitJobResult :: JSValue -> Result JobId
411
parseSubmitJobResult (JSArray [JSBool True, v]) = parseJobId v
412
parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
413
  Bad (fromJSString x)
414
parseSubmitJobResult v = Bad $ "Unknown result from the master daemon" ++
415
                         show v
416

    
417
-- | Specialized submitManyJobs call.
418
submitManyJobs :: Client -> [[OpCode]] -> IO (Result [JobId])
419
submitManyJobs s jobs = do
420
  rval <- callMethod (SubmitManyJobs jobs) s
421
  -- map each result (status, payload) pair into a nice Result ADT
422
  return $ case rval of
423
             Bad x -> Bad x
424
             Ok (JSArray r) -> mapM parseSubmitJobResult r
425
             x -> Bad ("Cannot parse response from Ganeti: " ++ show x)
426

    
427
-- | Custom queryJobs call.
428
queryJobsStatus :: Client -> [JobId] -> IO (Result [JobStatus])
429
queryJobsStatus s jids = do
430
  rval <- callMethod (QueryJobs jids ["status"]) s
431
  return $ case rval of
432
             Bad x -> Bad x
433
             Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
434
                       J.Ok vals -> if any null vals
435
                                    then Bad "Missing job status field"
436
                                    else Ok (map head vals)
437
                       J.Error x -> Bad x