Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / Luxi.hs @ 289e7fcc

History | View | Annotate | Download (15.8 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the Ganeti LUXI interface.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Luxi
29
  ( LuxiOp(..)
30
  , LuxiReq(..)
31
  , Client
32
  , JobId
33
  , fromJobId
34
  , makeJobId
35
  , RecvResult(..)
36
  , strOfOp
37
  , getClient
38
  , getServer
39
  , acceptClient
40
  , closeClient
41
  , closeServer
42
  , callMethod
43
  , submitManyJobs
44
  , queryJobsStatus
45
  , buildCall
46
  , buildResponse
47
  , validateCall
48
  , decodeCall
49
  , recvMsg
50
  , recvMsgExt
51
  , sendMsg
52
  , allLuxiCalls
53
  ) where
54

    
55
import Control.Exception (catch)
56
import Data.IORef
57
import qualified Data.ByteString as B
58
import qualified Data.ByteString.Lazy as BL
59
import qualified Data.ByteString.UTF8 as UTF8
60
import qualified Data.ByteString.Lazy.UTF8 as UTF8L
61
import Data.Word (Word8)
62
import Control.Monad
63
import Text.JSON (encodeStrict, decodeStrict)
64
import qualified Text.JSON as J
65
import Text.JSON.Pretty (pp_value)
66
import Text.JSON.Types
67
import System.Directory (removeFile)
68
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
69
import System.IO.Error (isEOFError)
70
import System.Posix.Files
71
import System.Timeout
72
import qualified Network.Socket as S
73

    
74
import Ganeti.BasicTypes
75
import Ganeti.Constants
76
import Ganeti.Errors
77
import Ganeti.JSON
78
import Ganeti.OpParams (pTagsObject)
79
import Ganeti.OpCodes
80
import Ganeti.Runtime
81
import qualified Ganeti.Query.Language as Qlang
82
import Ganeti.THH
83
import Ganeti.Types
84
import Ganeti.Utils
85

    
86
-- * Utility functions
87

    
88
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
89
withTimeout :: Int -> String -> IO a -> IO a
90
withTimeout secs descr action = do
91
  result <- timeout (secs * 1000000) action
92
  case result of
93
    Nothing -> fail $ "Timeout in " ++ descr
94
    Just v -> return v
95

    
96
-- * Generic protocol functionality
97

    
98
-- | Result of receiving a message from the socket.
99
data RecvResult = RecvConnClosed    -- ^ Connection closed
100
                | RecvError String  -- ^ Any other error
101
                | RecvOk String     -- ^ Successfull receive
102
                  deriving (Show, Eq)
103

    
104
-- | Currently supported Luxi operations and JSON serialization.
105
$(genLuxiOp "LuxiOp"
106
  [ (luxiReqQuery,
107
    [ simpleField "what"    [t| Qlang.ItemType |]
108
    , simpleField "fields"  [t| [String]  |]
109
    , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
110
    ])
111
  , (luxiReqQueryFields,
112
    [ simpleField "what"    [t| Qlang.ItemType |]
113
    , simpleField "fields"  [t| [String]  |]
114
    ])
115
  , (luxiReqQueryNodes,
116
     [ simpleField "names"  [t| [String] |]
117
     , simpleField "fields" [t| [String] |]
118
     , simpleField "lock"   [t| Bool     |]
119
     ])
120
  , (luxiReqQueryGroups,
121
     [ simpleField "names"  [t| [String] |]
122
     , simpleField "fields" [t| [String] |]
123
     , simpleField "lock"   [t| Bool     |]
124
     ])
125
  , (luxiReqQueryNetworks,
126
     [ simpleField "names"  [t| [String] |]
127
     , simpleField "fields" [t| [String] |]
128
     , simpleField "lock"   [t| Bool     |]
129
     ])
130
  , (luxiReqQueryInstances,
131
     [ simpleField "names"  [t| [String] |]
132
     , simpleField "fields" [t| [String] |]
133
     , simpleField "lock"   [t| Bool     |]
134
     ])
135
  , (luxiReqQueryJobs,
136
     [ simpleField "ids"    [t| [JobId]  |]
137
     , simpleField "fields" [t| [String] |]
138
     ])
139
  , (luxiReqQueryExports,
140
     [ simpleField "nodes" [t| [String] |]
141
     , simpleField "lock"  [t| Bool     |]
142
     ])
143
  , (luxiReqQueryConfigValues,
144
     [ simpleField "fields" [t| [String] |] ]
145
    )
146
  , (luxiReqQueryClusterInfo, [])
147
  , (luxiReqQueryTags,
148
     [ pTagsObject ])
149
  , (luxiReqSubmitJob,
150
     [ simpleField "job" [t| [MetaOpCode] |] ]
151
    )
152
  , (luxiReqSubmitManyJobs,
153
     [ simpleField "ops" [t| [[MetaOpCode]] |] ]
154
    )
155
  , (luxiReqWaitForJobChange,
156
     [ simpleField "job"      [t| JobId   |]
157
     , simpleField "fields"   [t| [String]|]
158
     , simpleField "prev_job" [t| JSValue |]
159
     , simpleField "prev_log" [t| JSValue |]
160
     , simpleField "tmout"    [t| Int     |]
161
     ])
162
  , (luxiReqArchiveJob,
163
     [ simpleField "job" [t| JobId |] ]
164
    )
165
  , (luxiReqAutoArchiveJobs,
166
     [ simpleField "age"   [t| Int |]
167
     , simpleField "tmout" [t| Int |]
168
     ])
169
  , (luxiReqCancelJob,
170
     [ simpleField "job" [t| JobId |] ]
171
    )
172
  , (luxiReqChangeJobPriority,
173
     [ simpleField "job"      [t| JobId |]
174
     , simpleField "priority" [t| Int |] ]
175
    )
176
  , (luxiReqSetDrainFlag,
177
     [ simpleField "flag" [t| Bool |] ]
178
    )
179
  , (luxiReqSetWatcherPause,
180
     [ simpleField "duration" [t| Double |] ]
181
    )
182
  ])
183

    
184
$(makeJSONInstance ''LuxiReq)
185

    
186
-- | List of all defined Luxi calls.
187
$(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
188

    
189
-- | The serialisation of LuxiOps into strings in messages.
190
$(genStrOfOp ''LuxiOp "strOfOp")
191

    
192
-- | Type holding the initial (unparsed) Luxi call.
193
data LuxiCall = LuxiCall LuxiReq JSValue
194

    
195
-- | The end-of-message separator.
196
eOM :: Word8
197
eOM = 3
198

    
199
-- | The end-of-message encoded as a ByteString.
200
bEOM :: B.ByteString
201
bEOM = B.singleton eOM
202

    
203
-- | Valid keys in the requests and responses.
204
data MsgKeys = Method
205
             | Args
206
             | Success
207
             | Result
208

    
209
-- | The serialisation of MsgKeys into strings in messages.
210
$(genStrOfKey ''MsgKeys "strOfKey")
211

    
212
-- | Luxi client encapsulation.
213
data Client = Client { socket :: Handle           -- ^ The socket of the client
214
                     , rbuf :: IORef B.ByteString -- ^ Already received buffer
215
                     }
216

    
217
-- | Connects to the master daemon and returns a luxi Client.
218
getClient :: String -> IO Client
219
getClient path = do
220
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
221
  withTimeout luxiDefCtmo "creating luxi connection" $
222
              S.connect s (S.SockAddrUnix path)
223
  rf <- newIORef B.empty
224
  h <- S.socketToHandle s ReadWriteMode
225
  return Client { socket=h, rbuf=rf }
226

    
227
-- | Creates and returns a server endpoint.
228
getServer :: Bool -> FilePath -> IO S.Socket
229
getServer setOwner path = do
230
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
231
  S.bindSocket s (S.SockAddrUnix path)
232
  when setOwner $ do
233
    setOwnerAndGroupFromNames path GanetiLuxid $ ExtraGroup DaemonsGroup
234
    setFileMode path $ fromIntegral luxiSocketPerms
235
  S.listen s 5 -- 5 is the max backlog
236
  return s
237

    
238
-- | Closes a server endpoint.
239
-- FIXME: this should be encapsulated into a nicer type.
240
closeServer :: FilePath -> S.Socket -> IO ()
241
closeServer path sock = do
242
  S.sClose sock
243
  removeFile path
244

    
245
-- | Accepts a client
246
acceptClient :: S.Socket -> IO Client
247
acceptClient s = do
248
  -- second return is the address of the client, which we ignore here
249
  (client_socket, _) <- S.accept s
250
  new_buffer <- newIORef B.empty
251
  handle <- S.socketToHandle client_socket ReadWriteMode
252
  return Client { socket=handle, rbuf=new_buffer }
253

    
254
-- | Closes the client socket.
255
closeClient :: Client -> IO ()
256
closeClient = hClose . socket
257

    
258
-- | Sends a message over a luxi transport.
259
sendMsg :: Client -> String -> IO ()
260
sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
261
  let encoded = UTF8L.fromString buf
262
      handle = socket s
263
  BL.hPut handle encoded
264
  B.hPut handle bEOM
265
  hFlush handle
266

    
267
-- | Given a current buffer and the handle, it will read from the
268
-- network until we get a full message, and it will return that
269
-- message and the leftover buffer contents.
270
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
271
recvUpdate handle obuf = do
272
  nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
273
            _ <- hWaitForInput handle (-1)
274
            B.hGetNonBlocking handle 4096
275
  let (msg, remaining) = B.break (eOM ==) nbuf
276
      newbuf = B.append obuf msg
277
  if B.null remaining
278
    then recvUpdate handle newbuf
279
    else return (newbuf, B.tail remaining)
280

    
281
-- | Waits for a message over a luxi transport.
282
recvMsg :: Client -> IO String
283
recvMsg s = do
284
  cbuf <- readIORef $ rbuf s
285
  let (imsg, ibuf) = B.break (eOM ==) cbuf
286
  (msg, nbuf) <-
287
    if B.null ibuf      -- if old buffer didn't contain a full message
288
      then recvUpdate (socket s) cbuf   -- then we read from network
289
      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
290
  writeIORef (rbuf s) nbuf
291
  return $ UTF8.toString msg
292

    
293
-- | Extended wrapper over recvMsg.
294
recvMsgExt :: Client -> IO RecvResult
295
recvMsgExt s =
296
  Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
297
    return $ if isEOFError e
298
               then RecvConnClosed
299
               else RecvError (show e)
300

    
301
-- | Serialize a request to String.
302
buildCall :: LuxiOp  -- ^ The method
303
          -> String  -- ^ The serialized form
304
buildCall lo =
305
  let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
306
           , (strOfKey Args, opToArgs lo)
307
           ]
308
      jo = toJSObject ja
309
  in encodeStrict jo
310

    
311
-- | Serialize the response to String.
312
buildResponse :: Bool    -- ^ Success
313
              -> JSValue -- ^ The arguments
314
              -> String  -- ^ The serialized form
315
buildResponse success args =
316
  let ja = [ (strOfKey Success, JSBool success)
317
           , (strOfKey Result, args)]
318
      jo = toJSObject ja
319
  in encodeStrict jo
320

    
321
-- | Check that luxi request contains the required keys and parse it.
322
validateCall :: String -> Result LuxiCall
323
validateCall s = do
324
  arr <- fromJResult "parsing top-level luxi message" $
325
         decodeStrict s::Result (JSObject JSValue)
326
  let aobj = fromJSObject arr
327
  call <- fromObj aobj (strOfKey Method)::Result LuxiReq
328
  args <- fromObj aobj (strOfKey Args)
329
  return (LuxiCall call args)
330

    
331
-- | Converts Luxi call arguments into a 'LuxiOp' data structure.
332
--
333
-- This is currently hand-coded until we make it more uniform so that
334
-- it can be generated using TH.
335
decodeCall :: LuxiCall -> Result LuxiOp
336
decodeCall (LuxiCall call args) =
337
  case call of
338
    ReqQueryJobs -> do
339
              (jids, jargs) <- fromJVal args
340
              jids' <- case jids of
341
                         JSNull -> return []
342
                         _ -> fromJVal jids
343
              return $ QueryJobs jids' jargs
344
    ReqQueryInstances -> do
345
              (names, fields, locking) <- fromJVal args
346
              return $ QueryInstances names fields locking
347
    ReqQueryNodes -> do
348
              (names, fields, locking) <- fromJVal args
349
              return $ QueryNodes names fields locking
350
    ReqQueryGroups -> do
351
              (names, fields, locking) <- fromJVal args
352
              return $ QueryGroups names fields locking
353
    ReqQueryClusterInfo ->
354
              return QueryClusterInfo
355
    ReqQueryNetworks -> do
356
              (names, fields, locking) <- fromJVal args
357
              return $ QueryNetworks names fields locking
358
    ReqQuery -> do
359
              (what, fields, qfilter) <- fromJVal args
360
              return $ Query what fields qfilter
361
    ReqQueryFields -> do
362
              (what, fields) <- fromJVal args
363
              fields' <- case fields of
364
                           JSNull -> return []
365
                           _ -> fromJVal fields
366
              return $ QueryFields what fields'
367
    ReqSubmitJob -> do
368
              [ops1] <- fromJVal args
369
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
370
              return $ SubmitJob ops2
371
    ReqSubmitManyJobs -> do
372
              [ops1] <- fromJVal args
373
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
374
              return $ SubmitManyJobs ops2
375
    ReqWaitForJobChange -> do
376
              (jid, fields, pinfo, pidx, wtmout) <-
377
                -- No instance for 5-tuple, code copied from the
378
                -- json sources and adapted
379
                fromJResult "Parsing WaitForJobChange message" $
380
                case args of
381
                  JSArray [a, b, c, d, e] ->
382
                    (,,,,) `fmap`
383
                    J.readJSON a `ap`
384
                    J.readJSON b `ap`
385
                    J.readJSON c `ap`
386
                    J.readJSON d `ap`
387
                    J.readJSON e
388
                  _ -> J.Error "Not enough values"
389
              return $ WaitForJobChange jid fields pinfo pidx wtmout
390
    ReqArchiveJob -> do
391
              [jid] <- fromJVal args
392
              return $ ArchiveJob jid
393
    ReqAutoArchiveJobs -> do
394
              (age, tmout) <- fromJVal args
395
              return $ AutoArchiveJobs age tmout
396
    ReqQueryExports -> do
397
              (nodes, lock) <- fromJVal args
398
              return $ QueryExports nodes lock
399
    ReqQueryConfigValues -> do
400
              [fields] <- fromJVal args
401
              return $ QueryConfigValues fields
402
    ReqQueryTags -> do
403
              (kind, name) <- fromJVal args
404
              item <- tagObjectFrom kind name
405
              return $ QueryTags item
406
    ReqCancelJob -> do
407
              [jid] <- fromJVal args
408
              return $ CancelJob jid
409
    ReqChangeJobPriority -> do
410
              (jid, priority) <- fromJVal args
411
              return $ ChangeJobPriority jid priority
412
    ReqSetDrainFlag -> do
413
              [flag] <- fromJVal args
414
              return $ SetDrainFlag flag
415
    ReqSetWatcherPause -> do
416
              [duration] <- fromJVal args
417
              return $ SetWatcherPause duration
418

    
419
-- | Check that luxi responses contain the required keys and that the
420
-- call was successful.
421
validateResult :: String -> ErrorResult JSValue
422
validateResult s = do
423
  when (UTF8.replacement_char `elem` s) $
424
       fail "Failed to decode UTF-8, detected replacement char after decoding"
425
  oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
426
  let arr = J.fromJSObject oarr
427
  status <- fromObj arr (strOfKey Success)
428
  result <- fromObj arr (strOfKey Result)
429
  if status
430
    then return result
431
    else decodeError result
432

    
433
-- | Try to decode an error from the server response. This function
434
-- will always fail, since it's called only on the error path (when
435
-- status is False).
436
decodeError :: JSValue -> ErrorResult JSValue
437
decodeError val =
438
  case fromJVal val of
