Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / Luxi.hs @ 3e02cd3c

History | View | Annotate | Download (15.3 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the Ganeti LUXI interface.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Luxi
29
  ( LuxiOp(..)
30
  , LuxiReq(..)
31
  , Client
32
  , JobId
33
  , fromJobId
34
  , makeJobId
35
  , RecvResult(..)
36
  , strOfOp
37
  , getClient
38
  , getServer
39
  , acceptClient
40
  , closeClient
41
  , closeServer
42
  , callMethod
43
  , submitManyJobs
44
  , queryJobsStatus
45
  , buildCall
46
  , buildResponse
47
  , validateCall
48
  , decodeCall
49
  , recvMsg
50
  , recvMsgExt
51
  , sendMsg
52
  , allLuxiCalls
53
  ) where
54

    
55
import Control.Exception (catch)
56
import Data.IORef
57
import qualified Data.ByteString as B
58
import qualified Data.ByteString.Lazy as BL
59
import qualified Data.ByteString.UTF8 as UTF8
60
import qualified Data.ByteString.Lazy.UTF8 as UTF8L
61
import Data.Word (Word8)
62
import Control.Monad
63
import Text.JSON (encodeStrict, decodeStrict)
64
import qualified Text.JSON as J
65
import Text.JSON.Pretty (pp_value)
66
import Text.JSON.Types
67
import System.Directory (removeFile)
68
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
69
import System.IO.Error (isEOFError)
70
import System.Timeout
71
import qualified Network.Socket as S
72

    
73
import Ganeti.BasicTypes
74
import Ganeti.Constants
75
import Ganeti.Errors
76
import Ganeti.JSON
77
import Ganeti.OpParams (pTagsObject)
78
import Ganeti.OpCodes
79
import qualified Ganeti.Query.Language as Qlang
80
import Ganeti.THH
81
import Ganeti.Types
82

    
83
-- * Utility functions
84

    
85
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
86
withTimeout :: Int -> String -> IO a -> IO a
87
withTimeout secs descr action = do
88
  result <- timeout (secs * 1000000) action
89
  case result of
90
    Nothing -> fail $ "Timeout in " ++ descr
91
    Just v -> return v
92

    
93
-- * Generic protocol functionality
94

    
95
-- | Result of receiving a message from the socket.
96
data RecvResult = RecvConnClosed    -- ^ Connection closed
97
                | RecvError String  -- ^ Any other error
98
                | RecvOk String     -- ^ Successfull receive
99
                  deriving (Show, Eq)
100

    
101
-- | Currently supported Luxi operations and JSON serialization.
102
$(genLuxiOp "LuxiOp"
103
  [ (luxiReqQuery,
104
    [ simpleField "what"    [t| Qlang.ItemType |]
105
    , simpleField "fields"  [t| [String]  |]
106
    , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
107
    ])
108
  , (luxiReqQueryFields,
109
    [ simpleField "what"    [t| Qlang.ItemType |]
110
    , simpleField "fields"  [t| [String]  |]
111
    ])
112
  , (luxiReqQueryNodes,
113
     [ simpleField "names"  [t| [String] |]
114
     , simpleField "fields" [t| [String] |]
115
     , simpleField "lock"   [t| Bool     |]
116
     ])
117
  , (luxiReqQueryGroups,
118
     [ simpleField "names"  [t| [String] |]
119
     , simpleField "fields" [t| [String] |]
120
     , simpleField "lock"   [t| Bool     |]
121
     ])
122
  , (luxiReqQueryInstances,
123
     [ simpleField "names"  [t| [String] |]
124
     , simpleField "fields" [t| [String] |]
125
     , simpleField "lock"   [t| Bool     |]
126
     ])
127
  , (luxiReqQueryJobs,
128
     [ simpleField "ids"    [t| [JobId]  |]
129
     , simpleField "fields" [t| [String] |]
130
     ])
131
  , (luxiReqQueryExports,
132
     [ simpleField "nodes" [t| [String] |]
133
     , simpleField "lock"  [t| Bool     |]
134
     ])
135
  , (luxiReqQueryConfigValues,
136
     [ simpleField "fields" [t| [String] |] ]
137
    )
138
  , (luxiReqQueryClusterInfo, [])
139
  , (luxiReqQueryTags,
140
     [ pTagsObject ])
141
  , (luxiReqSubmitJob,
142
     [ simpleField "job" [t| [MetaOpCode] |] ]
143
    )
144
  , (luxiReqSubmitManyJobs,
145
     [ simpleField "ops" [t| [[MetaOpCode]] |] ]
146
    )
147
  , (luxiReqWaitForJobChange,
148
     [ simpleField "job"      [t| JobId   |]
149
     , simpleField "fields"   [t| [String]|]
150
     , simpleField "prev_job" [t| JSValue |]
151
     , simpleField "prev_log" [t| JSValue |]
152
     , simpleField "tmout"    [t| Int     |]
153
     ])
154
  , (luxiReqArchiveJob,
155
     [ simpleField "job" [t| JobId |] ]
156
    )
157
  , (luxiReqAutoArchiveJobs,
158
     [ simpleField "age"   [t| Int |]
159
     , simpleField "tmout" [t| Int |]
160
     ])
161
  , (luxiReqCancelJob,
162
     [ simpleField "job" [t| JobId |] ]
163
    )
164
  , (luxiReqChangeJobPriority,
165
     [ simpleField "job"      [t| JobId |]
166
     , simpleField "priority" [t| Int |] ]
167
    )
168
  , (luxiReqSetDrainFlag,
169
     [ simpleField "flag" [t| Bool |] ]
170
    )
171
  , (luxiReqSetWatcherPause,
172
     [ simpleField "duration" [t| Double |] ]
173
    )
174
  ])
175

    
176
$(makeJSONInstance ''LuxiReq)
177

    
178
-- | List of all defined Luxi calls.
179
$(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
180

    
181
-- | The serialisation of LuxiOps into strings in messages.
182
$(genStrOfOp ''LuxiOp "strOfOp")
183

    
184
-- | Type holding the initial (unparsed) Luxi call.
185
data LuxiCall = LuxiCall LuxiReq JSValue
186

    
187
-- | The end-of-message separator.
188
eOM :: Word8
189
eOM = 3
190

    
191
-- | The end-of-message encoded as a ByteString.
192
bEOM :: B.ByteString
193
bEOM = B.singleton eOM
194

    
195
-- | Valid keys in the requests and responses.
196
data MsgKeys = Method
197
             | Args
198
             | Success
199
             | Result
200

    
201
-- | The serialisation of MsgKeys into strings in messages.
202
$(genStrOfKey ''MsgKeys "strOfKey")
203

    
204
-- | Luxi client encapsulation.
205
data Client = Client { socket :: Handle           -- ^ The socket of the client
206
                     , rbuf :: IORef B.ByteString -- ^ Already received buffer
207
                     }
208

    
209
-- | Connects to the master daemon and returns a luxi Client.
210
getClient :: String -> IO Client
211
getClient path = do
212
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
213
  withTimeout luxiDefCtmo "creating luxi connection" $
214
              S.connect s (S.SockAddrUnix path)
215
  rf <- newIORef B.empty
216
  h <- S.socketToHandle s ReadWriteMode
217
  return Client { socket=h, rbuf=rf }
218

    
219
-- | Creates and returns a server endpoint.
220
getServer :: FilePath -> IO S.Socket
221
getServer path = do
222
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
223
  S.bindSocket s (S.SockAddrUnix path)
224
  S.listen s 5 -- 5 is the max backlog
225
  return s
226

    
227
-- | Closes a server endpoint.
228
-- FIXME: this should be encapsulated into a nicer type.
229
closeServer :: FilePath -> S.Socket -> IO ()
230
closeServer path sock = do
231
  S.sClose sock
232
  removeFile path
233

    
234
-- | Accepts a client
235
acceptClient :: S.Socket -> IO Client
236
acceptClient s = do
237
  -- second return is the address of the client, which we ignore here
238
  (client_socket, _) <- S.accept s
239
  new_buffer <- newIORef B.empty
240
  handle <- S.socketToHandle client_socket ReadWriteMode
241
  return Client { socket=handle, rbuf=new_buffer }
242

    
243
-- | Closes the client socket.
244
closeClient :: Client -> IO ()
245
closeClient = hClose . socket
246

    
247
-- | Sends a message over a luxi transport.
248
sendMsg :: Client -> String -> IO ()
249
sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
250
  let encoded = UTF8L.fromString buf
251
      handle = socket s
252
  BL.hPut handle encoded
253
  B.hPut handle bEOM
254
  hFlush handle
255

    
256
-- | Given a current buffer and the handle, it will read from the
257
-- network until we get a full message, and it will return that
258
-- message and the leftover buffer contents.
259
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
260
recvUpdate handle obuf = do
261
  nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
262
            _ <- hWaitForInput handle (-1)
263
            B.hGetNonBlocking handle 4096
264
  let (msg, remaining) = B.break (eOM ==) nbuf
265
      newbuf = B.append obuf msg
266
  if B.null remaining
267
    then recvUpdate handle newbuf
268
    else return (newbuf, B.tail remaining)
269

    
270
-- | Waits for a message over a luxi transport.
271
recvMsg :: Client -> IO String
272
recvMsg s = do
273
  cbuf <- readIORef $ rbuf s
274
  let (imsg, ibuf) = B.break (eOM ==) cbuf
275
  (msg, nbuf) <-
276
    if B.null ibuf      -- if old buffer didn't contain a full message
277
      then recvUpdate (socket s) cbuf   -- then we read from network
278
      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
279
  writeIORef (rbuf s) nbuf
280
  return $ UTF8.toString msg
281

    
282
-- | Extended wrapper over recvMsg.
283
recvMsgExt :: Client -> IO RecvResult
284
recvMsgExt s =
285
  Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
286
    return $ if isEOFError e
287
               then RecvConnClosed
288
               else RecvError (show e)
289

    
290
-- | Serialize a request to String.
291
buildCall :: LuxiOp  -- ^ The method
292
          -> String  -- ^ The serialized form
293
buildCall lo =
294
  let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
295
           , (strOfKey Args, opToArgs lo)
296
           ]
297
      jo = toJSObject ja
298
  in encodeStrict jo
299

    
300
-- | Serialize the response to String.
301
buildResponse :: Bool    -- ^ Success
302
              -> JSValue -- ^ The arguments
303
              -> String  -- ^ The serialized form
304
buildResponse success args =
305
  let ja = [ (strOfKey Success, JSBool success)
306
           , (strOfKey Result, args)]
307
      jo = toJSObject ja
308
  in encodeStrict jo
309

    
310
-- | Check that luxi request contains the required keys and parse it.
311
validateCall :: String -> Result LuxiCall
312
validateCall s = do
313
  arr <- fromJResult "parsing top-level luxi message" $
314
         decodeStrict s::Result (JSObject JSValue)
315
  let aobj = fromJSObject arr
316
  call <- fromObj aobj (strOfKey Method)::Result LuxiReq
317
  args <- fromObj aobj (strOfKey Args)
318
  return (LuxiCall call args)
319

    
320
-- | Converts Luxi call arguments into a 'LuxiOp' data structure.
321
--
322
-- This is currently hand-coded until we make it more uniform so that
323
-- it can be generated using TH.
324
decodeCall :: LuxiCall -> Result LuxiOp
325
decodeCall (LuxiCall call args) =
326
  case call of
327
    ReqQueryJobs -> do
328
              (jids, jargs) <- fromJVal args
329
              jids' <- case jids of
330
                         JSNull -> return []
331
                         _ -> fromJVal jids
332
              return $ QueryJobs jids' jargs
333
    ReqQueryInstances -> do
334
              (names, fields, locking) <- fromJVal args
335
              return $ QueryInstances names fields locking
336
    ReqQueryNodes -> do
337
              (names, fields, locking) <- fromJVal args
338
              return $ QueryNodes names fields locking
339
    ReqQueryGroups -> do
340
              (names, fields, locking) <- fromJVal args
341
              return $ QueryGroups names fields locking
342
    ReqQueryClusterInfo ->
343
              return QueryClusterInfo
344
    ReqQuery -> do
345
              (what, fields, qfilter) <- fromJVal args
346
              return $ Query what fields qfilter
347
    ReqQueryFields -> do
348
              (what, fields) <- fromJVal args
349
              fields' <- case fields of
350
                           JSNull -> return []
351
                           _ -> fromJVal fields
352
              return $ QueryFields what fields'
353
    ReqSubmitJob -> do
354
              [ops1] <- fromJVal args
355
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
356
              return $ SubmitJob ops2
357
    ReqSubmitManyJobs -> do
358
              [ops1] <- fromJVal args
359
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
360
              return $ SubmitManyJobs ops2
361
    ReqWaitForJobChange -> do
362
              (jid, fields, pinfo, pidx, wtmout) <-
363
                -- No instance for 5-tuple, code copied from the
364
                -- json sources and adapted
365
                fromJResult "Parsing WaitForJobChange message" $
366
                case args of
367
                  JSArray [a, b, c, d, e] ->
368
                    (,,,,) `fmap`
369
                    J.readJSON a `ap`
370
                    J.readJSON b `ap`
371
                    J.readJSON c `ap`
372
                    J.readJSON d `ap`
373
                    J.readJSON e
374
                  _ -> J.Error "Not enough values"
375
              return $ WaitForJobChange jid fields pinfo pidx wtmout
376
    ReqArchiveJob -> do
377
              [jid] <- fromJVal args
378
              return $ ArchiveJob jid
379
    ReqAutoArchiveJobs -> do
380
              (age, tmout) <- fromJVal args
381
              return $ AutoArchiveJobs age tmout
382
    ReqQueryExports -> do
383
              (nodes, lock) <- fromJVal args
384
              return $ QueryExports nodes lock
385
    ReqQueryConfigValues -> do
386
              [fields] <- fromJVal args
387
              return $ QueryConfigValues fields
388
    ReqQueryTags -> do
389
              (kind, name) <- fromJVal args
390
              item <- tagObjectFrom kind name
391
              return $ QueryTags item
392
    ReqCancelJob -> do
393
              [jid] <- fromJVal args
394
              return $ CancelJob jid
395
    ReqChangeJobPriority -> do
396
              (jid, priority) <- fromJVal args
397
              return $ ChangeJobPriority jid priority
398
    ReqSetDrainFlag -> do
399
              [flag] <- fromJVal args
400
              return $ SetDrainFlag flag
401
    ReqSetWatcherPause -> do
402
              [duration] <- fromJVal args
403
              return $ SetWatcherPause duration
404

    
405
-- | Check that luxi responses contain the required keys and that the
406
-- call was successful.
407
validateResult :: String -> ErrorResult JSValue
408
validateResult s = do
409
  when (UTF8.replacement_char `elem` s) $
410
       fail "Failed to decode UTF-8, detected replacement char after decoding"
411
  oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
412
  let arr = J.fromJSObject oarr
413
  status <- fromObj arr (strOfKey Success)
414
  result <- fromObj arr (strOfKey Result)
415
  if status
416
    then return result
417
    else decodeError result
418

    
419
-- | Try to decode an error from the server response. This function
420
-- will always fail, since it's called only on the error path (when
421
-- status is False).
422
decodeError :: JSValue -> ErrorResult JSValue
423
decodeError val =
424
  case fromJVal val of
