Statistics
| Branch: | Tag: | Revision:

root / htools / Ganeti / Luxi.hs @ 5cd95d46

History | View | Annotate | Download (15.1 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the Ganeti LUXI interface.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Luxi
29
  ( LuxiOp(..)
30
  , LuxiReq(..)
31
  , Client
32
  , JobId
33
  , fromJobId
34
  , makeJobId
35
  , RecvResult(..)
36
  , strOfOp
37
  , getClient
38
  , getServer
39
  , acceptClient
40
  , closeClient
41
  , closeServer
42
  , callMethod
43
  , submitManyJobs
44
  , queryJobsStatus
45
  , buildCall
46
  , buildResponse
47
  , validateCall
48
  , decodeCall
49
  , recvMsg
50
  , recvMsgExt
51
  , sendMsg
52
  , allLuxiCalls
53
  ) where
54

    
55
import Control.Exception (catch)
56
import Data.IORef
57
import qualified Data.ByteString as B
58
import qualified Data.ByteString.UTF8 as UTF8
59
import Data.Word (Word8)
60
import Control.Monad
61
import Text.JSON (encodeStrict, decodeStrict)
62
import qualified Text.JSON as J
63
import Text.JSON.Pretty (pp_value)
64
import Text.JSON.Types
65
import System.Directory (removeFile)
66
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
67
import System.IO.Error (isEOFError)
68
import System.Timeout
69
import qualified Network.Socket as S
70

    
71
import Ganeti.BasicTypes
72
import Ganeti.Constants
73
import Ganeti.Errors
74
import Ganeti.JSON
75
import Ganeti.OpParams (pTagsObject)
76
import Ganeti.OpCodes
77
import qualified Ganeti.Query.Language as Qlang
78
import Ganeti.THH
79
import Ganeti.Types
80

    
81
-- * Utility functions
82

    
83
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
84
withTimeout :: Int -> String -> IO a -> IO a
85
withTimeout secs descr action = do
86
  result <- timeout (secs * 1000000) action
87
  case result of
88
    Nothing -> fail $ "Timeout in " ++ descr
89
    Just v -> return v
90

    
91
-- * Generic protocol functionality
92

    
93
-- | Result of receiving a message from the socket.
94
data RecvResult = RecvConnClosed    -- ^ Connection closed
95
                | RecvError String  -- ^ Any other error
96
                | RecvOk String     -- ^ Successfull receive
97
                  deriving (Show, Eq)
98

    
99
-- | Currently supported Luxi operations and JSON serialization.
100
$(genLuxiOp "LuxiOp"
101
  [ (luxiReqQuery,
102
    [ simpleField "what"    [t| Qlang.ItemType |]
103
    , simpleField "fields"  [t| [String]  |]
104
    , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
105
    ])
106
  , (luxiReqQueryFields,
107
    [ simpleField "what"    [t| Qlang.ItemType |]
108
    , simpleField "fields"  [t| [String]  |]
109
    ])
110
  , (luxiReqQueryNodes,
111
     [ simpleField "names"  [t| [String] |]
112
     , simpleField "fields" [t| [String] |]
113
     , simpleField "lock"   [t| Bool     |]
114
     ])
115
  , (luxiReqQueryGroups,
116
     [ simpleField "names"  [t| [String] |]
117
     , simpleField "fields" [t| [String] |]
118
     , simpleField "lock"   [t| Bool     |]
119
     ])
120
  , (luxiReqQueryInstances,
121
     [ simpleField "names"  [t| [String] |]
122
     , simpleField "fields" [t| [String] |]
123
     , simpleField "lock"   [t| Bool     |]
124
     ])
125
  , (luxiReqQueryJobs,
126
     [ simpleField "ids"    [t| [JobId]  |]
127
     , simpleField "fields" [t| [String] |]
128
     ])
129
  , (luxiReqQueryExports,
130
     [ simpleField "nodes" [t| [String] |]
131
     , simpleField "lock"  [t| Bool     |]
132
     ])
133
  , (luxiReqQueryConfigValues,
134
     [ simpleField "fields" [t| [String] |] ]
135
    )
136
  , (luxiReqQueryClusterInfo, [])
137
  , (luxiReqQueryTags,
138
     [ pTagsObject ])
139
  , (luxiReqSubmitJob,
140
     [ simpleField "job" [t| [MetaOpCode] |] ]
141
    )
142
  , (luxiReqSubmitManyJobs,
143
     [ simpleField "ops" [t| [[MetaOpCode]] |] ]
144
    )
145
  , (luxiReqWaitForJobChange,
146
     [ simpleField "job"      [t| JobId   |]
147
     , simpleField "fields"   [t| [String]|]
148
     , simpleField "prev_job" [t| JSValue |]
149
     , simpleField "prev_log" [t| JSValue |]
150
     , simpleField "tmout"    [t| Int     |]
151
     ])
152
  , (luxiReqArchiveJob,
153
     [ simpleField "job" [t| JobId |] ]
154
    )
155
  , (luxiReqAutoArchiveJobs,
156
     [ simpleField "age"   [t| Int |]
157
     , simpleField "tmout" [t| Int |]
158
     ])
159
  , (luxiReqCancelJob,
160
     [ simpleField "job" [t| JobId |] ]
161
    )
162
  , (luxiReqChangeJobPriority,
163
     [ simpleField "job"      [t| JobId |]
164
     , simpleField "priority" [t| Int |] ]
165
    )
166
  , (luxiReqSetDrainFlag,
167
     [ simpleField "flag" [t| Bool |] ]
168
    )
169
  , (luxiReqSetWatcherPause,
170
     [ simpleField "duration" [t| Double |] ]
171
    )
172
  ])
173

    
174
$(makeJSONInstance ''LuxiReq)
175

    
176
-- | List of all defined Luxi calls.
177
$(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
178

    
179
-- | The serialisation of LuxiOps into strings in messages.
180
$(genStrOfOp ''LuxiOp "strOfOp")
181

    
182
-- | Type holding the initial (unparsed) Luxi call.
183
data LuxiCall = LuxiCall LuxiReq JSValue
184

    
185
-- | The end-of-message separator.
186
eOM :: Word8
187
eOM = 3
188

    
189
-- | The end-of-message encoded as a ByteString.
190
bEOM :: B.ByteString
191
bEOM = B.singleton eOM
192

    
193
-- | Valid keys in the requests and responses.
194
data MsgKeys = Method
195
             | Args
196
             | Success
197
             | Result
198

    
199
-- | The serialisation of MsgKeys into strings in messages.
200
$(genStrOfKey ''MsgKeys "strOfKey")
201

    
202
-- | Luxi client encapsulation.
203
data Client = Client { socket :: Handle           -- ^ The socket of the client
204
                     , rbuf :: IORef B.ByteString -- ^ Already received buffer
205
                     }
206

    
207
-- | Connects to the master daemon and returns a luxi Client.
208
getClient :: String -> IO Client
209
getClient path = do
210
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
211
  withTimeout luxiDefCtmo "creating luxi connection" $
212
              S.connect s (S.SockAddrUnix path)
213
  rf <- newIORef B.empty
214
  h <- S.socketToHandle s ReadWriteMode
215
  return Client { socket=h, rbuf=rf }
216

    
217
-- | Creates and returns a server endpoint.
218
getServer :: FilePath -> IO S.Socket
219
getServer path = do
220
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
221
  S.bindSocket s (S.SockAddrUnix path)
222
  S.listen s 5 -- 5 is the max backlog
223
  return s
224

    
225
-- | Closes a server endpoint.
226
-- FIXME: this should be encapsulated into a nicer type.
227
closeServer :: FilePath -> S.Socket -> IO ()
228
closeServer path sock = do
229
  S.sClose sock
230
  removeFile path
231

    
232
-- | Accepts a client
233
acceptClient :: S.Socket -> IO Client
234
acceptClient s = do
235
  -- second return is the address of the client, which we ignore here
236
  (client_socket, _) <- S.accept s
237
  new_buffer <- newIORef B.empty
238
  handle <- S.socketToHandle client_socket ReadWriteMode
239
  return Client { socket=handle, rbuf=new_buffer }
240

    
241
-- | Closes the client socket.
242
closeClient :: Client -> IO ()
243
closeClient = hClose . socket
244

    
245
-- | Sends a message over a luxi transport.
246
sendMsg :: Client -> String -> IO ()
247
sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
248
  let encoded = UTF8.fromString buf
249
      handle = socket s
250
  B.hPut handle encoded
251
  B.hPut handle bEOM
252
  hFlush handle
253

    
254
-- | Given a current buffer and the handle, it will read from the
255
-- network until we get a full message, and it will return that
256
-- message and the leftover buffer contents.
257
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
258
recvUpdate handle obuf = do
259
  nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
260
            _ <- hWaitForInput handle (-1)
261
            B.hGetNonBlocking handle 4096
262
  let (msg, remaining) = B.break (eOM ==) nbuf
263
      newbuf = B.append obuf msg
264
  if B.null remaining
265
    then recvUpdate handle newbuf
266
    else return (newbuf, B.tail remaining)
267

    
268
-- | Waits for a message over a luxi transport.
269
recvMsg :: Client -> IO String
270
recvMsg s = do
271
  cbuf <- readIORef $ rbuf s
272
  let (imsg, ibuf) = B.break (eOM ==) cbuf
273
  (msg, nbuf) <-
274
    if B.null ibuf      -- if old buffer didn't contain a full message
275
      then recvUpdate (socket s) cbuf   -- then we read from network
276
      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
277
  writeIORef (rbuf s) nbuf
278
  return $ UTF8.toString msg
279

    
280
-- | Extended wrapper over recvMsg.
281
recvMsgExt :: Client -> IO RecvResult
282
recvMsgExt s =
283
  Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
284
    return $ if isEOFError e
285
               then RecvConnClosed
286
               else RecvError (show e)
287

    
288
-- | Serialize a request to String.
289
buildCall :: LuxiOp  -- ^ The method
290
          -> String  -- ^ The serialized form
291
buildCall lo =
292
  let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
293
           , (strOfKey Args, opToArgs lo)
294
           ]
295
      jo = toJSObject ja
296
  in encodeStrict jo
297

    
298
-- | Serialize the response to String.
299
buildResponse :: Bool    -- ^ Success
300
              -> JSValue -- ^ The arguments
301
              -> String  -- ^ The serialized form
302
buildResponse success args =
303
  let ja = [ (strOfKey Success, JSBool success)
304
           , (strOfKey Result, args)]
305
      jo = toJSObject ja
306
  in encodeStrict jo
307

    
308
-- | Check that luxi request contains the required keys and parse it.
309
validateCall :: String -> Result LuxiCall
310
validateCall s = do
311
  arr <- fromJResult "parsing top-level luxi message" $
312
         decodeStrict s::Result (JSObject JSValue)
313
  let aobj = fromJSObject arr
314
  call <- fromObj aobj (strOfKey Method)::Result LuxiReq
315
  args <- fromObj aobj (strOfKey Args)
316
  return (LuxiCall call args)
317

    
318
-- | Converts Luxi call arguments into a 'LuxiOp' data structure.
319
--
320
-- This is currently hand-coded until we make it more uniform so that
321
-- it can be generated using TH.
322
decodeCall :: LuxiCall -> Result LuxiOp
323
decodeCall (LuxiCall call args) =
324
  case call of
325
    ReqQueryJobs -> do
326
              (jids, jargs) <- fromJVal args
327
              let rargs = map fromJSString jargs
328
              return $ QueryJobs jids rargs
329
    ReqQueryInstances -> do
330
              (names, fields, locking) <- fromJVal args
331
              return $ QueryInstances names fields locking
332
    ReqQueryNodes -> do
333
              (names, fields, locking) <- fromJVal args
334
              return $ QueryNodes names fields locking
335
    ReqQueryGroups -> do
336
              (names, fields, locking) <- fromJVal args
337
              return $ QueryGroups names fields locking
338
    ReqQueryClusterInfo ->
339
              return QueryClusterInfo
340
    ReqQuery -> do
341
              (what, fields, qfilter) <- fromJVal args
342
              return $ Query what fields qfilter
343
    ReqQueryFields -> do
344
              (what, fields) <- fromJVal args
345
              fields' <- case fields of
346
                           JSNull -> return []
347
                           _ -> fromJVal fields
348
              return $ QueryFields what fields'
349
    ReqSubmitJob -> do
350
              [ops1] <- fromJVal args
351
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
352
              return $ SubmitJob ops2
353
    ReqSubmitManyJobs -> do
354
              [ops1] <- fromJVal args
355
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
356
              return $ SubmitManyJobs ops2
357
    ReqWaitForJobChange -> do
358
              (jid, fields, pinfo, pidx, wtmout) <-
359
                -- No instance for 5-tuple, code copied from the
360
                -- json sources and adapted
361
                fromJResult "Parsing WaitForJobChange message" $
362
                case args of
363
                  JSArray [a, b, c, d, e] ->
364
                    (,,,,) `fmap`
365
                    J.readJSON a `ap`
366
                    J.readJSON b `ap`
367
                    J.readJSON c `ap`
368
                    J.readJSON d `ap`
369
                    J.readJSON e
370
                  _ -> J.Error "Not enough values"
371
              return $ WaitForJobChange jid fields pinfo pidx wtmout
372
    ReqArchiveJob -> do
373
              [jid] <- fromJVal args
374
              return $ ArchiveJob jid
375
    ReqAutoArchiveJobs -> do
376
              (age, tmout) <- fromJVal args
377
              return $ AutoArchiveJobs age tmout
378
    ReqQueryExports -> do
379
              (nodes, lock) <- fromJVal args
380
              return $ QueryExports nodes lock
381
    ReqQueryConfigValues -> do
382
              [fields] <- fromJVal args
383
              return $ QueryConfigValues fields
384
    ReqQueryTags -> do
385
              (kind, name) <- fromJVal args
386
              item <- tagObjectFrom kind name
387
              return $ QueryTags item
388
    ReqCancelJob -> do
389
              [jid] <- fromJVal args
390
              return $ CancelJob jid
391
    ReqChangeJobPriority -> do
392
              (jid, priority) <- fromJVal args
393
              return $ ChangeJobPriority jid priority
394
    ReqSetDrainFlag -> do
395
              [flag] <- fromJVal args
396
              return $ SetDrainFlag flag
397
    ReqSetWatcherPause -> do
398
              [duration] <- fromJVal args
399
              return $ SetWatcherPause duration
400

    
401
-- | Check that luxi responses contain the required keys and that the
402
-- call was successful.
403
validateResult :: String -> ErrorResult JSValue
404
validateResult s = do
405
  when (UTF8.replacement_char `elem` s) $
406
       fail "Failed to decode UTF-8, detected replacement char after decoding"
407
  oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
408
  let arr = J.fromJSObject oarr
409
  status <- fromObj arr (strOfKey Success)
410
  result <- fromObj arr (strOfKey Result)
411
  if status
412
    then return result
413
    else decodeError result
414

    
415
-- | Try to decode an error from the server response. This function
416
-- will always fail, since it's called only on the error path (when
417
-- status is False).
418
decodeError :: JSValue -> ErrorResult JSValue
419
decodeError val =
420
  case fromJVal val of
