Statistics
| Branch: | Tag: | Revision:

root / htools / Ganeti / Luxi.hs @ 8a9ee1e9

History | View | Annotate | Download (14.9 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the Ganeti LUXI interface.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Luxi
29
  ( LuxiOp(..)
30
  , ResultStatus(..)
31
  , LuxiReq(..)
32
  , Client
33
  , JobId
34
  , RecvResult(..)
35
  , strOfOp
36
  , checkRS
37
  , getClient
38
  , getServer
39
  , acceptClient
40
  , closeClient
41
  , closeServer
42
  , callMethod
43
  , submitManyJobs
44
  , queryJobsStatus
45
  , buildCall
46
  , buildResponse
47
  , validateCall
48
  , decodeCall
49
  , recvMsg
50
  , recvMsgExt
51
  , sendMsg
52
  ) where
53

    
54
import Control.Exception (catch)
55
import Data.IORef
56
import Data.Ratio (numerator, denominator)
57
import qualified Data.ByteString as B
58
import qualified Data.ByteString.UTF8 as UTF8
59
import Data.Word (Word8)
60
import Control.Monad
61
import Prelude hiding (catch)
62
import Text.JSON (encodeStrict, decodeStrict)
63
import qualified Text.JSON as J
64
import Text.JSON.Types
65
import System.Directory (removeFile)
66
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
67
import System.IO.Error (isEOFError)
68
import System.Timeout
69
import qualified Network.Socket as S
70

    
71
import Ganeti.HTools.JSON
72
import Ganeti.HTools.Types
73
import Ganeti.HTools.Utils
74

    
75
import Ganeti.Constants
76
import Ganeti.Jobs (JobStatus)
77
import Ganeti.OpCodes (OpCode)
78
import qualified Ganeti.Qlang as Qlang
79
import Ganeti.THH
80

    
81
-- * Utility functions
82

    
83
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
84
withTimeout :: Int -> String -> IO a -> IO a
85
withTimeout secs descr action = do
86
  result <- timeout (secs * 1000000) action
87
  case result of
88
    Nothing -> fail $ "Timeout in " ++ descr
89
    Just v -> return v
90

    
91
-- * Generic protocol functionality
92

    
93
-- | Result of receiving a message from the socket.
94
data RecvResult = RecvConnClosed    -- ^ Connection closed
95
                | RecvError String  -- ^ Any other error
96
                | RecvOk String     -- ^ Successfull receive
97
                  deriving (Show, Read, Eq)
98

    
99
-- | The Ganeti job type.
100
type JobId = Int
101

    
102
-- | Currently supported Luxi operations and JSON serialization.
103
$(genLuxiOp "LuxiOp"
104
  [(luxiReqQuery,
105
    [ ("what",    [t| Qlang.ItemType |])
106
    , ("fields",  [t| [String]  |])
107
    , ("qfilter", [t| Qlang.Filter |])
108
    ])
109
  , (luxiReqQueryNodes,
110
     [ ("names",  [t| [String] |])
111
     , ("fields", [t| [String] |])
112
     , ("lock",   [t| Bool     |])
113
     ])
114
  , (luxiReqQueryGroups,
115
     [ ("names",  [t| [String] |])
116
     , ("fields", [t| [String] |])
117
     , ("lock",   [t| Bool     |])
118
     ])
119
  , (luxiReqQueryInstances,
120
     [ ("names",  [t| [String] |])
121
     , ("fields", [t| [String] |])
122
     , ("lock",   [t| Bool     |])
123
     ])
124
  , (luxiReqQueryJobs,
125
     [ ("ids",    [t| [Int]    |])
126
     , ("fields", [t| [String] |])
127
     ])
128
  , (luxiReqQueryExports,
129
     [ ("nodes", [t| [String] |])
130
     , ("lock",  [t| Bool     |])
131
     ])
132
  , (luxiReqQueryConfigValues,
133
     [ ("fields", [t| [String] |]) ]
134
    )
135
  , (luxiReqQueryClusterInfo, [])
136
  , (luxiReqQueryTags,
137
     [ ("kind", [t| String |])
138
     , ("name", [t| String |])
139
     ])
140
  , (luxiReqSubmitJob,
141
     [ ("job", [t| [OpCode] |]) ]
142
    )
143
  , (luxiReqSubmitManyJobs,
144
     [ ("ops", [t| [[OpCode]] |]) ]
145
    )
146
  , (luxiReqWaitForJobChange,
147
     [ ("job",      [t| Int     |])
148
     , ("fields",   [t| [String]|])
149
     , ("prev_job", [t| JSValue |])
150
     , ("prev_log", [t| JSValue |])
151
     , ("tmout",    [t| Int     |])
152
     ])
153
  , (luxiReqArchiveJob,
154
     [ ("job", [t| Int |]) ]
155
    )
156
  , (luxiReqAutoArchiveJobs,
157
     [ ("age",   [t| Int |])
158
     , ("tmout", [t| Int |])
159
     ])
160
  , (luxiReqCancelJob,
161
     [ ("job", [t| Int |]) ]
162
    )
163
  , (luxiReqSetDrainFlag,
164
     [ ("flag", [t| Bool |]) ]
165
    )
166
  , (luxiReqSetWatcherPause,
167
     [ ("duration", [t| Double |]) ]
168
    )
169
  ])
170

    
171
$(makeJSONInstance ''LuxiReq)
172

    
173
-- | The serialisation of LuxiOps into strings in messages.
174
$(genStrOfOp ''LuxiOp "strOfOp")
175

    
176
$(declareIADT "ResultStatus"
177
  [ ("RSNormal", 'rsNormal)
178
  , ("RSUnknown", 'rsUnknown)
179
  , ("RSNoData", 'rsNodata)
180
  , ("RSUnavailable", 'rsUnavail)
181
  , ("RSOffline", 'rsOffline)
182
  ])
183

    
184
$(makeJSONInstance ''ResultStatus)
185

    
186
-- | Type holding the initial (unparsed) Luxi call.
187
data LuxiCall = LuxiCall LuxiReq JSValue
188

    
189
-- | Check that ResultStatus is success or fail with descriptive message.
190
checkRS :: (Monad m) => ResultStatus -> a -> m a
191
checkRS RSNormal val    = return val
192
checkRS RSUnknown _     = fail "Unknown field"
193
checkRS RSNoData _      = fail "No data for a field"
194
checkRS RSUnavailable _ = fail "Ganeti reports unavailable data"
195
checkRS RSOffline _     = fail "Ganeti reports resource as offline"
196

    
197
-- | The end-of-message separator.
198
eOM :: Word8
199
eOM = 3
200

    
201
-- | The end-of-message encoded as a ByteString.
202
bEOM :: B.ByteString
203
bEOM = B.singleton eOM
204

    
205
-- | Valid keys in the requests and responses.
206
data MsgKeys = Method
207
             | Args
208
             | Success
209
             | Result
210

    
211
-- | The serialisation of MsgKeys into strings in messages.
212
$(genStrOfKey ''MsgKeys "strOfKey")
213

    
214
-- | Luxi client encapsulation.
215
data Client = Client { socket :: Handle           -- ^ The socket of the client
216
                     , rbuf :: IORef B.ByteString -- ^ Already received buffer
217
                     }
218

    
219
-- | Connects to the master daemon and returns a luxi Client.
220
getClient :: String -> IO Client
221
getClient path = do
222
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
223
  withTimeout connTimeout "creating luxi connection" $
224
              S.connect s (S.SockAddrUnix path)
225
  rf <- newIORef B.empty
226
  h <- S.socketToHandle s ReadWriteMode
227
  return Client { socket=h, rbuf=rf }
228

    
229
-- | Creates and returns a server endpoint.
230
getServer :: FilePath -> IO S.Socket
231
getServer path = do
232
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
233
  S.bindSocket s (S.SockAddrUnix path)
234
  S.listen s 5 -- 5 is the max backlog
235
  return s
236

    
237
-- | Closes a server endpoint.
238
-- FIXME: this should be encapsulated into a nicer type.
239
closeServer :: FilePath -> S.Socket -> IO ()
240
closeServer path sock = do
241
  S.sClose sock
242
  removeFile path
243

    
244
-- | Accepts a client
245
acceptClient :: S.Socket -> IO Client
246
acceptClient s = do
247
  -- second return is the address of the client, which we ignore here
248
  (client_socket, _) <- S.accept s
249
  new_buffer <- newIORef B.empty
250
  handle <- S.socketToHandle client_socket ReadWriteMode
251
  return Client { socket=handle, rbuf=new_buffer }
252

    
253
-- | Closes the client socket.
254
closeClient :: Client -> IO ()
255
closeClient = hClose . socket
256

    
257
-- | Sends a message over a luxi transport.
258
sendMsg :: Client -> String -> IO ()
259
sendMsg s buf = withTimeout queryTimeout "sending luxi message" $ do
260
  let encoded = UTF8.fromString buf
261
      handle = socket s
262
  B.hPut handle encoded
263
  B.hPut handle bEOM
264
  hFlush handle
265

    
266
-- | Given a current buffer and the handle, it will read from the
267
-- network until we get a full message, and it will return that
268
-- message and the leftover buffer contents.
269
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
270
recvUpdate handle obuf = do
271
  nbuf <- withTimeout queryTimeout "reading luxi response" $ do
272
            _ <- hWaitForInput handle (-1)
273
            B.hGetNonBlocking handle 4096
274
  let (msg, remaining) = B.break (eOM ==) nbuf
275
      newbuf = B.append obuf msg
276
  if B.null remaining
277
    then recvUpdate handle newbuf
278
    else return (newbuf, B.tail remaining)
279

    
280
-- | Waits for a message over a luxi transport.
281
recvMsg :: Client -> IO String
282
recvMsg s = do
283
  cbuf <- readIORef $ rbuf s
284
  let (imsg, ibuf) = B.break (eOM ==) cbuf
285
  (msg, nbuf) <-
286
    if B.null ibuf      -- if old buffer didn't contain a full message
287
      then recvUpdate (socket s) cbuf   -- then we read from network
288
      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
289
  writeIORef (rbuf s) nbuf
290
  return $ UTF8.toString msg
291

    
292
-- | Extended wrapper over recvMsg.
293
recvMsgExt :: Client -> IO RecvResult
294
recvMsgExt s =
295
  catch (liftM RecvOk (recvMsg s)) $ \e ->
296
    if isEOFError e
297
      then return RecvConnClosed
298
      else return $ RecvError (show e)
299

    
300
-- | Serialize a request to String.
301
buildCall :: LuxiOp  -- ^ The method
302
          -> String  -- ^ The serialized form
303
buildCall lo =
304
  let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
305
           , (strOfKey Args, opToArgs lo)
306
           ]
307
      jo = toJSObject ja
308
  in encodeStrict jo
309

    
310
-- | Serialize the response to String.
311
buildResponse :: Bool    -- ^ Success
312
              -> JSValue -- ^ The arguments
313
              -> String  -- ^ The serialized form
314
buildResponse success args =
315
  let ja = [ (strOfKey Success, JSBool success)
316
           , (strOfKey Result, args)]
317
      jo = toJSObject ja
318
  in encodeStrict jo
319

    
320
-- | Check that luxi request contains the required keys and parse it.
321
validateCall :: String -> Result LuxiCall
322
validateCall s = do
323
  arr <- fromJResult "parsing top-level luxi message" $
324
         decodeStrict s::Result (JSObject JSValue)
325
  let aobj = fromJSObject arr
326
  call <- fromObj aobj (strOfKey Method)::Result LuxiReq
327
  args <- fromObj aobj (strOfKey Args)
328
  return (LuxiCall call args)
329

    
330
-- | Converts Luxi call arguments into a 'LuxiOp' data structure.
331
--
332
-- This is currently hand-coded until we make it more uniform so that
333
-- it can be generated using TH.
334
decodeCall :: LuxiCall -> Result LuxiOp
335
decodeCall (LuxiCall call args) =
336
  case call of
337
    ReqQueryJobs -> do
338
              (jid, jargs) <- fromJVal args
339
              rid <- mapM parseJobId jid
340
              let rargs = map fromJSString jargs
341
              return $ QueryJobs rid rargs
342
    ReqQueryInstances -> do
343
              (names, fields, locking) <- fromJVal args
344
              return $ QueryInstances names fields locking
345
    ReqQueryNodes -> do
346
              (names, fields, locking) <- fromJVal args
347
              return $ QueryNodes names fields locking
348
    ReqQueryGroups -> do
349
              (names, fields, locking) <- fromJVal args
350
              return $ QueryGroups names fields locking
351
    ReqQueryClusterInfo -> do
352
              return QueryClusterInfo
353
    ReqQuery -> do
354
              (what, fields, qfilter) <- fromJVal args
355
              return $ Query what fields qfilter
356
    ReqSubmitJob -> do
357
              [ops1] <- fromJVal args
358
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
359
              return $ SubmitJob ops2
360
    ReqSubmitManyJobs -> do
361
              [ops1] <- fromJVal args
362
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
363
              return $ SubmitManyJobs ops2
364
    ReqWaitForJobChange -> do
365
              (jid, fields, pinfo, pidx, wtmout) <-
366
                -- No instance for 5-tuple, code copied from the
367
                -- json sources and adapted
368
                fromJResult "Parsing WaitForJobChange message" $
369
                case args of
370
                  JSArray [a, b, c, d, e] ->
371
                    (,,,,) `fmap`
372
                    J.readJSON a `ap`
373
                    J.readJSON b `ap`
374
                    J.readJSON c `ap`
375
                    J.readJSON d `ap`
376
                    J.readJSON e
377
                  _ -> J.Error "Not enough values"
378
              rid <- parseJobId jid
379
              return $ WaitForJobChange rid fields pinfo pidx wtmout
380
    ReqArchiveJob -> do
381
              [jid] <- fromJVal args
382
              rid <- parseJobId jid
383
              return $ ArchiveJob rid
384
    ReqAutoArchiveJobs -> do
385
              (age, tmout) <- fromJVal args
386
              return $ AutoArchiveJobs age tmout
387
    ReqQueryExports -> do
388
              (nodes, lock) <- fromJVal args
389
              return $ QueryExports nodes lock
390
    ReqQueryConfigValues -> do
391
              [fields] <- fromJVal args
392
              return $ QueryConfigValues fields
393
    ReqQueryTags -> do
394
              (kind, name) <- fromJVal args
395
              return $ QueryTags kind name
396
    ReqCancelJob -> do
397
              [job] <- fromJVal args
398
              rid <- parseJobId job
399
              return $ CancelJob rid
400
    ReqSetDrainFlag -> do
401
              [flag] <- fromJVal args
402
              return $ SetDrainFlag flag
403
    ReqSetWatcherPause -> do
404
              [duration] <- fromJVal args
405
              return $ SetWatcherPause duration
406

    
407
-- | Check that luxi responses contain the required keys and that the
408
-- call was successful.
409
validateResult :: String -> Result JSValue
410
validateResult s = do
411
  when (UTF8.replacement_char `elem` s) $
412
       fail "Failed to decode UTF-8, detected replacement char after decoding"
413
  oarr <- fromJResult "Parsing LUXI response"
414
          (decodeStrict s)::Result (JSObject JSValue)
415
  let arr = J.fromJSObject oarr
416
  status <- fromObj arr (strOfKey Success)::Result Bool
417
  let rkey = strOfKey Result
418
  if status
419
    then fromObj arr rkey
420
    else fromObj arr rkey >>= fail
421

    
422
-- | Generic luxi method call.
423
callMethod :: LuxiOp -> Client -> IO (Result JSValue)
424
callMethod method s = do
425
  sendMsg s $ buildCall method
426
  result <- recvMsg s
427
  let rval = validateResult result
428
  return rval
429

    
430
-- | Parses a job ID.
431
parseJobId :: JSValue -> Result JobId
432
parseJobId (JSString x) = tryRead "parsing job id" . fromJSString $ x
433
parseJobId (JSRational _ x) =
434
  if denominator x /= 1
435
    then Bad $ "Got fractional job ID from master daemon?! Value:" ++ show x
436
    -- FIXME: potential integer overflow here on 32-bit platforms
437
    else Ok . fromIntegral . numerator $ x
438
parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x
439

    
440
-- | Parse job submission result.
441
parseSubmitJobResult :: JSValue -> Result JobId
442
parseSubmitJobResult (JSArray [JSBool True, v]) = parseJobId v
443
parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
444
  Bad (fromJSString x)
445
parseSubmitJobResult v = Bad $ "Unknown result from the master daemon" ++
446
                         show v
447

    
448
-- | Specialized submitManyJobs call.
449
submitManyJobs :: Client -> [[OpCode]] -> IO (Result [JobId])
450
submitManyJobs s jobs = do
451
  rval <- callMethod (SubmitManyJobs jobs) s
452
  -- map each result (status, payload) pair into a nice Result ADT
453
  return $ case rval of
454
             Bad x -> Bad x
455
             Ok (JSArray r) -> mapM parseSubmitJobResult r
456
             x -> Bad ("Cannot parse response from Ganeti: " ++ show x)
457

    
458
-- | Custom queryJobs call.
459
queryJobsStatus :: Client -> [JobId] -> IO (Result [JobStatus])
460
queryJobsStatus s jids = do
461
  rval <- callMethod (QueryJobs jids ["status"]) s
462
  return $ case rval of
463
             Bad x -> Bad x
464
             Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
465
                       J.Ok vals -> if any null vals
466
                                    then Bad "Missing job status field"
467
                                    else Ok (map head vals)
468
                       J.Error x -> Bad x