Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / Luxi.hs @ 795d035d

History | View | Annotate | Download (15.6 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the Ganeti LUXI interface.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Luxi
29
  ( LuxiOp(..)
30
  , LuxiReq(..)
31
  , Client
32
  , JobId
33
  , fromJobId
34
  , makeJobId
35
  , RecvResult(..)
36
  , strOfOp
37
  , getClient
38
  , getServer
39
  , acceptClient
40
  , closeClient
41
  , closeServer
42
  , callMethod
43
  , submitManyJobs
44
  , queryJobsStatus
45
  , buildCall
46
  , buildResponse
47
  , validateCall
48
  , decodeCall
49
  , recvMsg
50
  , recvMsgExt
51
  , sendMsg
52
  , allLuxiCalls
53
  ) where
54

    
55
import Control.Exception (catch)
56
import Data.IORef
57
import qualified Data.ByteString as B
58
import qualified Data.ByteString.Lazy as BL
59
import qualified Data.ByteString.UTF8 as UTF8
60
import qualified Data.ByteString.Lazy.UTF8 as UTF8L
61
import Data.Word (Word8)
62
import Control.Monad
63
import Text.JSON (encodeStrict, decodeStrict)
64
import qualified Text.JSON as J
65
import Text.JSON.Pretty (pp_value)
66
import Text.JSON.Types
67
import System.Directory (removeFile)
68
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
69
import System.IO.Error (isEOFError)
70
import System.Timeout
71
import qualified Network.Socket as S
72

    
73
import Ganeti.BasicTypes
74
import Ganeti.Constants
75
import Ganeti.Errors
76
import Ganeti.JSON
77
import Ganeti.OpParams (pTagsObject)
78
import Ganeti.OpCodes
79
import qualified Ganeti.Query.Language as Qlang
80
import Ganeti.THH
81
import Ganeti.Types
82

    
83
-- * Utility functions
84

    
85
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
86
withTimeout :: Int -> String -> IO a -> IO a
87
withTimeout secs descr action = do
88
  result <- timeout (secs * 1000000) action
89
  case result of
90
    Nothing -> fail $ "Timeout in " ++ descr
91
    Just v -> return v
92

    
93
-- * Generic protocol functionality
94

    
95
-- | Result of receiving a message from the socket.
96
data RecvResult = RecvConnClosed    -- ^ Connection closed
97
                | RecvError String  -- ^ Any other error
98
                | RecvOk String     -- ^ Successfull receive
99
                  deriving (Show, Eq)
100

    
101
-- | Currently supported Luxi operations and JSON serialization.
102
$(genLuxiOp "LuxiOp"
103
  [ (luxiReqQuery,
104
    [ simpleField "what"    [t| Qlang.ItemType |]
105
    , simpleField "fields"  [t| [String]  |]
106
    , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
107
    ])
108
  , (luxiReqQueryFields,
109
    [ simpleField "what"    [t| Qlang.ItemType |]
110
    , simpleField "fields"  [t| [String]  |]
111
    ])
112
  , (luxiReqQueryNodes,
113
     [ simpleField "names"  [t| [String] |]
114
     , simpleField "fields" [t| [String] |]
115
     , simpleField "lock"   [t| Bool     |]
116
     ])
117
  , (luxiReqQueryGroups,
118
     [ simpleField "names"  [t| [String] |]
119
     , simpleField "fields" [t| [String] |]
120
     , simpleField "lock"   [t| Bool     |]
121
     ])
122
  , (luxiReqQueryNetworks,
123
     [ simpleField "names"  [t| [String] |]
124
     , simpleField "fields" [t| [String] |]
125
     , simpleField "lock"   [t| Bool     |]
126
     ])
127
  , (luxiReqQueryInstances,
128
     [ simpleField "names"  [t| [String] |]
129
     , simpleField "fields" [t| [String] |]
130
     , simpleField "lock"   [t| Bool     |]
131
     ])
132
  , (luxiReqQueryJobs,
133
     [ simpleField "ids"    [t| [JobId]  |]
134
     , simpleField "fields" [t| [String] |]
135
     ])
136
  , (luxiReqQueryExports,
137
     [ simpleField "nodes" [t| [String] |]
138
     , simpleField "lock"  [t| Bool     |]
139
     ])
140
  , (luxiReqQueryConfigValues,
141
     [ simpleField "fields" [t| [String] |] ]
142
    )
143
  , (luxiReqQueryClusterInfo, [])
144
  , (luxiReqQueryTags,
145
     [ pTagsObject ])
146
  , (luxiReqSubmitJob,
147
     [ simpleField "job" [t| [MetaOpCode] |] ]
148
    )
149
  , (luxiReqSubmitManyJobs,
150
     [ simpleField "ops" [t| [[MetaOpCode]] |] ]
151
    )
152
  , (luxiReqWaitForJobChange,
153
     [ simpleField "job"      [t| JobId   |]
154
     , simpleField "fields"   [t| [String]|]
155
     , simpleField "prev_job" [t| JSValue |]
156
     , simpleField "prev_log" [t| JSValue |]
157
     , simpleField "tmout"    [t| Int     |]
158
     ])
159
  , (luxiReqArchiveJob,
160
     [ simpleField "job" [t| JobId |] ]
161
    )
162
  , (luxiReqAutoArchiveJobs,
163
     [ simpleField "age"   [t| Int |]
164
     , simpleField "tmout" [t| Int |]
165
     ])
166
  , (luxiReqCancelJob,
167
     [ simpleField "job" [t| JobId |] ]
168
    )
169
  , (luxiReqChangeJobPriority,
170
     [ simpleField "job"      [t| JobId |]
171
     , simpleField "priority" [t| Int |] ]
172
    )
173
  , (luxiReqSetDrainFlag,
174
     [ simpleField "flag" [t| Bool |] ]
175
    )
176
  , (luxiReqSetWatcherPause,
177
     [ simpleField "duration" [t| Double |] ]
178
    )
179
  ])
180

    
181
$(makeJSONInstance ''LuxiReq)
182

    
183
-- | List of all defined Luxi calls.
184
$(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
185

    
186
-- | The serialisation of LuxiOps into strings in messages.
187
$(genStrOfOp ''LuxiOp "strOfOp")
188

    
189
-- | Type holding the initial (unparsed) Luxi call.
190
data LuxiCall = LuxiCall LuxiReq JSValue
191

    
192
-- | The end-of-message separator.
193
eOM :: Word8
194
eOM = 3
195

    
196
-- | The end-of-message encoded as a ByteString.
197
bEOM :: B.ByteString
198
bEOM = B.singleton eOM
199

    
200
-- | Valid keys in the requests and responses.
201
data MsgKeys = Method
202
             | Args
203
             | Success
204
             | Result
205

    
206
-- | The serialisation of MsgKeys into strings in messages.
207
$(genStrOfKey ''MsgKeys "strOfKey")
208

    
209
-- | Luxi client encapsulation.
210
data Client = Client { socket :: Handle           -- ^ The socket of the client
211
                     , rbuf :: IORef B.ByteString -- ^ Already received buffer
212
                     }
213

    
214
-- | Connects to the master daemon and returns a luxi Client.
215
getClient :: String -> IO Client
216
getClient path = do
217
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
218
  withTimeout luxiDefCtmo "creating luxi connection" $
219
              S.connect s (S.SockAddrUnix path)
220
  rf <- newIORef B.empty
221
  h <- S.socketToHandle s ReadWriteMode
222
  return Client { socket=h, rbuf=rf }
223

    
224
-- | Creates and returns a server endpoint.
225
getServer :: FilePath -> IO S.Socket
226
getServer path = do
227
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
228
  S.bindSocket s (S.SockAddrUnix path)
229
  S.listen s 5 -- 5 is the max backlog
230
  return s
231

    
232
-- | Closes a server endpoint.
233
-- FIXME: this should be encapsulated into a nicer type.
234
closeServer :: FilePath -> S.Socket -> IO ()
235
closeServer path sock = do
236
  S.sClose sock
237
  removeFile path
238

    
239
-- | Accepts a client
240
acceptClient :: S.Socket -> IO Client
241
acceptClient s = do
242
  -- second return is the address of the client, which we ignore here
243
  (client_socket, _) <- S.accept s
244
  new_buffer <- newIORef B.empty
245
  handle <- S.socketToHandle client_socket ReadWriteMode
246
  return Client { socket=handle, rbuf=new_buffer }
247

    
248
-- | Closes the client socket.
249
closeClient :: Client -> IO ()
250
closeClient = hClose . socket
251

    
252
-- | Sends a message over a luxi transport.
253
sendMsg :: Client -> String -> IO ()
254
sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
255
  let encoded = UTF8L.fromString buf
256
      handle = socket s
257
  BL.hPut handle encoded
258
  B.hPut handle bEOM
259
  hFlush handle
260

    
261
-- | Given a current buffer and the handle, it will read from the
262
-- network until we get a full message, and it will return that
263
-- message and the leftover buffer contents.
264
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
265
recvUpdate handle obuf = do
266
  nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
267
            _ <- hWaitForInput handle (-1)
268
            B.hGetNonBlocking handle 4096
269
  let (msg, remaining) = B.break (eOM ==) nbuf
270
      newbuf = B.append obuf msg
271
  if B.null remaining
272
    then recvUpdate handle newbuf
273
    else return (newbuf, B.tail remaining)
274

    
275
-- | Waits for a message over a luxi transport.
276
recvMsg :: Client -> IO String
277
recvMsg s = do
278
  cbuf <- readIORef $ rbuf s
279
  let (imsg, ibuf) = B.break (eOM ==) cbuf
280
  (msg, nbuf) <-
281
    if B.null ibuf      -- if old buffer didn't contain a full message
282
      then recvUpdate (socket s) cbuf   -- then we read from network
283
      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
284
  writeIORef (rbuf s) nbuf
285
  return $ UTF8.toString msg
286

    
287
-- | Extended wrapper over recvMsg.
288
recvMsgExt :: Client -> IO RecvResult
289
recvMsgExt s =
290
  Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
291
    return $ if isEOFError e
292
               then RecvConnClosed
293
               else RecvError (show e)
294

    
295
-- | Serialize a request to String.
296
buildCall :: LuxiOp  -- ^ The method
297
          -> String  -- ^ The serialized form
298
buildCall lo =
299
  let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
300
           , (strOfKey Args, opToArgs lo)
301
           ]
302
      jo = toJSObject ja
303
  in encodeStrict jo
304

    
305
-- | Serialize the response to String.
306
buildResponse :: Bool    -- ^ Success
307
              -> JSValue -- ^ The arguments
308
              -> String  -- ^ The serialized form
309
buildResponse success args =
310
  let ja = [ (strOfKey Success, JSBool success)
311
           , (strOfKey Result, args)]
312
      jo = toJSObject ja
313
  in encodeStrict jo
314

    
315
-- | Check that luxi request contains the required keys and parse it.
316
validateCall :: String -> Result LuxiCall
317
validateCall s = do
318
  arr <- fromJResult "parsing top-level luxi message" $
319
         decodeStrict s::Result (JSObject JSValue)
320
  let aobj = fromJSObject arr
321
  call <- fromObj aobj (strOfKey Method)::Result LuxiReq
322
  args <- fromObj aobj (strOfKey Args)
323
  return (LuxiCall call args)
324

    
325
-- | Converts Luxi call arguments into a 'LuxiOp' data structure.
326
--
327
-- This is currently hand-coded until we make it more uniform so that
328
-- it can be generated using TH.
329
decodeCall :: LuxiCall -> Result LuxiOp
330
decodeCall (LuxiCall call args) =
331
  case call of
332
    ReqQueryJobs -> do
333
              (jids, jargs) <- fromJVal args
334
              jids' <- case jids of
335
                         JSNull -> return []
336
                         _ -> fromJVal jids
337
              return $ QueryJobs jids' jargs
338
    ReqQueryInstances -> do
339
              (names, fields, locking) <- fromJVal args
340
              return $ QueryInstances names fields locking
341
    ReqQueryNodes -> do
342
              (names, fields, locking) <- fromJVal args
343
              return $ QueryNodes names fields locking
344
    ReqQueryGroups -> do
345
              (names, fields, locking) <- fromJVal args
346
              return $ QueryGroups names fields locking
347
    ReqQueryClusterInfo ->
348
              return QueryClusterInfo
349
    ReqQueryNetworks -> do
350
              (names, fields, locking) <- fromJVal args
351
              return $ QueryNetworks names fields locking
352
    ReqQuery -> do
353
              (what, fields, qfilter) <- fromJVal args
354
              return $ Query what fields qfilter
355
    ReqQueryFields -> do
356
              (what, fields) <- fromJVal args
357
              fields' <- case fields of
358
                           JSNull -> return []
359
                           _ -> fromJVal fields
360
              return $ QueryFields what fields'
361
    ReqSubmitJob -> do
362
              [ops1] <- fromJVal args
363
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
364
              return $ SubmitJob ops2
365
    ReqSubmitManyJobs -> do
366
              [ops1] <- fromJVal args
367
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
368
              return $ SubmitManyJobs ops2
369
    ReqWaitForJobChange -> do
370
              (jid, fields, pinfo, pidx, wtmout) <-
371
                -- No instance for 5-tuple, code copied from the
372
                -- json sources and adapted
373
                fromJResult "Parsing WaitForJobChange message" $
374
                case args of
375
                  JSArray [a, b, c, d, e] ->
376
                    (,,,,) `fmap`
377
                    J.readJSON a `ap`
378
                    J.readJSON b `ap`
379
                    J.readJSON c `ap`
380
                    J.readJSON d `ap`
381
                    J.readJSON e
382
                  _ -> J.Error "Not enough values"
383
              return $ WaitForJobChange jid fields pinfo pidx wtmout
384
    ReqArchiveJob -> do
385
              [jid] <- fromJVal args
386
              return $ ArchiveJob jid
387
    ReqAutoArchiveJobs -> do
388
              (age, tmout) <- fromJVal args
389
              return $ AutoArchiveJobs age tmout
390
    ReqQueryExports -> do
391
              (nodes, lock) <- fromJVal args
392
              return $ QueryExports nodes lock
393
    ReqQueryConfigValues -> do
394
              [fields] <- fromJVal args
395
              return $ QueryConfigValues fields
396
    ReqQueryTags -> do
397
              (kind, name) <- fromJVal args
398
              item <- tagObjectFrom kind name
399
              return $ QueryTags item
400
    ReqCancelJob -> do
401
              [jid] <- fromJVal args
402
              return $ CancelJob jid
403
    ReqChangeJobPriority -> do
404
              (jid, priority) <- fromJVal args
405
              return $ ChangeJobPriority jid priority
406
    ReqSetDrainFlag -> do
407
              [flag] <- fromJVal args
408
              return $ SetDrainFlag flag
409
    ReqSetWatcherPause -> do
410
              [duration] <- fromJVal args
411
              return $ SetWatcherPause duration
412

    
413
-- | Check that luxi responses contain the required keys and that the
414
-- call was successful.
415
validateResult :: String -> ErrorResult JSValue
416
validateResult s = do
417
  when (UTF8.replacement_char `elem` s) $
418
       fail "Failed to decode UTF-8, detected replacement char after decoding"
419
  oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
420
  let arr = J.fromJSObject oarr
421
  status <- fromObj arr (strOfKey Success)
422
  result <- fromObj arr (strOfKey Result)
423
  if status
424
    then return result
425
    else decodeError result
426

    
427
-- | Try to decode an error from the server response. This function
428
-- will always fail, since it's called only on the error path (when
429
-- status is False).
430
decodeError :: JSValue -> ErrorResult JSValue
431
decodeError val =
432
  case fromJVal val of
