Statistics
| Branch: | Tag: | Revision:

root / htools / Ganeti / Luxi.hs @ 37904802

History | View | Annotate | Download (15.6 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the Ganeti LUXI interface.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Luxi
29
  ( LuxiOp(..)
30
  , LuxiReq(..)
31
  , Client
32
  , JobId
33
  , RecvResult(..)
34
  , strOfOp
35
  , getClient
36
  , getServer
37
  , acceptClient
38
  , closeClient
39
  , closeServer
40
  , callMethod
41
  , submitManyJobs
42
  , queryJobsStatus
43
  , buildCall
44
  , buildResponse
45
  , validateCall
46
  , decodeCall
47
  , recvMsg
48
  , recvMsgExt
49
  , sendMsg
50
  , allLuxiCalls
51
  ) where
52

    
53
import Control.Exception (catch)
54
import Data.IORef
55
import Data.Ratio (numerator, denominator)
56
import qualified Data.ByteString as B
57
import qualified Data.ByteString.UTF8 as UTF8
58
import Data.Word (Word8)
59
import Control.Monad
60
import Text.JSON (encodeStrict, decodeStrict)
61
import qualified Text.JSON as J
62
import Text.JSON.Pretty (pp_value)
63
import Text.JSON.Types
64
import System.Directory (removeFile)
65
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
66
import System.IO.Error (isEOFError)
67
import System.Timeout
68
import qualified Network.Socket as S
69

    
70
import Ganeti.BasicTypes
71
import Ganeti.Constants
72
import Ganeti.Errors
73
import Ganeti.JSON
74
import Ganeti.Jobs (JobStatus)
75
import Ganeti.OpCodes
76
import Ganeti.Utils
77
import qualified Ganeti.Query.Language as Qlang
78
import Ganeti.THH
79

    
80
-- * Utility functions
81

    
82
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
83
withTimeout :: Int -> String -> IO a -> IO a
84
withTimeout secs descr action = do
85
  result <- timeout (secs * 1000000) action
86
  case result of
87
    Nothing -> fail $ "Timeout in " ++ descr
88
    Just v -> return v
89

    
90
-- * Generic protocol functionality
91

    
92
-- | Result of receiving a message from the socket.
93
data RecvResult = RecvConnClosed    -- ^ Connection closed
94
                | RecvError String  -- ^ Any other error
95
                | RecvOk String     -- ^ Successfull receive
96
                  deriving (Show, Read, Eq)
97

    
98
-- | The Ganeti job type.
99
type JobId = Int
100

    
101
-- | Currently supported Luxi operations and JSON serialization.
102
$(genLuxiOp "LuxiOp"
103
  [ (luxiReqQuery,
104
    [ simpleField "what"    [t| Qlang.ItemType |]
105
    , simpleField "fields"  [t| [String]  |]
106
    , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
107
    ])
108
  , (luxiReqQueryFields,
109
    [ simpleField "what"    [t| Qlang.ItemType |]
110
    , simpleField "fields"  [t| [String]  |]
111
    ])
112
  , (luxiReqQueryNodes,
113
     [ simpleField "names"  [t| [String] |]
114
     , simpleField "fields" [t| [String] |]
115
     , simpleField "lock"   [t| Bool     |]
116
     ])
117
  , (luxiReqQueryGroups,
118
     [ simpleField "names"  [t| [String] |]
119
     , simpleField "fields" [t| [String] |]
120
     , simpleField "lock"   [t| Bool     |]
121
     ])
122
  , (luxiReqQueryInstances,
123
     [ simpleField "names"  [t| [String] |]
124
     , simpleField "fields" [t| [String] |]
125
     , simpleField "lock"   [t| Bool     |]
126
     ])
127
  , (luxiReqQueryJobs,
128
     [ simpleField "ids"    [t| [Int]    |]
129
     , simpleField "fields" [t| [String] |]
130
     ])
131
  , (luxiReqQueryExports,
132
     [ simpleField "nodes" [t| [String] |]
133
     , simpleField "lock"  [t| Bool     |]
134
     ])
135
  , (luxiReqQueryConfigValues,
136
     [ simpleField "fields" [t| [String] |] ]
137
    )
138
  , (luxiReqQueryClusterInfo, [])
139
  , (luxiReqQueryTags,
140
     [ customField 'decodeTagObject 'encodeTagObject $
141
       simpleField "kind" [t| TagObject |]
142
     ])
143
  , (luxiReqSubmitJob,
144
     [ simpleField "job" [t| [OpCode] |] ]
145
    )
146
  , (luxiReqSubmitManyJobs,
147
     [ simpleField "ops" [t| [[OpCode]] |] ]
148
    )
149
  , (luxiReqWaitForJobChange,
150
     [ simpleField "job"      [t| Int     |]
151
     , simpleField "fields"   [t| [String]|]
152
     , simpleField "prev_job" [t| JSValue |]
153
     , simpleField "prev_log" [t| JSValue |]
154
     , simpleField "tmout"    [t| Int     |]
155
     ])
156
  , (luxiReqArchiveJob,
157
     [ simpleField "job" [t| Int |] ]
158
    )
159
  , (luxiReqAutoArchiveJobs,
160
     [ simpleField "age"   [t| Int |]
161
     , simpleField "tmout" [t| Int |]
162
     ])
163
  , (luxiReqCancelJob,
164
     [ simpleField "job" [t| Int |] ]
165
    )
166
  , (luxiReqSetDrainFlag,
167
     [ simpleField "flag" [t| Bool |] ]
168
    )
169
  , (luxiReqSetWatcherPause,
170
     [ simpleField "duration" [t| Double |] ]
171
    )
172
  ])
173

    
174
$(makeJSONInstance ''LuxiReq)
175

    
176
-- | List of all defined Luxi calls.
177
$(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
178

    
179
-- | The serialisation of LuxiOps into strings in messages.
180
$(genStrOfOp ''LuxiOp "strOfOp")
181

    
182
-- | Type holding the initial (unparsed) Luxi call.
183
data LuxiCall = LuxiCall LuxiReq JSValue
184

    
185
-- | The end-of-message separator.
186
eOM :: Word8
187
eOM = 3
188

    
189
-- | The end-of-message encoded as a ByteString.
190
bEOM :: B.ByteString
191
bEOM = B.singleton eOM
192

    
193
-- | Valid keys in the requests and responses.
194
data MsgKeys = Method
195
             | Args
196
             | Success
197
             | Result
198

    
199
-- | The serialisation of MsgKeys into strings in messages.
200
$(genStrOfKey ''MsgKeys "strOfKey")
201

    
202
-- | Luxi client encapsulation.
203
data Client = Client { socket :: Handle           -- ^ The socket of the client
204
                     , rbuf :: IORef B.ByteString -- ^ Already received buffer
205
                     }
206

    
207
-- | Connects to the master daemon and returns a luxi Client.
208
getClient :: String -> IO Client
209
getClient path = do
210
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
211
  withTimeout luxiDefCtmo "creating luxi connection" $
212
              S.connect s (S.SockAddrUnix path)
213
  rf <- newIORef B.empty
214
  h <- S.socketToHandle s ReadWriteMode
215
  return Client { socket=h, rbuf=rf }
216

    
217
-- | Creates and returns a server endpoint.
218
getServer :: FilePath -> IO S.Socket
219
getServer path = do
220
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
221
  S.bindSocket s (S.SockAddrUnix path)
222
  S.listen s 5 -- 5 is the max backlog
223
  return s
224

    
225
-- | Closes a server endpoint.
226
-- FIXME: this should be encapsulated into a nicer type.
227
closeServer :: FilePath -> S.Socket -> IO ()
228
closeServer path sock = do
229
  S.sClose sock
230
  removeFile path
231

    
232
-- | Accepts a client
233
acceptClient :: S.Socket -> IO Client
234
acceptClient s = do
235
  -- second return is the address of the client, which we ignore here
236
  (client_socket, _) <- S.accept s
237
  new_buffer <- newIORef B.empty
238
  handle <- S.socketToHandle client_socket ReadWriteMode
239
  return Client { socket=handle, rbuf=new_buffer }
240

    
241
-- | Closes the client socket.
242
closeClient :: Client -> IO ()
243
closeClient = hClose . socket
244

    
245
-- | Sends a message over a luxi transport.
246
sendMsg :: Client -> String -> IO ()
247
sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
248
  let encoded = UTF8.fromString buf
249
      handle = socket s
250
  B.hPut handle encoded
251
  B.hPut handle bEOM
252
  hFlush handle
253

    
254
-- | Given a current buffer and the handle, it will read from the
255
-- network until we get a full message, and it will return that
256
-- message and the leftover buffer contents.
257
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
258
recvUpdate handle obuf = do
259
  nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
260
            _ <- hWaitForInput handle (-1)
261
            B.hGetNonBlocking handle 4096
262
  let (msg, remaining) = B.break (eOM ==) nbuf
263
      newbuf = B.append obuf msg
264
  if B.null remaining
265
    then recvUpdate handle newbuf
266
    else return (newbuf, B.tail remaining)
267

    
268
-- | Waits for a message over a luxi transport.
269
recvMsg :: Client -> IO String
270
recvMsg s = do
271
  cbuf <- readIORef $ rbuf s
272
  let (imsg, ibuf) = B.break (eOM ==) cbuf
273
  (msg, nbuf) <-
274
    if B.null ibuf      -- if old buffer didn't contain a full message
275
      then recvUpdate (socket s) cbuf   -- then we read from network
276
      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
277
  writeIORef (rbuf s) nbuf
278
  return $ UTF8.toString msg
279

    
280
-- | Extended wrapper over recvMsg.
281
recvMsgExt :: Client -> IO RecvResult
282
recvMsgExt s =
283
  Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
284
    return $ if isEOFError e
285
               then RecvConnClosed
286
               else RecvError (show e)
287

    
288
-- | Serialize a request to String.
289
buildCall :: LuxiOp  -- ^ The method
290
          -> String  -- ^ The serialized form
291
buildCall lo =
292
  let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
293
           , (strOfKey Args, opToArgs lo)
294
           ]
295
      jo = toJSObject ja
296
  in encodeStrict jo
297

    
298
-- | Serialize the response to String.
299
buildResponse :: Bool    -- ^ Success
300
              -> JSValue -- ^ The arguments
301
              -> String  -- ^ The serialized form
302
buildResponse success args =
303
  let ja = [ (strOfKey Success, JSBool success)
304
           , (strOfKey Result, args)]
305
      jo = toJSObject ja
306
  in encodeStrict jo
307

    
308
-- | Check that luxi request contains the required keys and parse it.
309
validateCall :: String -> Result LuxiCall
310
validateCall s = do
311
  arr <- fromJResult "parsing top-level luxi message" $
312
         decodeStrict s::Result (JSObject JSValue)
313
  let aobj = fromJSObject arr
314
  call <- fromObj aobj (strOfKey Method)::Result LuxiReq
315
  args <- fromObj aobj (strOfKey Args)
316
  return (LuxiCall call args)
317

    
318
-- | Converts Luxi call arguments into a 'LuxiOp' data structure.
319
--
320
-- This is currently hand-coded until we make it more uniform so that
321
-- it can be generated using TH.
322
decodeCall :: LuxiCall -> Result LuxiOp
323
decodeCall (LuxiCall call args) =
324
  case call of
325
    ReqQueryJobs -> do
326
              (jid, jargs) <- fromJVal args
327
              rid <- mapM parseJobId jid
328
              let rargs = map fromJSString jargs
329
              return $ QueryJobs rid rargs
330
    ReqQueryInstances -> do
331
              (names, fields, locking) <- fromJVal args
332
              return $ QueryInstances names fields locking
333
    ReqQueryNodes -> do
334
              (names, fields, locking) <- fromJVal args
335
              return $ QueryNodes names fields locking
336
    ReqQueryGroups -> do
337
              (names, fields, locking) <- fromJVal args
338
              return $ QueryGroups names fields locking
339
    ReqQueryClusterInfo ->
340
              return QueryClusterInfo
341
    ReqQuery -> do
342
              (what, fields, qfilter) <- fromJVal args
343
              return $ Query what fields qfilter
344
    ReqQueryFields -> do
345
              (what, fields) <- fromJVal args
346
              fields' <- case fields of
347
                           JSNull -> return []
348
                           _ -> fromJVal fields
349
              return $ QueryFields what fields'
350
    ReqSubmitJob -> do
351
              [ops1] <- fromJVal args
352
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
353
              return $ SubmitJob ops2
354
    ReqSubmitManyJobs -> do
355
              [ops1] <- fromJVal args
356
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
357
              return $ SubmitManyJobs ops2
358
    ReqWaitForJobChange -> do
359
              (jid, fields, pinfo, pidx, wtmout) <-
360
                -- No instance for 5-tuple, code copied from the
361
                -- json sources and adapted
362
                fromJResult "Parsing WaitForJobChange message" $
363
                case args of
364
                  JSArray [a, b, c, d, e] ->
365
                    (,,,,) `fmap`
366
                    J.readJSON a `ap`
367
                    J.readJSON b `ap`
368
                    J.readJSON c `ap`
369
                    J.readJSON d `ap`
370
                    J.readJSON e
371
                  _ -> J.Error "Not enough values"
372
              rid <- parseJobId jid
373
              return $ WaitForJobChange rid fields pinfo pidx wtmout
374
    ReqArchiveJob -> do
375
              [jid] <- fromJVal args
376
              rid <- parseJobId jid
377
              return $ ArchiveJob rid
378
    ReqAutoArchiveJobs -> do
379
              (age, tmout) <- fromJVal args
380
              return $ AutoArchiveJobs age tmout
381
    ReqQueryExports -> do
382
              (nodes, lock) <- fromJVal args
383
              return $ QueryExports nodes lock
384
    ReqQueryConfigValues -> do
385
              [fields] <- fromJVal args
386
              return $ QueryConfigValues fields
387
    ReqQueryTags -> do
388
              (kind, name) <- fromJVal args
389
              item <- tagObjectFrom kind name
390
              return $ QueryTags item
391
    ReqCancelJob -> do
392
              [job] <- fromJVal args
393
              rid <- parseJobId job
394
              return $ CancelJob rid
395
    ReqSetDrainFlag -> do
396
              [flag] <- fromJVal args
397
              return $ SetDrainFlag flag
398
    ReqSetWatcherPause -> do
399
              [duration] <- fromJVal args
400
              return $ SetWatcherPause duration
401

    
402
-- | Check that luxi responses contain the required keys and that the
403
-- call was successful.
404
validateResult :: String -> ErrorResult JSValue
405
validateResult s = do
406
  when (UTF8.replacement_char `elem` s) $
407
       fail "Failed to decode UTF-8, detected replacement char after decoding"
408
  oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
409
  let arr = J.fromJSObject oarr
410
  status <- fromObj arr (strOfKey Success)
411
  result <- fromObj arr (strOfKey Result)
412
  if status
413
    then return result
414
    else decodeError result
415

    
416
-- | Try to decode an error from the server response. This function
417
-- will always fail, since it's called only on the error path (when
418
-- status is False).
419
decodeError :: JSValue -> ErrorResult JSValue
420
decodeError val =
421
  case fromJVal val of