439
    Ok e -> Bad e
440
    Bad msg -> Bad $ GenericError msg
441

    
442
-- | Generic luxi method call.
443
callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
444
callMethod method s = do
445
  sendMsg s $ buildCall method
446
  result <- recvMsg s
447
  let rval = validateResult result
448
  return rval
449

    
450
-- | Parse job submission result.
451
parseSubmitJobResult :: JSValue -> ErrorResult JobId
452
parseSubmitJobResult (JSArray [JSBool True, v]) =
453
  case J.readJSON v of
454
    J.Error msg -> Bad $ LuxiError msg
455
    J.Ok v' -> Ok v'
456
parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
457
  Bad . LuxiError $ fromJSString x
458
parseSubmitJobResult v =
459
  Bad . LuxiError $ "Unknown result from the master daemon: " ++
460
      show (pp_value v)
461

    
462
-- | Specialized submitManyJobs call.
463
submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId])
464
submitManyJobs s jobs = do
465
  rval <- callMethod (SubmitManyJobs jobs) s
466
  -- map each result (status, payload) pair into a nice Result ADT
467
  return $ case rval of
468
             Bad x -> Bad x
469
             Ok (JSArray r) -> mapM parseSubmitJobResult r
470
             x -> Bad . LuxiError $
471
                  "Cannot parse response from Ganeti: " ++ show x
472

    
473
-- | Custom queryJobs call.
474
queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
475
queryJobsStatus s jids = do
476
  rval <- callMethod (QueryJobs jids ["status"]) s
477
  return $ case rval of
478
             Bad x -> Bad x
479
             Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
480
                       J.Ok vals -> if any null vals
481
                                    then Bad $
482
                                         LuxiError "Missing job status field"
483
                                    else Ok (map head vals)
484
                       J.Error x -> Bad $ LuxiError x