Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / Luxi.hs @ 55c87175

History | View | Annotate | Download (16.2 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the Ganeti LUXI interface.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Luxi
29
  ( LuxiOp(..)
30
  , LuxiReq(..)
31
  , Client
32
  , JobId
33
  , fromJobId
34
  , makeJobId
35
  , RecvResult(..)
36
  , strOfOp
37
  , getClient
38
  , getServer
39
  , acceptClient
40
  , closeClient
41
  , closeServer
42
  , callMethod
43
  , submitManyJobs
44
  , queryJobsStatus
45
  , buildCall
46
  , buildResponse
47
  , validateCall
48
  , decodeCall
49
  , recvMsg
50
  , recvMsgExt
51
  , sendMsg
52
  , allLuxiCalls
53
  ) where
54

    
55
import Control.Exception (catch)
56
import Data.IORef
57
import qualified Data.ByteString as B
58
import qualified Data.ByteString.Lazy as BL
59
import qualified Data.ByteString.UTF8 as UTF8
60
import qualified Data.ByteString.Lazy.UTF8 as UTF8L
61
import Data.Word (Word8)
62
import Control.Monad
63
import Text.JSON (encodeStrict, decodeStrict)
64
import qualified Text.JSON as J
65
import Text.JSON.Pretty (pp_value)
66
import Text.JSON.Types
67
import System.Directory (removeFile)
68
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
69
import System.IO.Error (isEOFError)
70
import System.Timeout
71
import qualified Network.Socket as S
72

    
73
import Ganeti.BasicTypes
74
import Ganeti.Constants
75
import Ganeti.Errors
76
import Ganeti.JSON
77
import Ganeti.OpParams (pTagsObject)
78
import Ganeti.OpCodes
79
import qualified Ganeti.Query.Language as Qlang
80
import Ganeti.Runtime (GanetiDaemon(..), MiscGroup(..), GanetiGroup(..))
81
import Ganeti.THH
82
import Ganeti.Types
83
import Ganeti.Utils
84

    
85
-- * Utility functions
86

    
87
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
88
withTimeout :: Int -> String -> IO a -> IO a
89
withTimeout secs descr action = do
90
  result <- timeout (secs * 1000000) action
91
  case result of
92
    Nothing -> fail $ "Timeout in " ++ descr
93
    Just v -> return v
94

    
95
-- * Generic protocol functionality
96

    
97
-- | Result of receiving a message from the socket.
98
data RecvResult = RecvConnClosed    -- ^ Connection closed
99
                | RecvError String  -- ^ Any other error
100
                | RecvOk String     -- ^ Successfull receive
101
                  deriving (Show, Eq)
102

    
103
-- | Currently supported Luxi operations and JSON serialization.
104
$(genLuxiOp "LuxiOp"
105
  [ (luxiReqQuery,
106
    [ simpleField "what"    [t| Qlang.ItemType |]
107
    , simpleField "fields"  [t| [String]  |]
108
    , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
109
    ])
110
  , (luxiReqQueryFields,
111
    [ simpleField "what"    [t| Qlang.ItemType |]
112
    , simpleField "fields"  [t| [String]  |]
113
    ])
114
  , (luxiReqQueryNodes,
115
     [ simpleField "names"  [t| [String] |]
116
     , simpleField "fields" [t| [String] |]
117
     , simpleField "lock"   [t| Bool     |]
118
     ])
119
  , (luxiReqQueryGroups,
120
     [ simpleField "names"  [t| [String] |]
121
     , simpleField "fields" [t| [String] |]
122
     , simpleField "lock"   [t| Bool     |]
123
     ])
124
  , (luxiReqQueryNetworks,
125
     [ simpleField "names"  [t| [String] |]
126
     , simpleField "fields" [t| [String] |]
127
     , simpleField "lock"   [t| Bool     |]
128
     ])
129
  , (luxiReqQueryInstances,
130
     [ simpleField "names"  [t| [String] |]
131
     , simpleField "fields" [t| [String] |]
132
     , simpleField "lock"   [t| Bool     |]
133
     ])
134
  , (luxiReqQueryJobs,
135
     [ simpleField "ids"    [t| [JobId]  |]
136
     , simpleField "fields" [t| [String] |]
137
     ])
138
  , (luxiReqQueryExports,
139
     [ simpleField "nodes" [t| [String] |]
140
     , simpleField "lock"  [t| Bool     |]
141
     ])
142
  , (luxiReqQueryConfigValues,
143
     [ simpleField "fields" [t| [String] |] ]
144
    )
145
  , (luxiReqQueryClusterInfo, [])
146
  , (luxiReqQueryTags,
147
     [ pTagsObject 
148
     , simpleField "name" [t| String |]
149
     ])
150
  , (luxiReqSubmitJob,
151
     [ simpleField "job" [t| [MetaOpCode] |] ]
152
    )
153
  , (luxiReqSubmitJobToDrainedQueue,
154
     [ simpleField "job" [t| [MetaOpCode] |] ]
155
    )
156
  , (luxiReqSubmitManyJobs,
157
     [ simpleField "ops" [t| [[MetaOpCode]] |] ]
158
    )
159
  , (luxiReqWaitForJobChange,
160
     [ simpleField "job"      [t| JobId   |]
161
     , simpleField "fields"   [t| [String]|]
162
     , simpleField "prev_job" [t| JSValue |]
163
     , simpleField "prev_log" [t| JSValue |]
164
     , simpleField "tmout"    [t| Int     |]
165
     ])
166
  , (luxiReqPickupJob,
167
     [ simpleField "job" [t| JobId |] ]
168
    )
169
  , (luxiReqArchiveJob,
170
     [ simpleField "job" [t| JobId |] ]
171
    )
172
  , (luxiReqAutoArchiveJobs,
173
     [ simpleField "age"   [t| Int |]
174
     , simpleField "tmout" [t| Int |]
175
     ])
176
  , (luxiReqCancelJob,
177
     [ simpleField "job" [t| JobId |] ]
178
    )
179
  , (luxiReqChangeJobPriority,
180
     [ simpleField "job"      [t| JobId |]
181
     , simpleField "priority" [t| Int |] ]
182
    )
183
  , (luxiReqSetDrainFlag,
184
     [ simpleField "flag" [t| Bool |] ]
185
    )
186
  , (luxiReqSetWatcherPause,
187
     [ simpleField "duration" [t| Double |] ]
188
    )
189
  ])
190

    
191
$(makeJSONInstance ''LuxiReq)
192

    
193
-- | List of all defined Luxi calls.
194
$(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
195

    
196
-- | The serialisation of LuxiOps into strings in messages.
197
$(genStrOfOp ''LuxiOp "strOfOp")
198

    
199
-- | Type holding the initial (unparsed) Luxi call.
200
data LuxiCall = LuxiCall LuxiReq JSValue
201

    
202
-- | The end-of-message separator.
203
eOM :: Word8
204
eOM = 3
205

    
206
-- | The end-of-message encoded as a ByteString.
207
bEOM :: B.ByteString
208
bEOM = B.singleton eOM
209

    
210
-- | Valid keys in the requests and responses.
211
data MsgKeys = Method
212
             | Args
213
             | Success
214
             | Result
215

    
216
-- | The serialisation of MsgKeys into strings in messages.
217
$(genStrOfKey ''MsgKeys "strOfKey")
218

    
219
-- | Luxi client encapsulation.
220
data Client = Client { socket :: Handle           -- ^ The socket of the client
221
                     , rbuf :: IORef B.ByteString -- ^ Already received buffer
222
                     }
223

    
224
-- | Connects to the master daemon and returns a luxi Client.
225
getClient :: String -> IO Client
226
getClient path = do
227
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
228
  withTimeout luxiDefCtmo "creating luxi connection" $
229
              S.connect s (S.SockAddrUnix path)
230
  rf <- newIORef B.empty
231
  h <- S.socketToHandle s ReadWriteMode
232
  return Client { socket=h, rbuf=rf }
233

    
234
-- | Creates and returns a server endpoint.
235
getServer :: Bool -> FilePath -> IO S.Socket
236
getServer setOwner path = do
237
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
238
  S.bindSocket s (S.SockAddrUnix path)
239
  when setOwner . setOwnerAndGroupFromNames path GanetiLuxid $
240
    ExtraGroup DaemonsGroup
241
  S.listen s 5 -- 5 is the max backlog
242
  return s
243

    
244
-- | Closes a server endpoint.
245
-- FIXME: this should be encapsulated into a nicer type.
246
closeServer :: FilePath -> S.Socket -> IO ()
247
closeServer path sock = do
248
  S.sClose sock
249
  removeFile path
250

    
251
-- | Accepts a client
252
acceptClient :: S.Socket -> IO Client
253
acceptClient s = do
254
  -- second return is the address of the client, which we ignore here
255
  (client_socket, _) <- S.accept s
256
  new_buffer <- newIORef B.empty
257
  handle <- S.socketToHandle client_socket ReadWriteMode
258
  return Client { socket=handle, rbuf=new_buffer }
259

    
260
-- | Closes the client socket.
261
closeClient :: Client -> IO ()
262
closeClient = hClose . socket
263

    
264
-- | Sends a message over a luxi transport.
265
sendMsg :: Client -> String -> IO ()
266
sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
267
  let encoded = UTF8L.fromString buf
268
      handle = socket s
269
  BL.hPut handle encoded
270
  B.hPut handle bEOM
271
  hFlush handle
272

    
273
-- | Given a current buffer and the handle, it will read from the
274
-- network until we get a full message, and it will return that
275
-- message and the leftover buffer contents.
276
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
277
recvUpdate handle obuf = do
278
  nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
279
            _ <- hWaitForInput handle (-1)
280
            B.hGetNonBlocking handle 4096
281
  let (msg, remaining) = B.break (eOM ==) nbuf
282
      newbuf = B.append obuf msg
283
  if B.null remaining
284
    then recvUpdate handle newbuf
285
    else return (newbuf, B.tail remaining)
286

    
287
-- | Waits for a message over a luxi transport.
288
recvMsg :: Client -> IO String
289
recvMsg s = do
290
  cbuf <- readIORef $ rbuf s
291
  let (imsg, ibuf) = B.break (eOM ==) cbuf
292
  (msg, nbuf) <-
293
    if B.null ibuf      -- if old buffer didn't contain a full message
294
      then recvUpdate (socket s) cbuf   -- then we read from network
295
      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
296
  writeIORef (rbuf s) nbuf
297
  return $ UTF8.toString msg
298

    
299
-- | Extended wrapper over recvMsg.
300
recvMsgExt :: Client -> IO RecvResult
301
recvMsgExt s =
302
  Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
303
    return $ if isEOFError e
304
               then RecvConnClosed
305
               else RecvError (show e)
306

    
307
-- | Serialize a request to String.
308
buildCall :: LuxiOp  -- ^ The method
309
          -> String  -- ^ The serialized form
310
buildCall lo =
311
  let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
312
           , (strOfKey Args, opToArgs lo)
313
           ]
314
      jo = toJSObject ja
315
  in encodeStrict jo
316

    
317
-- | Serialize the response to String.
318
buildResponse :: Bool    -- ^ Success
319
              -> JSValue -- ^ The arguments
320
              -> String  -- ^ The serialized form
321
buildResponse success args =
322
  let ja = [ (strOfKey Success, JSBool success)
323
           , (strOfKey Result, args)]
324
      jo = toJSObject ja
325
  in encodeStrict jo
326

    
327
-- | Check that luxi request contains the required keys and parse it.
328
validateCall :: String -> Result LuxiCall
329
validateCall s = do
330
  arr <- fromJResult "parsing top-level luxi message" $
331
         decodeStrict s::Result (JSObject JSValue)
332
  let aobj = fromJSObject arr
333
  call <- fromObj aobj (strOfKey Method)::Result LuxiReq
334
  args <- fromObj aobj (strOfKey Args)
335
  return (LuxiCall call args)
336

    
337
-- | Converts Luxi call arguments into a 'LuxiOp' data structure.
338
--
339
-- This is currently hand-coded until we make it more uniform so that
340
-- it can be generated using TH.
341
decodeCall :: LuxiCall -> Result LuxiOp
342
decodeCall (LuxiCall call args) =
343
  case call of
344
    ReqQueryJobs -> do
345
              (jids, jargs) <- fromJVal args
346
              jids' <- case jids of
347
                         JSNull -> return []
348
                         _ -> fromJVal jids
349
              return $ QueryJobs jids' jargs
350
    ReqQueryInstances -> do
351
              (names, fields, locking) <- fromJVal args
352
              return $ QueryInstances names fields locking
353
    ReqQueryNodes -> do
354
              (names, fields, locking) <- fromJVal args
355
              return $ QueryNodes names fields locking
356
    ReqQueryGroups -> do
357
              (names, fields, locking) <- fromJVal args
358
              return $ QueryGroups names fields locking
359
    ReqQueryClusterInfo ->
360
              return QueryClusterInfo
361
    ReqQueryNetworks -> do
362
              (names, fields, locking) <- fromJVal args
363
              return $ QueryNetworks names fields locking
364
    ReqQuery -> do
365
              (what, fields, qfilter) <- fromJVal args
366
              return $ Query what fields qfilter
367
    ReqQueryFields -> do
368
              (what, fields) <- fromJVal args
369
              fields' <- case fields of
370
                           JSNull -> return []
371
                           _ -> fromJVal fields
372
              return $ QueryFields what fields'
373
    ReqSubmitJob -> do
374
              [ops1] <- fromJVal args
375
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
376
              return $ SubmitJob ops2
377
    ReqSubmitJobToDrainedQueue -> do
378
              [ops1] <- fromJVal args
379
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
380
              return $ SubmitJobToDrainedQueue ops2
381
    ReqSubmitManyJobs -> do
382
              [ops1] <- fromJVal args
383
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
384
              return $ SubmitManyJobs ops2
385
    ReqWaitForJobChange -> do
386
              (jid, fields, pinfo, pidx, wtmout) <-
387
                -- No instance for 5-tuple, code copied from the
388
                -- json sources and adapted
389
                fromJResult "Parsing WaitForJobChange message" $
390
                case args of
391
                  JSArray [a, b, c, d, e] ->
392
                    (,,,,) `fmap`
393
                    J.readJSON a `ap`
394
                    J.readJSON b `ap`
395
                    J.readJSON c `ap`
396
                    J.readJSON d `ap`
397
                    J.readJSON e
398
                  _ -> J.Error "Not enough values"
399
              return $ WaitForJobChange jid fields pinfo pidx wtmout
400
    ReqPickupJob -> do
401
              [jid] <- fromJVal args
402
              return $ PickupJob jid
403
    ReqArchiveJob -> do
404
              [jid] <- fromJVal args
405
              return $ ArchiveJob jid
406
    ReqAutoArchiveJobs -> do
407
              (age, tmout) <- fromJVal args
408
              return $ AutoArchiveJobs age tmout
409
    ReqQueryExports -> do
410
              (nodes, lock) <- fromJVal args
411
              return $ QueryExports nodes lock
412
    ReqQueryConfigValues -> do
413
              [fields] <- fromJVal args
414
              return $ QueryConfigValues fields
415
    ReqQueryTags -> do
416
              (kind, name) <- fromJVal args
417
              return $ QueryTags kind name
418
    ReqCancelJob -> do
419
              [jid] <- fromJVal args
420
              return $ CancelJob jid
421
    ReqChangeJobPriority -> do
422
              (jid, priority) <- fromJVal args
423
              return $ ChangeJobPriority jid priority
424
    ReqSetDrainFlag -> do
425
              [flag] <- fromJVal args
426
              return $ SetDrainFlag flag
427
    ReqSetWatcherPause -> do
428
              [duration] <- fromJVal args
429
              return $ SetWatcherPause duration
430

    
431
-- | Check that luxi responses contain the required keys and that the
432
-- call was successful.
433
validateResult :: String -> ErrorResult JSValue
434
validateResult s = do
435
  when (UTF8.replacement_char `elem` s) $
436
       fail "Failed to decode UTF-8, detected replacement char after decoding"
437
  oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
438
  let arr = J.fromJSObject oarr
439
  status <- fromObj arr (strOfKey Success)
440
  result <- fromObj arr (strOfKey Result)
441
  if status
442
    then return result
443
    else decodeError result
444

    
445
-- | Try to decode an error from the server response. This function
446
-- will always fail, since it's called only on the error path (when
447
-- status is False).
448
decodeError :: JSValue -> ErrorResult JSValue
449
decodeError val =
450
  case fromJVal val of