425
    Ok e -> Bad e
426
    Bad msg -> Bad $ GenericError msg
427

    
428
-- | Generic luxi method call.
429
callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
430
callMethod method s = do
431
  sendMsg s $ buildCall method
432
  result <- recvMsg s
433
  let rval = validateResult result
434
  return rval
435

    
436
-- | Parse job submission result.
437
parseSubmitJobResult :: JSValue -> ErrorResult JobId
438
parseSubmitJobResult (JSArray [JSBool True, v]) =
439
  case J.readJSON v of
440
    J.Error msg -> Bad $ LuxiError msg
441
    J.Ok v' -> Ok v'
442
parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
443
  Bad . LuxiError $ fromJSString x
444
parseSubmitJobResult v =
445
  Bad . LuxiError $ "Unknown result from the master daemon: " ++
446
      show (pp_value v)
447

    
448
-- | Specialized submitManyJobs call.
449
submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId])
450
submitManyJobs s jobs = do
451
  rval <- callMethod (SubmitManyJobs jobs) s
452
  -- map each result (status, payload) pair into a nice Result ADT
453
  return $ case rval of
454
             Bad x -> Bad x
455
             Ok (JSArray r) -> mapM parseSubmitJobResult r
456
             x -> Bad . LuxiError $
457
                  "Cannot parse response from Ganeti: " ++ show x
458

    
459
-- | Custom queryJobs call.
460
queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
461
queryJobsStatus s jids = do
462
  rval <- callMethod (QueryJobs jids ["status"]) s
463
  return $ case rval of
464
             Bad x -> Bad x
465
             Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
466
                       J.Ok vals -> if any null vals
467
                                    then Bad $
468
                                         LuxiError "Missing job status field"
469
                                    else Ok (map head vals)
470
                       J.Error x -> Bad $ LuxiError x