Statistics
| Branch: | Tag: | Revision:

root / htools / Ganeti / Luxi.hs @ 0aff2293

History | View | Annotate | Download (15.4 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the Ganeti LUXI interface.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Luxi
29
  ( LuxiOp(..)
30
  , QrViaLuxi(..)
31
  , ResultStatus(..)
32
  , LuxiReq(..)
33
  , Client
34
  , JobId
35
  , RecvResult(..)
36
  , strOfOp
37
  , checkRS
38
  , getClient
39
  , getServer
40
  , acceptClient
41
  , closeClient
42
  , closeServer
43
  , callMethod
44
  , submitManyJobs
45
  , queryJobsStatus
46
  , buildCall
47
  , buildResponse
48
  , validateCall
49
  , decodeCall
50
  , recvMsg
51
  , recvMsgExt
52
  , sendMsg
53
  ) where
54

    
55
import Control.Exception (catch)
56
import Data.IORef
57
import Data.Ratio (numerator, denominator)
58
import qualified Data.ByteString as B
59
import qualified Data.ByteString.UTF8 as UTF8
60
import Data.Word (Word8)
61
import Control.Monad
62
import Prelude hiding (catch)
63
import Text.JSON (encodeStrict, decodeStrict)
64
import qualified Text.JSON as J
65
import Text.JSON.Types
66
import System.Directory (removeFile)
67
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
68
import System.IO.Error (isEOFError)
69
import System.Timeout
70
import qualified Network.Socket as S
71

    
72
import Ganeti.HTools.JSON
73
import Ganeti.HTools.Types
74
import Ganeti.HTools.Utils
75

    
76
import Ganeti.Constants
77
import Ganeti.Jobs (JobStatus)
78
import Ganeti.OpCodes (OpCode)
79
import Ganeti.THH
80

    
81
-- * Utility functions
82

    
83
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
84
withTimeout :: Int -> String -> IO a -> IO a
85
withTimeout secs descr action = do
86
  result <- timeout (secs * 1000000) action
87
  case result of
88
    Nothing -> fail $ "Timeout in " ++ descr
89
    Just v -> return v
90

    
91
-- * Generic protocol functionality
92

    
93
-- | Result of receiving a message from the socket.
94
data RecvResult = RecvConnClosed    -- ^ Connection closed
95
                | RecvError String  -- ^ Any other error
96
                | RecvOk String     -- ^ Successfull receive
97
                  deriving (Show, Read, Eq)
98

    
99
-- | The Ganeti job type.
100
type JobId = Int
101

    
102
$(declareSADT "QrViaLuxi"
103
  [ ("QRLock", 'qrLock)
104
  , ("QRInstance", 'qrInstance)
105
  , ("QRNode", 'qrNode)
106
  , ("QRGroup", 'qrGroup)
107
  , ("QROs", 'qrOs)
108
  ])
109
$(makeJSONInstance ''QrViaLuxi)
110

    
111
-- | Currently supported Luxi operations and JSON serialization.
112
$(genLuxiOp "LuxiOp"
113
  [(luxiReqQuery,
114
    [ ("what",    [t| QrViaLuxi |], [| id |])
115
    , ("fields",  [t| [String]  |], [| id |])
116
    , ("qfilter", [t| ()        |], [| const JSNull |])
117
    ])
118
  , (luxiReqQueryNodes,
119
     [ ("names",  [t| [String] |], [| id |])
120
     , ("fields", [t| [String] |], [| id |])
121
     , ("lock",   [t| Bool     |], [| id |])
122
     ])
123
  , (luxiReqQueryGroups,
124
     [ ("names",  [t| [String] |], [| id |])
125
     , ("fields", [t| [String] |], [| id |])
126
     , ("lock",   [t| Bool     |], [| id |])
127
     ])
128
  , (luxiReqQueryInstances,
129
     [ ("names",  [t| [String] |], [| id |])
130
     , ("fields", [t| [String] |], [| id |])
131
     , ("lock",   [t| Bool     |], [| id |])
132
     ])
133
  , (luxiReqQueryJobs,
134
     [ ("ids",    [t| [Int]    |], [| id |])
135
     , ("fields", [t| [String] |], [| id |])
136
     ])
137
  , (luxiReqQueryExports,
138
     [ ("nodes", [t| [String] |], [| id |])
139
     , ("lock",  [t| Bool     |], [| id |])
140
     ])
141
  , (luxiReqQueryConfigValues,
142
     [ ("fields", [t| [String] |], [| id |]) ]
143
    )
144
  , (luxiReqQueryClusterInfo, [])
145
  , (luxiReqQueryTags,
146
     [ ("kind", [t| String |], [| id |])
147
     , ("name", [t| String |], [| id |])
148
     ])
149
  , (luxiReqSubmitJob,
150
     [ ("job", [t| [OpCode] |], [| id |]) ]
151
    )
152
  , (luxiReqSubmitManyJobs,
153
     [ ("ops", [t| [[OpCode]] |], [| id |]) ]
154
    )
155
  , (luxiReqWaitForJobChange,
156
     [ ("job",      [t| Int     |], [| id |])
157
     , ("fields",   [t| [String]|], [| id |])
158
     , ("prev_job", [t| JSValue |], [| id |])
159
     , ("prev_log", [t| JSValue |], [| id |])
160
     , ("tmout",    [t| Int     |], [| id |])
161
     ])
162
  , (luxiReqArchiveJob,
163
     [ ("job", [t| Int |], [| id |]) ]
164
    )
165
  , (luxiReqAutoArchiveJobs,
166
     [ ("age",   [t| Int |], [| id |])
167
     , ("tmout", [t| Int |], [| id |])
168
     ])
169
  , (luxiReqCancelJob,
170
     [ ("job", [t| Int |], [| id |]) ]
171
    )
172
  , (luxiReqSetDrainFlag,
173
     [ ("flag", [t| Bool |], [| id |]) ]
174
    )
175
  , (luxiReqSetWatcherPause,
176
     [ ("duration", [t| Double |], [| id |]) ]
177
    )
178
  ])
179

    
180
$(makeJSONInstance ''LuxiReq)
181

    
182
-- | The serialisation of LuxiOps into strings in messages.
183
$(genStrOfOp ''LuxiOp "strOfOp")
184

    
185
$(declareIADT "ResultStatus"
186
  [ ("RSNormal", 'rsNormal)
187
  , ("RSUnknown", 'rsUnknown)
188
  , ("RSNoData", 'rsNodata)
189
  , ("RSUnavailable", 'rsUnavail)
190
  , ("RSOffline", 'rsOffline)
191
  ])
192

    
193
$(makeJSONInstance ''ResultStatus)
194

    
195
-- | Type holding the initial (unparsed) Luxi call.
196
data LuxiCall = LuxiCall LuxiReq JSValue
197

    
198
-- | Check that ResultStatus is success or fail with descriptive message.
199
checkRS :: (Monad m) => ResultStatus -> a -> m a
200
checkRS RSNormal val    = return val
201
checkRS RSUnknown _     = fail "Unknown field"
202
checkRS RSNoData _      = fail "No data for a field"
203
checkRS RSUnavailable _ = fail "Ganeti reports unavailable data"
204
checkRS RSOffline _     = fail "Ganeti reports resource as offline"
205

    
206
-- | The end-of-message separator.
207
eOM :: Word8
208
eOM = 3
209

    
210
-- | The end-of-message encoded as a ByteString.
211
bEOM :: B.ByteString
212
bEOM = B.singleton eOM
213

    
214
-- | Valid keys in the requests and responses.
215
data MsgKeys = Method
216
             | Args
217
             | Success
218
             | Result
219

    
220
-- | The serialisation of MsgKeys into strings in messages.
221
$(genStrOfKey ''MsgKeys "strOfKey")
222

    
223
-- | Luxi client encapsulation.
224
data Client = Client { socket :: Handle           -- ^ The socket of the client
225
                     , rbuf :: IORef B.ByteString -- ^ Already received buffer
226
                     }
227

    
228
-- | Connects to the master daemon and returns a luxi Client.
229
getClient :: String -> IO Client
230
getClient path = do
231
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
232
  withTimeout connTimeout "creating luxi connection" $
233
              S.connect s (S.SockAddrUnix path)
234
  rf <- newIORef B.empty
235
  h <- S.socketToHandle s ReadWriteMode
236
  return Client { socket=h, rbuf=rf }
237

    
238
-- | Creates and returns a server endpoint.
239
getServer :: FilePath -> IO S.Socket
240
getServer path = do
241
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
242
  S.bindSocket s (S.SockAddrUnix path)
243
  S.listen s 5 -- 5 is the max backlog
244
  return s
245

    
246
-- | Closes a server endpoint.
247
-- FIXME: this should be encapsulated into a nicer type.
248
closeServer :: FilePath -> S.Socket -> IO ()
249
closeServer path sock = do
250
  S.sClose sock
251
  removeFile path
252

    
253
-- | Accepts a client
254
acceptClient :: S.Socket -> IO Client
255
acceptClient s = do
256
  -- second return is the address of the client, which we ignore here
257
  (client_socket, _) <- S.accept s
258
  new_buffer <- newIORef B.empty
259
  handle <- S.socketToHandle client_socket ReadWriteMode
260
  return Client { socket=handle, rbuf=new_buffer }
261

    
262
-- | Closes the client socket.
263
closeClient :: Client -> IO ()
264
closeClient = hClose . socket
265

    
266
-- | Sends a message over a luxi transport.
267
sendMsg :: Client -> String -> IO ()
268
sendMsg s buf = withTimeout queryTimeout "sending luxi message" $ do
269
  let encoded = UTF8.fromString buf
270
      handle = socket s
271
  B.hPut handle encoded
272
  B.hPut handle bEOM
273
  hFlush handle
274

    
275
-- | Given a current buffer and the handle, it will read from the
276
-- network until we get a full message, and it will return that
277
-- message and the leftover buffer contents.
278
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
279
recvUpdate handle obuf = do
280
  nbuf <- withTimeout queryTimeout "reading luxi response" $ do
281
            _ <- hWaitForInput handle (-1)
282
            B.hGetNonBlocking handle 4096
283
  let (msg, remaining) = B.break (eOM ==) nbuf
284
      newbuf = B.append obuf msg
285
  if B.null remaining
286
    then recvUpdate handle newbuf
287
    else return (newbuf, B.tail remaining)
288

    
289
-- | Waits for a message over a luxi transport.
290
recvMsg :: Client -> IO String
291
recvMsg s = do
292
  cbuf <- readIORef $ rbuf s
293
  let (imsg, ibuf) = B.break (eOM ==) cbuf
294
  (msg, nbuf) <-
295
    if B.null ibuf      -- if old buffer didn't contain a full message
296
      then recvUpdate (socket s) cbuf   -- then we read from network
297
      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
298
  writeIORef (rbuf s) nbuf
299
  return $ UTF8.toString msg
300

    
301
-- | Extended wrapper over recvMsg.
302
recvMsgExt :: Client -> IO RecvResult
303
recvMsgExt s =
304
  catch (liftM RecvOk (recvMsg s)) $ \e ->
305
    if isEOFError e
306
      then return RecvConnClosed
307
      else return $ RecvError (show e)
308

    
309
-- | Serialize a request to String.
310
buildCall :: LuxiOp  -- ^ The method
311
          -> String  -- ^ The serialized form
312
buildCall lo =
313
  let ja = [ (strOfKey Method, JSString $ toJSString $ strOfOp lo::JSValue)
314
           , (strOfKey Args, opToArgs lo::JSValue)
315
           ]
316
      jo = toJSObject ja
317
  in encodeStrict jo
318

    
319
-- | Serialize the response to String.
320
buildResponse :: Bool    -- ^ Success
321
              -> JSValue -- ^ The arguments
322
              -> String  -- ^ The serialized form
323
buildResponse success args =
324
  let ja = [ (strOfKey Success, JSBool success)
325
           , (strOfKey Result, args)]
326
      jo = toJSObject ja
327
  in encodeStrict jo
328

    
329
-- | Check that luxi request contains the required keys and parse it.
330
validateCall :: String -> Result LuxiCall
331
validateCall s = do
332
  arr <- fromJResult "parsing top-level luxi message" $
333
         decodeStrict s::Result (JSObject JSValue)
334
  let aobj = fromJSObject arr
335
  call <- fromObj aobj (strOfKey Method)::Result LuxiReq
336
  args <- fromObj aobj (strOfKey Args)
337
  return (LuxiCall call args)
338

    
339
-- | Converts Luxi call arguments into a 'LuxiOp' data structure.
340
--
341
-- This is currently hand-coded until we make it more uniform so that
342
-- it can be generated using TH.
343
decodeCall :: LuxiCall -> Result LuxiOp
344
decodeCall (LuxiCall call args) =
345
  case call of
346
    ReqQueryJobs -> do
347
              (jid, jargs) <- fromJVal args
348
              rid <- mapM parseJobId jid
349
              let rargs = map fromJSString jargs
350
              return $ QueryJobs rid rargs
351
    ReqQueryInstances -> do
352
              (names, fields, locking) <- fromJVal args
353
              return $ QueryInstances names fields locking
354
    ReqQueryNodes -> do
355
              (names, fields, locking) <- fromJVal args
356
              return $ QueryNodes names fields locking
357
    ReqQueryGroups -> do
358
              (names, fields, locking) <- fromJVal args
359
              return $ QueryGroups names fields locking
360
    ReqQueryClusterInfo -> do
361
              return QueryClusterInfo
362
    ReqQuery -> do
363
              (what, fields, _) <-
364
                fromJVal args::Result (QrViaLuxi, [String], JSValue)
365
              return $ Query what fields ()
366
    ReqSubmitJob -> do
367
              [ops1] <- fromJVal args
368
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
369
              return $ SubmitJob ops2
370
    ReqSubmitManyJobs -> do
371
              [ops1] <- fromJVal args
372
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
373
              return $ SubmitManyJobs ops2
374
    ReqWaitForJobChange -> do
375
              (jid, fields, pinfo, pidx, wtmout) <-
376
                -- No instance for 5-tuple, code copied from the
377
                -- json sources and adapted
378
                fromJResult "Parsing WaitForJobChange message" $
379
                case args of
380
                  JSArray [a, b, c, d, e] ->
381
                    (,,,,) `fmap`
382
                    J.readJSON a `ap`
383
                    J.readJSON b `ap`
384
                    J.readJSON c `ap`
385
                    J.readJSON d `ap`
386
                    J.readJSON e
387
                  _ -> J.Error "Not enough values"
388
              rid <- parseJobId jid
389
              return $ WaitForJobChange rid fields pinfo pidx wtmout
390
    ReqArchiveJob -> do
391
              [jid] <- fromJVal args
392
              rid <- parseJobId jid
393
              return $ ArchiveJob rid
394
    ReqAutoArchiveJobs -> do
395
              (age, tmout) <- fromJVal args
396
              return $ AutoArchiveJobs age tmout
397
    ReqQueryExports -> do
398
              (nodes, lock) <- fromJVal args
399
              return $ QueryExports nodes lock
400
    ReqQueryConfigValues -> do
401
              [fields] <- fromJVal args
402
              return $ QueryConfigValues fields
403
    ReqQueryTags -> do
404
              (kind, name) <- fromJVal args
405
              return $ QueryTags kind name
406
    ReqCancelJob -> do
407
              [job] <- fromJVal args
408
              rid <- parseJobId job
409
              return $ CancelJob rid
410
    ReqSetDrainFlag -> do
411
              [flag] <- fromJVal args
412
              return $ SetDrainFlag flag
413
    ReqSetWatcherPause -> do
414
              [duration] <- fromJVal args
415
              return $ SetWatcherPause duration
416

    
417
-- | Check that luxi responses contain the required keys and that the
418
-- call was successful.
419
validateResult :: String -> Result JSValue
420
validateResult s = do
421
  when (UTF8.replacement_char `elem` s) $
422
       fail "Failed to decode UTF-8, detected replacement char after decoding"
423
  oarr <- fromJResult "Parsing LUXI response"
424
          (decodeStrict s)::Result (JSObject JSValue)
425
  let arr = J.fromJSObject oarr
426
  status <- fromObj arr (strOfKey Success)::Result Bool
427
  let rkey = strOfKey Result
428
  if status
429
    then fromObj arr rkey
430
    else fromObj arr rkey >>= fail
431

    
432
-- | Generic luxi method call.
433
callMethod :: LuxiOp -> Client -> IO (Result JSValue)
434
callMethod method s = do
435
  sendMsg s $ buildCall method
436
  result <- recvMsg s
437
  let rval = validateResult result
438
  return rval
439

    
440
-- | Parses a job ID.
441
parseJobId :: JSValue -> Result JobId
442
parseJobId (JSString x) = tryRead "parsing job id" . fromJSString $ x
443
parseJobId (JSRational _ x) =
444
  if denominator x /= 1
445
    then Bad $ "Got fractional job ID from master daemon?! Value:" ++ show x
446
    -- FIXME: potential integer overflow here on 32-bit platforms
447
    else Ok . fromIntegral . numerator $ x
448
parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x
449

    
450
-- | Parse job submission result.
451
parseSubmitJobResult :: JSValue -> Result JobId
452
parseSubmitJobResult (JSArray [JSBool True, v]) = parseJobId v
453
parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
454
  Bad (fromJSString x)
455
parseSubmitJobResult v = Bad $ "Unknown result from the master daemon" ++
456
                         show v
457

    
458
-- | Specialized submitManyJobs call.
459
submitManyJobs :: Client -> [[OpCode]] -> IO (Result [JobId])
460
submitManyJobs s jobs = do
461
  rval <- callMethod (SubmitManyJobs jobs) s
462
  -- map each result (status, payload) pair into a nice Result ADT
463
  return $ case rval of
464
             Bad x -> Bad x
465
             Ok (JSArray r) -> mapM parseSubmitJobResult r
466
             x -> Bad ("Cannot parse response from Ganeti: " ++ show x)
467

    
468
-- | Custom queryJobs call.
469
queryJobsStatus :: Client -> [JobId] -> IO (Result [JobStatus])
470
queryJobsStatus s jids = do
471
  rval <- callMethod (QueryJobs jids ["status"]) s
472
  return $ case rval of
473
             Bad x -> Bad x
474
             Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
475
                       J.Ok vals -> if any null vals
476
                                    then Bad "Missing job status field"
477
                                    else Ok (map head vals)
478
                       J.Error x -> Bad x