433
    Ok e -> Bad e
434
    Bad msg -> Bad $ GenericError msg
435

    
436
-- | Generic luxi method call.
437
callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
438
callMethod method s = do
439
  sendMsg s $ buildCall method
440
  result <- recvMsg s
441
  let rval = validateResult result
442
  return rval
443

    
444
-- | Parse job submission result.
445
parseSubmitJobResult :: JSValue -> ErrorResult JobId
446
parseSubmitJobResult (JSArray [JSBool True, v]) =
447
  case J.readJSON v of
448
    J.Error msg -> Bad $ LuxiError msg
449
    J.Ok v' -> Ok v'
450
parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
451
  Bad . LuxiError $ fromJSString x
452
parseSubmitJobResult v =
453
  Bad . LuxiError $ "Unknown result from the master daemon: " ++
454
      show (pp_value v)
455

    
456
-- | Specialized submitManyJobs call.
457
submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId])
458
submitManyJobs s jobs = do
459
  rval <- callMethod (SubmitManyJobs jobs) s
460
  -- map each result (status, payload) pair into a nice Result ADT
461
  return $ case rval of
462
             Bad x -> Bad x
463
             Ok (JSArray r) -> mapM parseSubmitJobResult r
464
             x -> Bad . LuxiError $
465
                  "Cannot parse response from Ganeti: " ++ show x
466

    
467
-- | Custom queryJobs call.
468
queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
469
queryJobsStatus s jids = do
470
  rval <- callMethod (QueryJobs jids ["status"]) s
471
  return $ case rval of
472
             Bad x -> Bad x
473
             Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
474
                       J.Ok vals -> if any null vals
475
                                    then Bad $
476
                                         LuxiError "Missing job status field"
477
                                    else Ok (map head vals)
478
                       J.Error x -> Bad $ LuxiError x