421
    Ok e -> Bad e
422
    Bad msg -> Bad $ GenericError msg
423

    
424
-- | Generic luxi method call.
425
callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
426
callMethod method s = do
427
  sendMsg s $ buildCall method
428
  result <- recvMsg s
429
  let rval = validateResult result
430
  return rval
431

    
432
-- | Parse job submission result.
433
parseSubmitJobResult :: JSValue -> ErrorResult JobId
434
parseSubmitJobResult (JSArray [JSBool True, v]) =
435
  case J.readJSON v of
436
    J.Error msg -> Bad $ LuxiError msg
437
    J.Ok v' -> Ok v'
438
parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
439
  Bad . LuxiError $ fromJSString x
440
parseSubmitJobResult v =
441
  Bad . LuxiError $ "Unknown result from the master daemon: " ++
442
      show (pp_value v)
443

    
444
-- | Specialized submitManyJobs call.
445
submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId])
446
submitManyJobs s jobs = do
447
  rval <- callMethod (SubmitManyJobs jobs) s
448
  -- map each result (status, payload) pair into a nice Result ADT
449
  return $ case rval of
450
             Bad x -> Bad x
451
             Ok (JSArray r) -> mapM parseSubmitJobResult r
452
             x -> Bad . LuxiError $
453
                  "Cannot parse response from Ganeti: " ++ show x
454

    
455
-- | Custom queryJobs call.
456
queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
457
queryJobsStatus s jids = do
458
  rval <- callMethod (QueryJobs jids ["status"]) s
459
  return $ case rval of
460
             Bad x -> Bad x
461
             Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
462
                       J.Ok vals -> if any null vals
463
                                    then Bad $
464
                                         LuxiError "Missing job status field"
465
                                    else Ok (map head vals)
466
                       J.Error x -> Bad $ LuxiError x