Statistics
| Branch: | Tag: | Revision:

root / htools / Ganeti / Luxi.hs @ be747966

History | View | Annotate | Download (15.2 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the Ganeti LUXI interface.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Luxi
29
  ( LuxiOp(..)
30
  , ResultStatus(..)
31
  , LuxiReq(..)
32
  , Client
33
  , JobId
34
  , RecvResult(..)
35
  , TagObject(..)
36
  , strOfOp
37
  , checkRS
38
  , getClient
39
  , getServer
40
  , acceptClient
41
  , closeClient
42
  , closeServer
43
  , callMethod
44
  , submitManyJobs
45
  , queryJobsStatus
46
  , buildCall
47
  , buildResponse
48
  , validateCall
49
  , decodeCall
50
  , recvMsg
51
  , recvMsgExt
52
  , sendMsg
53
  ) where
54

    
55
import Control.Exception (catch)
56
import Data.IORef
57
import Data.Ratio (numerator, denominator)
58
import qualified Data.ByteString as B
59
import qualified Data.ByteString.UTF8 as UTF8
60
import Data.Word (Word8)
61
import Control.Monad
62
import Prelude hiding (catch)
63
import Text.JSON (encodeStrict, decodeStrict)
64
import qualified Text.JSON as J
65
import Text.JSON.Types
66
import System.Directory (removeFile)
67
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
68
import System.IO.Error (isEOFError)
69
import System.Timeout
70
import qualified Network.Socket as S
71

    
72
import Ganeti.HTools.JSON
73
import Ganeti.HTools.Types
74
import Ganeti.HTools.Utils
75

    
76
import Ganeti.Constants
77
import Ganeti.Jobs (JobStatus)
78
import Ganeti.OpCodes (OpCode)
79
import qualified Ganeti.Qlang as Qlang
80
import Ganeti.THH
81

    
82
-- * Utility functions
83

    
84
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
85
withTimeout :: Int -> String -> IO a -> IO a
86
withTimeout secs descr action = do
87
  result <- timeout (secs * 1000000) action
88
  case result of
89
    Nothing -> fail $ "Timeout in " ++ descr
90
    Just v -> return v
91

    
92
-- * Generic protocol functionality
93

    
94
-- | Result of receiving a message from the socket.
95
data RecvResult = RecvConnClosed    -- ^ Connection closed
96
                | RecvError String  -- ^ Any other error
97
                | RecvOk String     -- ^ Successfull receive
98
                  deriving (Show, Read, Eq)
99

    
100
-- | The Ganeti job type.
101
type JobId = Int
102

    
103
-- | Data type representing what items do the tag operations apply to.
104
$(declareSADT "TagObject"
105
  [ ("TagInstance", 'tagInstance)
106
  , ("TagNode",     'tagNode)
107
  , ("TagGroup",    'tagNodegroup)
108
  , ("TagCluster",  'tagCluster)
109
  ])
110
$(makeJSONInstance ''TagObject)
111

    
112
-- | Currently supported Luxi operations and JSON serialization.
113
$(genLuxiOp "LuxiOp"
114
  [(luxiReqQuery,
115
    [ ("what",    [t| Qlang.ItemType |])
116
    , ("fields",  [t| [String]  |])
117
    , ("qfilter", [t| Qlang.Filter |])
118
    ])
119
  , (luxiReqQueryNodes,
120
     [ ("names",  [t| [String] |])
121
     , ("fields", [t| [String] |])
122
     , ("lock",   [t| Bool     |])
123
     ])
124
  , (luxiReqQueryGroups,
125
     [ ("names",  [t| [String] |])
126
     , ("fields", [t| [String] |])
127
     , ("lock",   [t| Bool     |])
128
     ])
129
  , (luxiReqQueryInstances,
130
     [ ("names",  [t| [String] |])
131
     , ("fields", [t| [String] |])
132
     , ("lock",   [t| Bool     |])
133
     ])
134
  , (luxiReqQueryJobs,
135
     [ ("ids",    [t| [Int]    |])
136
     , ("fields", [t| [String] |])
137
     ])
138
  , (luxiReqQueryExports,
139
     [ ("nodes", [t| [String] |])
140
     , ("lock",  [t| Bool     |])
141
     ])
142
  , (luxiReqQueryConfigValues,
143
     [ ("fields", [t| [String] |]) ]
144
    )
145
  , (luxiReqQueryClusterInfo, [])
146
  , (luxiReqQueryTags,
147
     [ ("kind", [t| TagObject |])
148
     , ("name", [t| String |])
149
     ])
150
  , (luxiReqSubmitJob,
151
     [ ("job", [t| [OpCode] |]) ]
152
    )
153
  , (luxiReqSubmitManyJobs,
154
     [ ("ops", [t| [[OpCode]] |]) ]
155
    )
156
  , (luxiReqWaitForJobChange,
157
     [ ("job",      [t| Int     |])
158
     , ("fields",   [t| [String]|])
159
     , ("prev_job", [t| JSValue |])
160
     , ("prev_log", [t| JSValue |])
161
     , ("tmout",    [t| Int     |])
162
     ])
163
  , (luxiReqArchiveJob,
164
     [ ("job", [t| Int |]) ]
165
    )
166
  , (luxiReqAutoArchiveJobs,
167
     [ ("age",   [t| Int |])
168
     , ("tmout", [t| Int |])
169
     ])
170
  , (luxiReqCancelJob,
171
     [ ("job", [t| Int |]) ]
172
    )
173
  , (luxiReqSetDrainFlag,
174
     [ ("flag", [t| Bool |]) ]
175
    )
176
  , (luxiReqSetWatcherPause,
177
     [ ("duration", [t| Double |]) ]
178
    )
179
  ])
180

    
181
$(makeJSONInstance ''LuxiReq)
182

    
183
-- | The serialisation of LuxiOps into strings in messages.
184
$(genStrOfOp ''LuxiOp "strOfOp")
185

    
186
$(declareIADT "ResultStatus"
187
  [ ("RSNormal", 'rsNormal)
188
  , ("RSUnknown", 'rsUnknown)
189
  , ("RSNoData", 'rsNodata)
190
  , ("RSUnavailable", 'rsUnavail)
191
  , ("RSOffline", 'rsOffline)
192
  ])
193

    
194
$(makeJSONInstance ''ResultStatus)
195

    
196
-- | Type holding the initial (unparsed) Luxi call.
197
data LuxiCall = LuxiCall LuxiReq JSValue
198

    
199
-- | Check that ResultStatus is success or fail with descriptive message.
200
checkRS :: (Monad m) => ResultStatus -> a -> m a
201
checkRS RSNormal val    = return val
202
checkRS RSUnknown _     = fail "Unknown field"
203
checkRS RSNoData _      = fail "No data for a field"
204
checkRS RSUnavailable _ = fail "Ganeti reports unavailable data"
205
checkRS RSOffline _     = fail "Ganeti reports resource as offline"
206

    
207
-- | The end-of-message separator.
208
eOM :: Word8
209
eOM = 3
210

    
211
-- | The end-of-message encoded as a ByteString.
212
bEOM :: B.ByteString
213
bEOM = B.singleton eOM
214

    
215
-- | Valid keys in the requests and responses.
216
data MsgKeys = Method
217
             | Args
218
             | Success
219
             | Result
220

    
221
-- | The serialisation of MsgKeys into strings in messages.
222
$(genStrOfKey ''MsgKeys "strOfKey")
223

    
224
-- | Luxi client encapsulation.
225
data Client = Client { socket :: Handle           -- ^ The socket of the client
226
                     , rbuf :: IORef B.ByteString -- ^ Already received buffer
227
                     }
228

    
229
-- | Connects to the master daemon and returns a luxi Client.
230
getClient :: String -> IO Client
231
getClient path = do
232
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
233
  withTimeout connTimeout "creating luxi connection" $
234
              S.connect s (S.SockAddrUnix path)
235
  rf <- newIORef B.empty
236
  h <- S.socketToHandle s ReadWriteMode
237
  return Client { socket=h, rbuf=rf }
238

    
239
-- | Creates and returns a server endpoint.
240
getServer :: FilePath -> IO S.Socket
241
getServer path = do
242
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
243
  S.bindSocket s (S.SockAddrUnix path)
244
  S.listen s 5 -- 5 is the max backlog
245
  return s
246

    
247
-- | Closes a server endpoint.
248
-- FIXME: this should be encapsulated into a nicer type.
249
closeServer :: FilePath -> S.Socket -> IO ()
250
closeServer path sock = do
251
  S.sClose sock
252
  removeFile path
253

    
254
-- | Accepts a client
255
acceptClient :: S.Socket -> IO Client
256
acceptClient s = do
257
  -- second return is the address of the client, which we ignore here
258
  (client_socket, _) <- S.accept s
259
  new_buffer <- newIORef B.empty
260
  handle <- S.socketToHandle client_socket ReadWriteMode
261
  return Client { socket=handle, rbuf=new_buffer }
262

    
263
-- | Closes the client socket.
264
closeClient :: Client -> IO ()
265
closeClient = hClose . socket
266

    
267
-- | Sends a message over a luxi transport.
268
sendMsg :: Client -> String -> IO ()
269
sendMsg s buf = withTimeout queryTimeout "sending luxi message" $ do
270
  let encoded = UTF8.fromString buf
271
      handle = socket s
272
  B.hPut handle encoded
273
  B.hPut handle bEOM
274
  hFlush handle
275

    
276
-- | Given a current buffer and the handle, it will read from the
277
-- network until we get a full message, and it will return that
278
-- message and the leftover buffer contents.
279
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
280
recvUpdate handle obuf = do
281
  nbuf <- withTimeout queryTimeout "reading luxi response" $ do
282
            _ <- hWaitForInput handle (-1)
283
            B.hGetNonBlocking handle 4096
284
  let (msg, remaining) = B.break (eOM ==) nbuf
285
      newbuf = B.append obuf msg
286
  if B.null remaining
287
    then recvUpdate handle newbuf
288
    else return (newbuf, B.tail remaining)
289

    
290
-- | Waits for a message over a luxi transport.
291
recvMsg :: Client -> IO String
292
recvMsg s = do
293
  cbuf <- readIORef $ rbuf s
294
  let (imsg, ibuf) = B.break (eOM ==) cbuf
295
  (msg, nbuf) <-
296
    if B.null ibuf      -- if old buffer didn't contain a full message
297
      then recvUpdate (socket s) cbuf   -- then we read from network
298
      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
299
  writeIORef (rbuf s) nbuf
300
  return $ UTF8.toString msg
301

    
302
-- | Extended wrapper over recvMsg.
303
recvMsgExt :: Client -> IO RecvResult
304
recvMsgExt s =
305
  catch (liftM RecvOk (recvMsg s)) $ \e ->
306
    if isEOFError e
307
      then return RecvConnClosed
308
      else return $ RecvError (show e)
309

    
310
-- | Serialize a request to String.
311
buildCall :: LuxiOp  -- ^ The method
312
          -> String  -- ^ The serialized form
313
buildCall lo =
314
  let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
315
           , (strOfKey Args, opToArgs lo)
316
           ]
317
      jo = toJSObject ja
318
  in encodeStrict jo
319

    
320
-- | Serialize the response to String.
321
buildResponse :: Bool    -- ^ Success
322
              -> JSValue -- ^ The arguments
323
              -> String  -- ^ The serialized form
324
buildResponse success args =
325
  let ja = [ (strOfKey Success, JSBool success)
326
           , (strOfKey Result, args)]
327
      jo = toJSObject ja
328
  in encodeStrict jo
329

    
330
-- | Check that luxi request contains the required keys and parse it.
331
validateCall :: String -> Result LuxiCall
332
validateCall s = do
333
  arr <- fromJResult "parsing top-level luxi message" $
334
         decodeStrict s::Result (JSObject JSValue)
335
  let aobj = fromJSObject arr
336
  call <- fromObj aobj (strOfKey Method)::Result LuxiReq
337
  args <- fromObj aobj (strOfKey Args)
338
  return (LuxiCall call args)
339

    
340
-- | Converts Luxi call arguments into a 'LuxiOp' data structure.
341
--
342
-- This is currently hand-coded until we make it more uniform so that
343
-- it can be generated using TH.
344
decodeCall :: LuxiCall -> Result LuxiOp
345
decodeCall (LuxiCall call args) =
346
  case call of
347
    ReqQueryJobs -> do
348
              (jid, jargs) <- fromJVal args
349
              rid <- mapM parseJobId jid
350
              let rargs = map fromJSString jargs
351
              return $ QueryJobs rid rargs
352
    ReqQueryInstances -> do
353
              (names, fields, locking) <- fromJVal args
354
              return $ QueryInstances names fields locking
355
    ReqQueryNodes -> do
356
              (names, fields, locking) <- fromJVal args
357
              return $ QueryNodes names fields locking
358
    ReqQueryGroups -> do
359
              (names, fields, locking) <- fromJVal args
360
              return $ QueryGroups names fields locking
361
    ReqQueryClusterInfo -> do
362
              return QueryClusterInfo
363
    ReqQuery -> do
364
              (what, fields, qfilter) <- fromJVal args
365
              return $ Query what fields qfilter
366
    ReqSubmitJob -> do
367
              [ops1] <- fromJVal args
368
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
369
              return $ SubmitJob ops2
370
    ReqSubmitManyJobs -> do
371
              [ops1] <- fromJVal args
372
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
373
              return $ SubmitManyJobs ops2
374
    ReqWaitForJobChange -> do
375
              (jid, fields, pinfo, pidx, wtmout) <-
376
                -- No instance for 5-tuple, code copied from the
377
                -- json sources and adapted
378
                fromJResult "Parsing WaitForJobChange message" $
379
                case args of
380
                  JSArray [a, b, c, d, e] ->
381
                    (,,,,) `fmap`
382
                    J.readJSON a `ap`
383
                    J.readJSON b `ap`
384
                    J.readJSON c `ap`
385
                    J.readJSON d `ap`
386
                    J.readJSON e
387
                  _ -> J.Error "Not enough values"
388
              rid <- parseJobId jid
389
              return $ WaitForJobChange rid fields pinfo pidx wtmout
390
    ReqArchiveJob -> do
391
              [jid] <- fromJVal args
392
              rid <- parseJobId jid
393
              return $ ArchiveJob rid
394
    ReqAutoArchiveJobs -> do
395
              (age, tmout) <- fromJVal args
396
              return $ AutoArchiveJobs age tmout
397
    ReqQueryExports -> do
398
              (nodes, lock) <- fromJVal args
399
              return $ QueryExports nodes lock
400
    ReqQueryConfigValues -> do
401
              [fields] <- fromJVal args
402
              return $ QueryConfigValues fields
403
    ReqQueryTags -> do
404
              (kind, name) <- fromJVal args
405
              return $ QueryTags kind name
406
    ReqCancelJob -> do
407
              [job] <- fromJVal args
408
              rid <- parseJobId job
409
              return $ CancelJob rid
410
    ReqSetDrainFlag -> do
411
              [flag] <- fromJVal args
412
              return $ SetDrainFlag flag
413
    ReqSetWatcherPause -> do
414
              [duration] <- fromJVal args
415
              return $ SetWatcherPause duration
416

    
417
-- | Check that luxi responses contain the required keys and that the
418
-- call was successful.
419
validateResult :: String -> Result JSValue
420
validateResult s = do
421
  when (UTF8.replacement_char `elem` s) $
422
       fail "Failed to decode UTF-8, detected replacement char after decoding"
423
  oarr <- fromJResult "Parsing LUXI response"
424
          (decodeStrict s)::Result (JSObject JSValue)
425
  let arr = J.fromJSObject oarr
426
  status <- fromObj arr (strOfKey Success)::Result Bool
427
  let rkey = strOfKey Result
428
  if status
429
    then fromObj arr rkey
430
    else fromObj arr rkey >>= fail
431

    
432
-- | Generic luxi method call.
433
callMethod :: LuxiOp -> Client -> IO (Result JSValue)
434
callMethod method s = do
435
  sendMsg s $ buildCall method
436
  result <- recvMsg s
437
  let rval = validateResult result
438
  return rval
439

    
440
-- | Parses a job ID.
441
parseJobId :: JSValue -> Result JobId
442
parseJobId (JSString x) = tryRead "parsing job id" . fromJSString $ x
443
parseJobId (JSRational _ x) =
444
  if denominator x /= 1
445
    then Bad $ "Got fractional job ID from master daemon?! Value:" ++ show x
446
    -- FIXME: potential integer overflow here on 32-bit platforms
447
    else Ok . fromIntegral . numerator $ x
448
parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x
449

    
450
-- | Parse job submission result.
451
parseSubmitJobResult :: JSValue -> Result JobId
452
parseSubmitJobResult (JSArray [JSBool True, v]) = parseJobId v
453
parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
454
  Bad (fromJSString x)
455
parseSubmitJobResult v = Bad $ "Unknown result from the master daemon" ++
456
                         show v
457

    
458
-- | Specialized submitManyJobs call.
459
submitManyJobs :: Client -> [[OpCode]] -> IO (Result [JobId])
460
submitManyJobs s jobs = do
461
  rval <- callMethod (SubmitManyJobs jobs) s
462
  -- map each result (status, payload) pair into a nice Result ADT
463
  return $ case rval of
464
             Bad x -> Bad x
465
             Ok (JSArray r) -> mapM parseSubmitJobResult r
466
             x -> Bad ("Cannot parse response from Ganeti: " ++ show x)
467

    
468
-- | Custom queryJobs call.
469
queryJobsStatus :: Client -> [JobId] -> IO (Result [JobStatus])
470
queryJobsStatus s jids = do
471
  rval <- callMethod (QueryJobs jids ["status"]) s
472
  return $ case rval of
473
             Bad x -> Bad x
474
             Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
475
                       J.Ok vals -> if any null vals
476
                                    then Bad "Missing job status field"
477
                                    else Ok (map head vals)
478
                       J.Error x -> Bad x