Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / Luxi.hs @ 346c3037

History | View | Annotate | Download (16.1 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the Ganeti LUXI interface.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Luxi
29
  ( LuxiOp(..)
30
  , LuxiReq(..)
31
  , Client
32
  , JobId
33
  , fromJobId
34
  , makeJobId
35
  , RecvResult(..)
36
  , strOfOp
37
  , getClient
38
  , getServer
39
  , acceptClient
40
  , closeClient
41
  , closeServer
42
  , callMethod
43
  , submitManyJobs
44
  , queryJobsStatus
45
  , buildCall
46
  , buildResponse
47
  , validateCall
48
  , decodeCall
49
  , recvMsg
50
  , recvMsgExt
51
  , sendMsg
52
  , allLuxiCalls
53
  ) where
54

    
55
import Control.Exception (catch)
56
import Data.IORef
57
import qualified Data.ByteString as B
58
import qualified Data.ByteString.Lazy as BL
59
import qualified Data.ByteString.UTF8 as UTF8
60
import qualified Data.ByteString.Lazy.UTF8 as UTF8L
61
import Data.Word (Word8)
62
import Control.Monad
63
import Text.JSON (encodeStrict, decodeStrict)
64
import qualified Text.JSON as J
65
import Text.JSON.Pretty (pp_value)
66
import Text.JSON.Types
67
import System.Directory (removeFile)
68
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
69
import System.IO.Error (isEOFError)
70
import System.Timeout
71
import qualified Network.Socket as S
72

    
73
import Ganeti.BasicTypes
74
import Ganeti.Constants
75
import Ganeti.Errors
76
import Ganeti.JSON
77
import Ganeti.OpParams (pTagsObject)
78
import Ganeti.OpCodes
79
import qualified Ganeti.Query.Language as Qlang
80
import Ganeti.Runtime (GanetiDaemon(..), MiscGroup(..), GanetiGroup(..))
81
import Ganeti.THH
82
import Ganeti.Types
83
import Ganeti.Utils
84

    
85
-- * Utility functions
86

    
87
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
88
withTimeout :: Int -> String -> IO a -> IO a
89
withTimeout secs descr action = do
90
  result <- timeout (secs * 1000000) action
91
  case result of
92
    Nothing -> fail $ "Timeout in " ++ descr
93
    Just v -> return v
94

    
95
-- * Generic protocol functionality
96

    
97
-- | Result of receiving a message from the socket.
98
data RecvResult = RecvConnClosed    -- ^ Connection closed
99
                | RecvError String  -- ^ Any other error
100
                | RecvOk String     -- ^ Successfull receive
101
                  deriving (Show, Eq)
102

    
103
-- | Currently supported Luxi operations and JSON serialization.
104
$(genLuxiOp "LuxiOp"
105
  [ (luxiReqQuery,
106
    [ simpleField "what"    [t| Qlang.ItemType |]
107
    , simpleField "fields"  [t| [String]  |]
108
    , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
109
    ])
110
  , (luxiReqQueryFields,
111
    [ simpleField "what"    [t| Qlang.ItemType |]
112
    , simpleField "fields"  [t| [String]  |]
113
    ])
114
  , (luxiReqQueryNodes,
115
     [ simpleField "names"  [t| [String] |]
116
     , simpleField "fields" [t| [String] |]
117
     , simpleField "lock"   [t| Bool     |]
118
     ])
119
  , (luxiReqQueryGroups,
120
     [ simpleField "names"  [t| [String] |]
121
     , simpleField "fields" [t| [String] |]
122
     , simpleField "lock"   [t| Bool     |]
123
     ])
124
  , (luxiReqQueryNetworks,
125
     [ simpleField "names"  [t| [String] |]
126
     , simpleField "fields" [t| [String] |]
127
     , simpleField "lock"   [t| Bool     |]
128
     ])
129
  , (luxiReqQueryInstances,
130
     [ simpleField "names"  [t| [String] |]
131
     , simpleField "fields" [t| [String] |]
132
     , simpleField "lock"   [t| Bool     |]
133
     ])
134
  , (luxiReqQueryJobs,
135
     [ simpleField "ids"    [t| [JobId]  |]
136
     , simpleField "fields" [t| [String] |]
137
     ])
138
  , (luxiReqQueryExports,
139
     [ simpleField "nodes" [t| [String] |]
140
     , simpleField "lock"  [t| Bool     |]
141
     ])
142
  , (luxiReqQueryConfigValues,
143
     [ simpleField "fields" [t| [String] |] ]
144
    )
145
  , (luxiReqQueryClusterInfo, [])
146
  , (luxiReqQueryTags,
147
     [ pTagsObject 
148
     , simpleField "name" [t| String |]
149
     ])
150
  , (luxiReqSubmitJob,
151
     [ simpleField "job" [t| [MetaOpCode] |] ]
152
    )
153
  , (luxiReqSubmitJobToDrainedQueue,
154
     [ simpleField "job" [t| [MetaOpCode] |] ]
155
    )
156
  , (luxiReqSubmitManyJobs,
157
     [ simpleField "ops" [t| [[MetaOpCode]] |] ]
158
    )
159
  , (luxiReqWaitForJobChange,
160
     [ simpleField "job"      [t| JobId   |]
161
     , simpleField "fields"   [t| [String]|]
162
     , simpleField "prev_job" [t| JSValue |]
163
     , simpleField "prev_log" [t| JSValue |]
164
     , simpleField "tmout"    [t| Int     |]
165
     ])
166
  , (luxiReqArchiveJob,
167
     [ simpleField "job" [t| JobId |] ]
168
    )
169
  , (luxiReqAutoArchiveJobs,
170
     [ simpleField "age"   [t| Int |]
171
     , simpleField "tmout" [t| Int |]
172
     ])
173
  , (luxiReqCancelJob,
174
     [ simpleField "job" [t| JobId |] ]
175
    )
176
  , (luxiReqChangeJobPriority,
177
     [ simpleField "job"      [t| JobId |]
178
     , simpleField "priority" [t| Int |] ]
179
    )
180
  , (luxiReqSetDrainFlag,
181
     [ simpleField "flag" [t| Bool |] ]
182
    )
183
  , (luxiReqSetWatcherPause,
184
     [ simpleField "duration" [t| Double |] ]
185
    )
186
  ])
187

    
188
$(makeJSONInstance ''LuxiReq)
189

    
190
-- | List of all defined Luxi calls.
191
$(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
192

    
193
-- | The serialisation of LuxiOps into strings in messages.
194
$(genStrOfOp ''LuxiOp "strOfOp")
195

    
196
-- | Type holding the initial (unparsed) Luxi call.
197
data LuxiCall = LuxiCall LuxiReq JSValue
198

    
199
-- | The end-of-message separator.
200
eOM :: Word8
201
eOM = 3
202

    
203
-- | The end-of-message encoded as a ByteString.
204
bEOM :: B.ByteString
205
bEOM = B.singleton eOM
206

    
207
-- | Valid keys in the requests and responses.
208
data MsgKeys = Method
209
             | Args
210
             | Success
211
             | Result
212

    
213
-- | The serialisation of MsgKeys into strings in messages.
214
$(genStrOfKey ''MsgKeys "strOfKey")
215

    
216
-- | Luxi client encapsulation.
217
data Client = Client { socket :: Handle           -- ^ The socket of the client
218
                     , rbuf :: IORef B.ByteString -- ^ Already received buffer
219
                     }
220

    
221
-- | Connects to the master daemon and returns a luxi Client.
222
getClient :: String -> IO Client
223
getClient path = do
224
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
225
  withTimeout luxiDefCtmo "creating luxi connection" $
226
              S.connect s (S.SockAddrUnix path)
227
  rf <- newIORef B.empty
228
  h <- S.socketToHandle s ReadWriteMode
229
  return Client { socket=h, rbuf=rf }
230

    
231
-- | Creates and returns a server endpoint.
232
getServer :: Bool -> FilePath -> IO S.Socket
233
getServer setOwner path = do
234
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
235
  S.bindSocket s (S.SockAddrUnix path)
236
  when setOwner . setOwnerAndGroupFromNames path GanetiLuxid $
237
    ExtraGroup DaemonsGroup
238
  S.listen s 5 -- 5 is the max backlog
239
  return s
240

    
241
-- | Closes a server endpoint.
242
-- FIXME: this should be encapsulated into a nicer type.
243
closeServer :: FilePath -> S.Socket -> IO ()
244
closeServer path sock = do
245
  S.sClose sock
246
  removeFile path
247

    
248
-- | Accepts a client
249
acceptClient :: S.Socket -> IO Client
250
acceptClient s = do
251
  -- second return is the address of the client, which we ignore here
252
  (client_socket, _) <- S.accept s
253
  new_buffer <- newIORef B.empty
254
  handle <- S.socketToHandle client_socket ReadWriteMode
255
  return Client { socket=handle, rbuf=new_buffer }
256

    
257
-- | Closes the client socket.
258
closeClient :: Client -> IO ()
259
closeClient = hClose . socket
260

    
261
-- | Sends a message over a luxi transport.
262
sendMsg :: Client -> String -> IO ()
263
sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
264
  let encoded = UTF8L.fromString buf
265
      handle = socket s
266
  BL.hPut handle encoded
267
  B.hPut handle bEOM
268
  hFlush handle
269

    
270
-- | Given a current buffer and the handle, it will read from the
271
-- network until we get a full message, and it will return that
272
-- message and the leftover buffer contents.
273
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
274
recvUpdate handle obuf = do
275
  nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
276
            _ <- hWaitForInput handle (-1)
277
            B.hGetNonBlocking handle 4096
278
  let (msg, remaining) = B.break (eOM ==) nbuf
279
      newbuf = B.append obuf msg
280
  if B.null remaining
281
    then recvUpdate handle newbuf
282
    else return (newbuf, B.tail remaining)
283

    
284
-- | Waits for a message over a luxi transport.
285
recvMsg :: Client -> IO String
286
recvMsg s = do
287
  cbuf <- readIORef $ rbuf s
288
  let (imsg, ibuf) = B.break (eOM ==) cbuf
289
  (msg, nbuf) <-
290
    if B.null ibuf      -- if old buffer didn't contain a full message
291
      then recvUpdate (socket s) cbuf   -- then we read from network
292
      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
293
  writeIORef (rbuf s) nbuf
294
  return $ UTF8.toString msg
295

    
296
-- | Extended wrapper over recvMsg.
297
recvMsgExt :: Client -> IO RecvResult
298
recvMsgExt s =
299
  Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
300
    return $ if isEOFError e
301
               then RecvConnClosed
302
               else RecvError (show e)
303

    
304
-- | Serialize a request to String.
305
buildCall :: LuxiOp  -- ^ The method
306
          -> String  -- ^ The serialized form
307
buildCall lo =
308
  let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
309
           , (strOfKey Args, opToArgs lo)
310
           ]
311
      jo = toJSObject ja
312
  in encodeStrict jo
313

    
314
-- | Serialize the response to String.
315
buildResponse :: Bool    -- ^ Success
316
              -> JSValue -- ^ The arguments
317
              -> String  -- ^ The serialized form
318
buildResponse success args =
319
  let ja = [ (strOfKey Success, JSBool success)
320
           , (strOfKey Result, args)]
321
      jo = toJSObject ja
322
  in encodeStrict jo
323

    
324
-- | Check that luxi request contains the required keys and parse it.
325
validateCall :: String -> Result LuxiCall
326
validateCall s = do
327
  arr <- fromJResult "parsing top-level luxi message" $
328
         decodeStrict s::Result (JSObject JSValue)
329
  let aobj = fromJSObject arr
330
  call <- fromObj aobj (strOfKey Method)::Result LuxiReq
331
  args <- fromObj aobj (strOfKey Args)
332
  return (LuxiCall call args)
333

    
334
-- | Converts Luxi call arguments into a 'LuxiOp' data structure.
335
--
336
-- This is currently hand-coded until we make it more uniform so that
337
-- it can be generated using TH.
338
decodeCall :: LuxiCall -> Result LuxiOp
339
decodeCall (LuxiCall call args) =
340
  case call of
341
    ReqQueryJobs -> do
342
              (jids, jargs) <- fromJVal args
343
              jids' <- case jids of
344
                         JSNull -> return []
345
                         _ -> fromJVal jids
346
              return $ QueryJobs jids' jargs
347
    ReqQueryInstances -> do
348
              (names, fields, locking) <- fromJVal args
349
              return $ QueryInstances names fields locking
350
    ReqQueryNodes -> do
351
              (names, fields, locking) <- fromJVal args
352
              return $ QueryNodes names fields locking
353
    ReqQueryGroups -> do
354
              (names, fields, locking) <- fromJVal args
355
              return $ QueryGroups names fields locking
356
    ReqQueryClusterInfo ->
357
              return QueryClusterInfo
358
    ReqQueryNetworks -> do
359
              (names, fields, locking) <- fromJVal args
360
              return $ QueryNetworks names fields locking
361
    ReqQuery -> do
362
              (what, fields, qfilter) <- fromJVal args
363
              return $ Query what fields qfilter
364
    ReqQueryFields -> do
365
              (what, fields) <- fromJVal args
366
              fields' <- case fields of
367
                           JSNull -> return []
368
                           _ -> fromJVal fields
369
              return $ QueryFields what fields'
370
    ReqSubmitJob -> do
371
              [ops1] <- fromJVal args
372
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
373
              return $ SubmitJob ops2
374
    ReqSubmitJobToDrainedQueue -> do
375
              [ops1] <- fromJVal args
376
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
377
              return $ SubmitJobToDrainedQueue ops2
378
    ReqSubmitManyJobs -> do
379
              [ops1] <- fromJVal args
380
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
381
              return $ SubmitManyJobs ops2
382
    ReqWaitForJobChange -> do
383
              (jid, fields, pinfo, pidx, wtmout) <-
384
                -- No instance for 5-tuple, code copied from the
385
                -- json sources and adapted
386
                fromJResult "Parsing WaitForJobChange message" $
387
                case args of
388
                  JSArray [a, b, c, d, e] ->
389
                    (,,,,) `fmap`
390
                    J.readJSON a `ap`
391
                    J.readJSON b `ap`
392
                    J.readJSON c `ap`
393
                    J.readJSON d `ap`
394
                    J.readJSON e
395
                  _ -> J.Error "Not enough values"
396
              return $ WaitForJobChange jid fields pinfo pidx wtmout
397
    ReqArchiveJob -> do
398
              [jid] <- fromJVal args
399
              return $ ArchiveJob jid
400
    ReqAutoArchiveJobs -> do
401
              (age, tmout) <- fromJVal args
402
              return $ AutoArchiveJobs age tmout
403
    ReqQueryExports -> do
404
              (nodes, lock) <- fromJVal args
405
              return $ QueryExports nodes lock
406
    ReqQueryConfigValues -> do
407
              [fields] <- fromJVal args
408
              return $ QueryConfigValues fields
409
    ReqQueryTags -> do
410
              (kind, name) <- fromJVal args
411
              return $ QueryTags kind name
412
    ReqCancelJob -> do
413
              [jid] <- fromJVal args
414
              return $ CancelJob jid
415
    ReqChangeJobPriority -> do
416
              (jid, priority) <- fromJVal args
417
              return $ ChangeJobPriority jid priority
418
    ReqSetDrainFlag -> do
419
              [flag] <- fromJVal args
420
              return $ SetDrainFlag flag
421
    ReqSetWatcherPause -> do
422
              [duration] <- fromJVal args
423
              return $ SetWatcherPause duration
424

    
425
-- | Check that luxi responses contain the required keys and that the
426
-- call was successful.
427
validateResult :: String -> ErrorResult JSValue
428
validateResult s = do
429
  when (UTF8.replacement_char `elem` s) $
430
       fail "Failed to decode UTF-8, detected replacement char after decoding"
431
  oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
432
  let arr = J.fromJSObject oarr
433
  status <- fromObj arr (strOfKey Success)
434
  result <- fromObj arr (strOfKey Result)
435
  if status
436
    then return result
437
    else decodeError result
438

    
439
-- | Try to decode an error from the server response. This function
440
-- will always fail, since it's called only on the error path (when
441
-- status is False).
442
decodeError :: JSValue -> ErrorResult JSValue
443
decodeError val =
444
  case fromJVal val of