451
    Ok e -> Bad e
452
    Bad msg -> Bad $ GenericError msg
453

    
454
-- | Generic luxi method call.
455
callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
456
callMethod method s = do
457
  sendMsg s $ buildCall method
458
  result <- recvMsg s
459
  let rval = validateResult result
460
  return rval
461

    
462
-- | Parse job submission result.
463
parseSubmitJobResult :: JSValue -> ErrorResult JobId
464
parseSubmitJobResult (JSArray [JSBool True, v]) =
465
  case J.readJSON v of
466
    J.Error msg -> Bad $ LuxiError msg
467
    J.Ok v' -> Ok v'
468
parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
469
  Bad . LuxiError $ fromJSString x
470
parseSubmitJobResult v =
471
  Bad . LuxiError $ "Unknown result from the master daemon: " ++
472
      show (pp_value v)
473

    
474
-- | Specialized submitManyJobs call.
475
submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId])
476
submitManyJobs s jobs = do
477
  rval <- callMethod (SubmitManyJobs jobs) s
478
  -- map each result (status, payload) pair into a nice Result ADT
479
  return $ case rval of
480
             Bad x -> Bad x
481
             Ok (JSArray r) -> mapM parseSubmitJobResult r
482
             x -> Bad . LuxiError $
483
                  "Cannot parse response from Ganeti: " ++ show x
484

    
485
-- | Custom queryJobs call.
486
queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
487
queryJobsStatus s jids = do
488
  rval <- callMethod (QueryJobs jids ["status"]) s
489
  return $ case rval of
490
             Bad x -> Bad x
491
             Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
492
                       J.Ok vals -> if any null vals
493
                                    then Bad $
494
                                         LuxiError "Missing job status field"
495
                                    else Ok (map head vals)
496
                       J.Error x -> Bad $ LuxiError x