Statistics
| Branch: | Tag: | Revision:

root / htools / Ganeti / Luxi.hs @ dc6a0f82

History | View | Annotate | Download (15.4 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the Ganeti LUXI interface.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Luxi
29
  ( LuxiOp(..)
30
  , QrViaLuxi(..)
31
  , ResultStatus(..)
32
  , LuxiReq(..)
33
  , Client
34
  , JobId
35
  , RecvResult(..)
36
  , strOfOp
37
  , checkRS
38
  , getClient
39
  , getServer
40
  , acceptClient
41
  , closeClient
42
  , closeServer
43
  , callMethod
44
  , submitManyJobs
45
  , queryJobsStatus
46
  , buildCall
47
  , buildResponse
48
  , validateCall
49
  , decodeCall
50
  , recvMsg
51
  , recvMsgExt
52
  , sendMsg
53
  ) where
54

    
55
import Control.Exception (catch)
56
import Data.IORef
57
import Data.Ratio (numerator, denominator)
58
import qualified Data.ByteString as B
59
import qualified Data.ByteString.UTF8 as UTF8
60
import Data.Word (Word8)
61
import Control.Monad
62
import Prelude hiding (catch)
63
import Text.JSON (encodeStrict, decodeStrict)
64
import qualified Text.JSON as J
65
import Text.JSON.Types
66
import System.Directory (removeFile)
67
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
68
import System.IO.Error (isEOFError)
69
import System.Timeout
70
import qualified Network.Socket as S
71

    
72
import Ganeti.HTools.JSON
73
import Ganeti.HTools.Types
74
import Ganeti.HTools.Utils
75

    
76
import Ganeti.Constants
77
import Ganeti.Jobs (JobStatus)
78
import Ganeti.OpCodes (OpCode)
79
import qualified Ganeti.Qlang as Qlang
80
import Ganeti.THH
81

    
82
-- * Utility functions
83

    
84
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
85
withTimeout :: Int -> String -> IO a -> IO a
86
withTimeout secs descr action = do
87
  result <- timeout (secs * 1000000) action
88
  case result of
89
    Nothing -> fail $ "Timeout in " ++ descr
90
    Just v -> return v
91

    
92
-- * Generic protocol functionality
93

    
94
-- | Result of receiving a message from the socket.
95
data RecvResult = RecvConnClosed    -- ^ Connection closed
96
                | RecvError String  -- ^ Any other error
97
                | RecvOk String     -- ^ Successfull receive
98
                  deriving (Show, Read, Eq)
99

    
100
-- | The Ganeti job type.
101
type JobId = Int
102

    
103
$(declareSADT "QrViaLuxi"
104
  [ ("QRLock",     'qrLock)
105
  , ("QRInstance", 'qrInstance)
106
  , ("QRNode",     'qrNode)
107
  , ("QRGroup",    'qrGroup)
108
  , ("QROs",       'qrOs)
109
  , ("QRJob",      'qrJob)
110
  ])
111
$(makeJSONInstance ''QrViaLuxi)
112

    
113
-- | Currently supported Luxi operations and JSON serialization.
114
$(genLuxiOp "LuxiOp"
115
  [(luxiReqQuery,
116
    [ ("what",    [t| QrViaLuxi |], [| id |])
117
    , ("fields",  [t| [String]  |], [| id |])
118
    , ("qfilter", [t| Qlang.Filter |], [| id |])
119
    ])
120
  , (luxiReqQueryNodes,
121
     [ ("names",  [t| [String] |], [| id |])
122
     , ("fields", [t| [String] |], [| id |])
123
     , ("lock",   [t| Bool     |], [| id |])
124
     ])
125
  , (luxiReqQueryGroups,
126
     [ ("names",  [t| [String] |], [| id |])
127
     , ("fields", [t| [String] |], [| id |])
128
     , ("lock",   [t| Bool     |], [| id |])
129
     ])
130
  , (luxiReqQueryInstances,
131
     [ ("names",  [t| [String] |], [| id |])
132
     , ("fields", [t| [String] |], [| id |])
133
     , ("lock",   [t| Bool     |], [| id |])
134
     ])
135
  , (luxiReqQueryJobs,
136
     [ ("ids",    [t| [Int]    |], [| id |])
137
     , ("fields", [t| [String] |], [| id |])
138
     ])
139
  , (luxiReqQueryExports,
140
     [ ("nodes", [t| [String] |], [| id |])
141
     , ("lock",  [t| Bool     |], [| id |])
142
     ])
143
  , (luxiReqQueryConfigValues,
144
     [ ("fields", [t| [String] |], [| id |]) ]
145
    )
146
  , (luxiReqQueryClusterInfo, [])
147
  , (luxiReqQueryTags,
148
     [ ("kind", [t| String |], [| id |])
149
     , ("name", [t| String |], [| id |])
150
     ])
151
  , (luxiReqSubmitJob,
152
     [ ("job", [t| [OpCode] |], [| id |]) ]
153
    )
154
  , (luxiReqSubmitManyJobs,
155
     [ ("ops", [t| [[OpCode]] |], [| id |]) ]
156
    )
157
  , (luxiReqWaitForJobChange,
158
     [ ("job",      [t| Int     |], [| id |])
159
     , ("fields",   [t| [String]|], [| id |])
160
     , ("prev_job", [t| JSValue |], [| id |])
161
     , ("prev_log", [t| JSValue |], [| id |])
162
     , ("tmout",    [t| Int     |], [| id |])
163
     ])
164
  , (luxiReqArchiveJob,
165
     [ ("job", [t| Int |], [| id |]) ]
166
    )
167
  , (luxiReqAutoArchiveJobs,
168
     [ ("age",   [t| Int |], [| id |])
169
     , ("tmout", [t| Int |], [| id |])
170
     ])
171
  , (luxiReqCancelJob,
172
     [ ("job", [t| Int |], [| id |]) ]
173
    )
174
  , (luxiReqSetDrainFlag,
175
     [ ("flag", [t| Bool |], [| id |]) ]
176
    )
177
  , (luxiReqSetWatcherPause,
178
     [ ("duration", [t| Double |], [| id |]) ]
179
    )
180
  ])
181

    
182
$(makeJSONInstance ''LuxiReq)
183

    
184
-- | The serialisation of LuxiOps into strings in messages.
185
$(genStrOfOp ''LuxiOp "strOfOp")
186

    
187
$(declareIADT "ResultStatus"
188
  [ ("RSNormal", 'rsNormal)
189
  , ("RSUnknown", 'rsUnknown)
190
  , ("RSNoData", 'rsNodata)
191
  , ("RSUnavailable", 'rsUnavail)
192
  , ("RSOffline", 'rsOffline)
193
  ])
194

    
195
$(makeJSONInstance ''ResultStatus)
196

    
197
-- | Type holding the initial (unparsed) Luxi call.
198
data LuxiCall = LuxiCall LuxiReq JSValue
199

    
200
-- | Check that ResultStatus is success or fail with descriptive message.
201
checkRS :: (Monad m) => ResultStatus -> a -> m a
202
checkRS RSNormal val    = return val
203
checkRS RSUnknown _     = fail "Unknown field"
204
checkRS RSNoData _      = fail "No data for a field"
205
checkRS RSUnavailable _ = fail "Ganeti reports unavailable data"
206
checkRS RSOffline _     = fail "Ganeti reports resource as offline"
207

    
208
-- | The end-of-message separator.
209
eOM :: Word8
210
eOM = 3
211

    
212
-- | The end-of-message encoded as a ByteString.
213
bEOM :: B.ByteString
214
bEOM = B.singleton eOM
215

    
216
-- | Valid keys in the requests and responses.
217
data MsgKeys = Method
218
             | Args
219
             | Success
220
             | Result
221

    
222
-- | The serialisation of MsgKeys into strings in messages.
223
$(genStrOfKey ''MsgKeys "strOfKey")
224

    
225
-- | Luxi client encapsulation.
226
data Client = Client { socket :: Handle           -- ^ The socket of the client
227
                     , rbuf :: IORef B.ByteString -- ^ Already received buffer
228
                     }
229

    
230
-- | Connects to the master daemon and returns a luxi Client.
231
getClient :: String -> IO Client
232
getClient path = do
233
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
234
  withTimeout connTimeout "creating luxi connection" $
235
              S.connect s (S.SockAddrUnix path)
236
  rf <- newIORef B.empty
237
  h <- S.socketToHandle s ReadWriteMode
238
  return Client { socket=h, rbuf=rf }
239

    
240
-- | Creates and returns a server endpoint.
241
getServer :: FilePath -> IO S.Socket
242
getServer path = do
243
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
244
  S.bindSocket s (S.SockAddrUnix path)
245
  S.listen s 5 -- 5 is the max backlog
246
  return s
247

    
248
-- | Closes a server endpoint.
249
-- FIXME: this should be encapsulated into a nicer type.
250
closeServer :: FilePath -> S.Socket -> IO ()
251
closeServer path sock = do
252
  S.sClose sock
253
  removeFile path
254

    
255
-- | Accepts a client
256
acceptClient :: S.Socket -> IO Client
257
acceptClient s = do
258
  -- second return is the address of the client, which we ignore here
259
  (client_socket, _) <- S.accept s
260
  new_buffer <- newIORef B.empty
261
  handle <- S.socketToHandle client_socket ReadWriteMode
262
  return Client { socket=handle, rbuf=new_buffer }
263

    
264
-- | Closes the client socket.
265
closeClient :: Client -> IO ()
266
closeClient = hClose . socket
267

    
268
-- | Sends a message over a luxi transport.
269
sendMsg :: Client -> String -> IO ()
270
sendMsg s buf = withTimeout queryTimeout "sending luxi message" $ do
271
  let encoded = UTF8.fromString buf
272
      handle = socket s
273
  B.hPut handle encoded
274
  B.hPut handle bEOM
275
  hFlush handle
276

    
277
-- | Given a current buffer and the handle, it will read from the
278
-- network until we get a full message, and it will return that
279
-- message and the leftover buffer contents.
280
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
281
recvUpdate handle obuf = do
282
  nbuf <- withTimeout queryTimeout "reading luxi response" $ do
283
            _ <- hWaitForInput handle (-1)
284
            B.hGetNonBlocking handle 4096
285
  let (msg, remaining) = B.break (eOM ==) nbuf
286
      newbuf = B.append obuf msg
287
  if B.null remaining
288
    then recvUpdate handle newbuf
289
    else return (newbuf, B.tail remaining)
290

    
291
-- | Waits for a message over a luxi transport.
292
recvMsg :: Client -> IO String
293
recvMsg s = do
294
  cbuf <- readIORef $ rbuf s
295
  let (imsg, ibuf) = B.break (eOM ==) cbuf
296
  (msg, nbuf) <-
297
    if B.null ibuf      -- if old buffer didn't contain a full message
298
      then recvUpdate (socket s) cbuf   -- then we read from network
299
      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
300
  writeIORef (rbuf s) nbuf
301
  return $ UTF8.toString msg
302

    
303
-- | Extended wrapper over recvMsg.
304
recvMsgExt :: Client -> IO RecvResult
305
recvMsgExt s =
306
  catch (liftM RecvOk (recvMsg s)) $ \e ->
307
    if isEOFError e
308
      then return RecvConnClosed
309
      else return $ RecvError (show e)
310

    
311
-- | Serialize a request to String.
312
buildCall :: LuxiOp  -- ^ The method
313
          -> String  -- ^ The serialized form
314
buildCall lo =
315
  let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
316
           , (strOfKey Args, opToArgs lo)
317
           ]
318
      jo = toJSObject ja
319
  in encodeStrict jo
320

    
321
-- | Serialize the response to String.
322
buildResponse :: Bool    -- ^ Success
323
              -> JSValue -- ^ The arguments
324
              -> String  -- ^ The serialized form
325
buildResponse success args =
326
  let ja = [ (strOfKey Success, JSBool success)
327
           , (strOfKey Result, args)]
328
      jo = toJSObject ja
329
  in encodeStrict jo
330

    
331
-- | Check that luxi request contains the required keys and parse it.
332
validateCall :: String -> Result LuxiCall
333
validateCall s = do
334
  arr <- fromJResult "parsing top-level luxi message" $
335
         decodeStrict s::Result (JSObject JSValue)
336
  let aobj = fromJSObject arr
337
  call <- fromObj aobj (strOfKey Method)::Result LuxiReq
338
  args <- fromObj aobj (strOfKey Args)
339
  return (LuxiCall call args)
340

    
341
-- | Converts Luxi call arguments into a 'LuxiOp' data structure.
342
--
343
-- This is currently hand-coded until we make it more uniform so that
344
-- it can be generated using TH.
345
decodeCall :: LuxiCall -> Result LuxiOp
346
decodeCall (LuxiCall call args) =
347
  case call of
348
    ReqQueryJobs -> do
349
              (jid, jargs) <- fromJVal args
350
              rid <- mapM parseJobId jid
351
              let rargs = map fromJSString jargs
352
              return $ QueryJobs rid rargs
353
    ReqQueryInstances -> do
354
              (names, fields, locking) <- fromJVal args
355
              return $ QueryInstances names fields locking
356
    ReqQueryNodes -> do
357
              (names, fields, locking) <- fromJVal args
358
              return $ QueryNodes names fields locking
359
    ReqQueryGroups -> do
360
              (names, fields, locking) <- fromJVal args
361
              return $ QueryGroups names fields locking
362
    ReqQueryClusterInfo -> do
363
              return QueryClusterInfo
364
    ReqQuery -> do
365
              (what, fields, qfilter) <- fromJVal args
366
              return $ Query what fields qfilter
367
    ReqSubmitJob -> do
368
              [ops1] <- fromJVal args
369
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
370
              return $ SubmitJob ops2
371
    ReqSubmitManyJobs -> do
372
              [ops1] <- fromJVal args
373
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
374
              return $ SubmitManyJobs ops2
375
    ReqWaitForJobChange -> do
376
              (jid, fields, pinfo, pidx, wtmout) <-
377
                -- No instance for 5-tuple, code copied from the
378
                -- json sources and adapted
379
                fromJResult "Parsing WaitForJobChange message" $
380
                case args of
381
                  JSArray [a, b, c, d, e] ->
382
                    (,,,,) `fmap`
383
                    J.readJSON a `ap`
384
                    J.readJSON b `ap`
385
                    J.readJSON c `ap`
386
                    J.readJSON d `ap`
387
                    J.readJSON e
388
                  _ -> J.Error "Not enough values"
389
              rid <- parseJobId jid
390
              return $ WaitForJobChange rid fields pinfo pidx wtmout
391
    ReqArchiveJob -> do
392
              [jid] <- fromJVal args
393
              rid <- parseJobId jid
394
              return $ ArchiveJob rid
395
    ReqAutoArchiveJobs -> do
396
              (age, tmout) <- fromJVal args
397
              return $ AutoArchiveJobs age tmout
398
    ReqQueryExports -> do
399
              (nodes, lock) <- fromJVal args
400
              return $ QueryExports nodes lock
401
    ReqQueryConfigValues -> do
402
              [fields] <- fromJVal args
403
              return $ QueryConfigValues fields
404
    ReqQueryTags -> do
405
              (kind, name) <- fromJVal args
406
              return $ QueryTags kind name
407
    ReqCancelJob -> do
408
              [job] <- fromJVal args
409
              rid <- parseJobId job
410
              return $ CancelJob rid
411
    ReqSetDrainFlag -> do
412
              [flag] <- fromJVal args
413
              return $ SetDrainFlag flag
414
    ReqSetWatcherPause -> do
415
              [duration] <- fromJVal args
416
              return $ SetWatcherPause duration
417

    
418
-- | Check that luxi responses contain the required keys and that the
419
-- call was successful.
420
validateResult :: String -> Result JSValue
421
validateResult s = do
422
  when (UTF8.replacement_char `elem` s) $
423
       fail "Failed to decode UTF-8, detected replacement char after decoding"
424
  oarr <- fromJResult "Parsing LUXI response"
425
          (decodeStrict s)::Result (JSObject JSValue)
426
  let arr = J.fromJSObject oarr
427
  status <- fromObj arr (strOfKey Success)::Result Bool
428
  let rkey = strOfKey Result
429
  if status
430
    then fromObj arr rkey
431
    else fromObj arr rkey >>= fail
432

    
433
-- | Generic luxi method call.
434
callMethod :: LuxiOp -> Client -> IO (Result JSValue)
435
callMethod method s = do
436
  sendMsg s $ buildCall method
437
  result <- recvMsg s
438
  let rval = validateResult result
439
  return rval
440

    
441
-- | Parses a job ID.
442
parseJobId :: JSValue -> Result JobId
443
parseJobId (JSString x) = tryRead "parsing job id" . fromJSString $ x
444
parseJobId (JSRational _ x) =
445
  if denominator x /= 1
446
    then Bad $ "Got fractional job ID from master daemon?! Value:" ++ show x
447
    -- FIXME: potential integer overflow here on 32-bit platforms
448
    else Ok . fromIntegral . numerator $ x
449
parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x
450

    
451
-- | Parse job submission result.
452
parseSubmitJobResult :: JSValue -> Result JobId
453
parseSubmitJobResult (JSArray [JSBool True, v]) = parseJobId v
454
parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
455
  Bad (fromJSString x)
456
parseSubmitJobResult v = Bad $ "Unknown result from the master daemon" ++
457
                         show v
458

    
459
-- | Specialized submitManyJobs call.
460
submitManyJobs :: Client -> [[OpCode]] -> IO (Result [JobId])
461
submitManyJobs s jobs = do
462
  rval <- callMethod (SubmitManyJobs jobs) s
463
  -- map each result (status, payload) pair into a nice Result ADT
464
  return $ case rval of
465
             Bad x -> Bad x
466
             Ok (JSArray r) -> mapM parseSubmitJobResult r
467
             x -> Bad ("Cannot parse response from Ganeti: " ++ show x)
468

    
469
-- | Custom queryJobs call.
470
queryJobsStatus :: Client -> [JobId] -> IO (Result [JobStatus])
471
queryJobsStatus s jids = do
472
  rval <- callMethod (QueryJobs jids ["status"]) s
473
  return $ case rval of
474
             Bad x -> Bad x
475
             Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
476
                       J.Ok vals -> if any null vals
477
                                    then Bad "Missing job status field"
478
                                    else Ok (map head vals)
479
                       J.Error x -> Bad x