445
    Ok e -> Bad e
446
    Bad msg -> Bad $ GenericError msg
447

    
448
-- | Generic luxi method call.
449
callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
450
callMethod method s = do
451
  sendMsg s $ buildCall method
452
  result <- recvMsg s
453
  let rval = validateResult result
454
  return rval
455

    
456
-- | Parse job submission result.
457
parseSubmitJobResult :: JSValue -> ErrorResult JobId
458
parseSubmitJobResult (JSArray [JSBool True, v]) =
459
  case J.readJSON v of
460
    J.Error msg -> Bad $ LuxiError msg
461
    J.Ok v' -> Ok v'
462
parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
463
  Bad . LuxiError $ fromJSString x
464
parseSubmitJobResult v =
465
  Bad . LuxiError $ "Unknown result from the master daemon: " ++
466
      show (pp_value v)
467

    
468
-- | Specialized submitManyJobs call.
469
submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId])
470
submitManyJobs s jobs = do
471
  rval <- callMethod (SubmitManyJobs jobs) s
472
  -- map each result (status, payload) pair into a nice Result ADT
473
  return $ case rval of
474
             Bad x -> Bad x
475
             Ok (JSArray r) -> mapM parseSubmitJobResult r
476
             x -> Bad . LuxiError $
477
                  "Cannot parse response from Ganeti: " ++ show x
478

    
479
-- | Custom queryJobs call.
480
queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
481
queryJobsStatus s jids = do
482
  rval <- callMethod (QueryJobs jids ["status"]) s
483
  return $ case rval of
484
             Bad x -> Bad x
485
             Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
486
                       J.Ok vals -> if any null vals
487
                                    then Bad $
488
                                         LuxiError "Missing job status field"
489
                                    else Ok (map head vals)
490
                       J.Error x -> Bad $ LuxiError x