422
    Ok e -> Bad e
423
    Bad msg -> Bad $ GenericError msg
424

    
425
-- | Generic luxi method call.
426
callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
427
callMethod method s = do
428
  sendMsg s $ buildCall method
429
  result <- recvMsg s
430
  let rval = validateResult result
431
  return rval
432

    
433
-- | Parses a job ID.
434
parseJobId :: JSValue -> Result JobId
435
parseJobId (JSString x) = tryRead "parsing job id" . fromJSString $ x
436
parseJobId (JSRational _ x) =
437
  if denominator x /= 1
438
    then Bad $ "Got fractional job ID from master daemon?! Value:" ++ show x
439
    -- FIXME: potential integer overflow here on 32-bit platforms
440
    else Ok . fromIntegral . numerator $ x
441
parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x
442

    
443
-- | Parse job submission result.
444
parseSubmitJobResult :: JSValue -> ErrorResult JobId
445
parseSubmitJobResult (JSArray [JSBool True, v]) =
446
  case parseJobId v of
447
    Bad msg -> Bad $ LuxiError msg
448
    Ok v' -> Ok v'
449
parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
450
  Bad . LuxiError $ fromJSString x
451
parseSubmitJobResult v =
452
  Bad . LuxiError $ "Unknown result from the master daemon: " ++
453
      show (pp_value v)
454

    
455
-- | Specialized submitManyJobs call.
456
submitManyJobs :: Client -> [[OpCode]] -> IO (ErrorResult [JobId])
457
submitManyJobs s jobs = do
458
  rval <- callMethod (SubmitManyJobs jobs) s
459
  -- map each result (status, payload) pair into a nice Result ADT
460
  return $ case rval of
461
             Bad x -> Bad x
462
             Ok (JSArray r) -> mapM parseSubmitJobResult r
463
             x -> Bad . LuxiError $
464
                  "Cannot parse response from Ganeti: " ++ show x
465

    
466
-- | Custom queryJobs call.
467
queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
468
queryJobsStatus s jids = do
469
  rval <- callMethod (QueryJobs jids ["status"]) s
470
  return $ case rval of
471
             Bad x -> Bad x
472
             Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
473
                       J.Ok vals -> if any null vals
474
                                    then Bad $
475
                                         LuxiError "Missing job status field"
476
                                    else Ok (map head vals)
477
                       J.Error x -> Bad $ LuxiError x