Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / Luxi.hs @ d08a8359

History | View | Annotate | Download (15.7 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the Ganeti LUXI interface.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Luxi
29
  ( LuxiOp(..)
30
  , LuxiReq(..)
31
  , Client
32
  , JobId
33
  , fromJobId
34
  , makeJobId
35
  , RecvResult(..)
36
  , strOfOp
37
  , getClient
38
  , getServer
39
  , acceptClient
40
  , closeClient
41
  , closeServer
42
  , callMethod
43
  , submitManyJobs
44
  , queryJobsStatus
45
  , buildCall
46
  , buildResponse
47
  , validateCall
48
  , decodeCall
49
  , recvMsg
50
  , recvMsgExt
51
  , sendMsg
52
  , allLuxiCalls
53
  ) where
54

    
55
import Control.Exception (catch)
56
import Data.IORef
57
import qualified Data.ByteString as B
58
import qualified Data.ByteString.Lazy as BL
59
import qualified Data.ByteString.UTF8 as UTF8
60
import qualified Data.ByteString.Lazy.UTF8 as UTF8L
61
import Data.Word (Word8)
62
import Control.Monad
63
import Text.JSON (encodeStrict, decodeStrict)
64
import qualified Text.JSON as J
65
import Text.JSON.Pretty (pp_value)
66
import Text.JSON.Types
67
import System.Directory (removeFile)
68
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
69
import System.IO.Error (isEOFError)
70
import System.Timeout
71
import qualified Network.Socket as S
72

    
73
import Ganeti.BasicTypes
74
import Ganeti.Constants
75
import Ganeti.Errors
76
import Ganeti.JSON
77
import Ganeti.OpParams (pTagsObject)
78
import Ganeti.OpCodes
79
import Ganeti.Runtime
80
import qualified Ganeti.Query.Language as Qlang
81
import Ganeti.THH
82
import Ganeti.Types
83
import Ganeti.Utils
84

    
85
-- * Utility functions
86

    
87
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
88
withTimeout :: Int -> String -> IO a -> IO a
89
withTimeout secs descr action = do
90
  result <- timeout (secs * 1000000) action
91
  case result of
92
    Nothing -> fail $ "Timeout in " ++ descr
93
    Just v -> return v
94

    
95
-- * Generic protocol functionality
96

    
97
-- | Result of receiving a message from the socket.
98
data RecvResult = RecvConnClosed    -- ^ Connection closed
99
                | RecvError String  -- ^ Any other error
100
                | RecvOk String     -- ^ Successfull receive
101
                  deriving (Show, Eq)
102

    
103
-- | Currently supported Luxi operations and JSON serialization.
104
$(genLuxiOp "LuxiOp"
105
  [ (luxiReqQuery,
106
    [ simpleField "what"    [t| Qlang.ItemType |]
107
    , simpleField "fields"  [t| [String]  |]
108
    , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
109
    ])
110
  , (luxiReqQueryFields,
111
    [ simpleField "what"    [t| Qlang.ItemType |]
112
    , simpleField "fields"  [t| [String]  |]
113
    ])
114
  , (luxiReqQueryNodes,
115
     [ simpleField "names"  [t| [String] |]
116
     , simpleField "fields" [t| [String] |]
117
     , simpleField "lock"   [t| Bool     |]
118
     ])
119
  , (luxiReqQueryGroups,
120
     [ simpleField "names"  [t| [String] |]
121
     , simpleField "fields" [t| [String] |]
122
     , simpleField "lock"   [t| Bool     |]
123
     ])
124
  , (luxiReqQueryNetworks,
125
     [ simpleField "names"  [t| [String] |]
126
     , simpleField "fields" [t| [String] |]
127
     , simpleField "lock"   [t| Bool     |]
128
     ])
129
  , (luxiReqQueryInstances,
130
     [ simpleField "names"  [t| [String] |]
131
     , simpleField "fields" [t| [String] |]
132
     , simpleField "lock"   [t| Bool     |]
133
     ])
134
  , (luxiReqQueryJobs,
135
     [ simpleField "ids"    [t| [JobId]  |]
136
     , simpleField "fields" [t| [String] |]
137
     ])
138
  , (luxiReqQueryExports,
139
     [ simpleField "nodes" [t| [String] |]
140
     , simpleField "lock"  [t| Bool     |]
141
     ])
142
  , (luxiReqQueryConfigValues,
143
     [ simpleField "fields" [t| [String] |] ]
144
    )
145
  , (luxiReqQueryClusterInfo, [])
146
  , (luxiReqQueryTags,
147
     [ pTagsObject ])
148
  , (luxiReqSubmitJob,
149
     [ simpleField "job" [t| [MetaOpCode] |] ]
150
    )
151
  , (luxiReqSubmitManyJobs,
152
     [ simpleField "ops" [t| [[MetaOpCode]] |] ]
153
    )
154
  , (luxiReqWaitForJobChange,
155
     [ simpleField "job"      [t| JobId   |]
156
     , simpleField "fields"   [t| [String]|]
157
     , simpleField "prev_job" [t| JSValue |]
158
     , simpleField "prev_log" [t| JSValue |]
159
     , simpleField "tmout"    [t| Int     |]
160
     ])
161
  , (luxiReqArchiveJob,
162
     [ simpleField "job" [t| JobId |] ]
163
    )
164
  , (luxiReqAutoArchiveJobs,
165
     [ simpleField "age"   [t| Int |]
166
     , simpleField "tmout" [t| Int |]
167
     ])
168
  , (luxiReqCancelJob,
169
     [ simpleField "job" [t| JobId |] ]
170
    )
171
  , (luxiReqChangeJobPriority,
172
     [ simpleField "job"      [t| JobId |]
173
     , simpleField "priority" [t| Int |] ]
174
    )
175
  , (luxiReqSetDrainFlag,
176
     [ simpleField "flag" [t| Bool |] ]
177
    )
178
  , (luxiReqSetWatcherPause,
179
     [ simpleField "duration" [t| Double |] ]
180
    )
181
  ])
182

    
183
$(makeJSONInstance ''LuxiReq)
184

    
185
-- | List of all defined Luxi calls.
186
$(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
187

    
188
-- | The serialisation of LuxiOps into strings in messages.
189
$(genStrOfOp ''LuxiOp "strOfOp")
190

    
191
-- | Type holding the initial (unparsed) Luxi call.
192
data LuxiCall = LuxiCall LuxiReq JSValue
193

    
194
-- | The end-of-message separator.
195
eOM :: Word8
196
eOM = 3
197

    
198
-- | The end-of-message encoded as a ByteString.
199
bEOM :: B.ByteString
200
bEOM = B.singleton eOM
201

    
202
-- | Valid keys in the requests and responses.
203
data MsgKeys = Method
204
             | Args
205
             | Success
206
             | Result
207

    
208
-- | The serialisation of MsgKeys into strings in messages.
209
$(genStrOfKey ''MsgKeys "strOfKey")
210

    
211
-- | Luxi client encapsulation.
212
data Client = Client { socket :: Handle           -- ^ The socket of the client
213
                     , rbuf :: IORef B.ByteString -- ^ Already received buffer
214
                     }
215

    
216
-- | Connects to the master daemon and returns a luxi Client.
217
getClient :: String -> IO Client
218
getClient path = do
219
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
220
  withTimeout luxiDefCtmo "creating luxi connection" $
221
              S.connect s (S.SockAddrUnix path)
222
  rf <- newIORef B.empty
223
  h <- S.socketToHandle s ReadWriteMode
224
  return Client { socket=h, rbuf=rf }
225

    
226
-- | Creates and returns a server endpoint.
227
getServer :: Bool -> FilePath -> IO S.Socket
228
getServer setOwner path = do
229
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
230
  S.bindSocket s (S.SockAddrUnix path)
231
  when setOwner . setOwnerAndGroupFromNames path GanetiLuxid $
232
    ExtraGroup DaemonsGroup
233
  S.listen s 5 -- 5 is the max backlog
234
  return s
235

    
236
-- | Closes a server endpoint.
237
-- FIXME: this should be encapsulated into a nicer type.
238
closeServer :: FilePath -> S.Socket -> IO ()
239
closeServer path sock = do
240
  S.sClose sock
241
  removeFile path
242

    
243
-- | Accepts a client
244
acceptClient :: S.Socket -> IO Client
245
acceptClient s = do
246
  -- second return is the address of the client, which we ignore here
247
  (client_socket, _) <- S.accept s
248
  new_buffer <- newIORef B.empty
249
  handle <- S.socketToHandle client_socket ReadWriteMode
250
  return Client { socket=handle, rbuf=new_buffer }
251

    
252
-- | Closes the client socket.
253
closeClient :: Client -> IO ()
254
closeClient = hClose . socket
255

    
256
-- | Sends a message over a luxi transport.
257
sendMsg :: Client -> String -> IO ()
258
sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
259
  let encoded = UTF8L.fromString buf
260
      handle = socket s
261
  BL.hPut handle encoded
262
  B.hPut handle bEOM
263
  hFlush handle
264

    
265
-- | Given a current buffer and the handle, it will read from the
266
-- network until we get a full message, and it will return that
267
-- message and the leftover buffer contents.
268
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
269
recvUpdate handle obuf = do
270
  nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
271
            _ <- hWaitForInput handle (-1)
272
            B.hGetNonBlocking handle 4096
273
  let (msg, remaining) = B.break (eOM ==) nbuf
274
      newbuf = B.append obuf msg
275
  if B.null remaining
276
    then recvUpdate handle newbuf
277
    else return (newbuf, B.tail remaining)
278

    
279
-- | Waits for a message over a luxi transport.
280
recvMsg :: Client -> IO String
281
recvMsg s = do
282
  cbuf <- readIORef $ rbuf s
283
  let (imsg, ibuf) = B.break (eOM ==) cbuf
284
  (msg, nbuf) <-
285
    if B.null ibuf      -- if old buffer didn't contain a full message
286
      then recvUpdate (socket s) cbuf   -- then we read from network
287
      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
288
  writeIORef (rbuf s) nbuf
289
  return $ UTF8.toString msg
290

    
291
-- | Extended wrapper over recvMsg.
292
recvMsgExt :: Client -> IO RecvResult
293
recvMsgExt s =
294
  Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
295
    return $ if isEOFError e
296
               then RecvConnClosed
297
               else RecvError (show e)
298

    
299
-- | Serialize a request to String.
300
buildCall :: LuxiOp  -- ^ The method
301
          -> String  -- ^ The serialized form
302
buildCall lo =
303
  let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
304
           , (strOfKey Args, opToArgs lo)
305
           ]
306
      jo = toJSObject ja
307
  in encodeStrict jo
308

    
309
-- | Serialize the response to String.
310
buildResponse :: Bool    -- ^ Success
311
              -> JSValue -- ^ The arguments
312
              -> String  -- ^ The serialized form
313
buildResponse success args =
314
  let ja = [ (strOfKey Success, JSBool success)
315
           , (strOfKey Result, args)]
316
      jo = toJSObject ja
317
  in encodeStrict jo
318

    
319
-- | Check that luxi request contains the required keys and parse it.
320
validateCall :: String -> Result LuxiCall
321
validateCall s = do
322
  arr <- fromJResult "parsing top-level luxi message" $
323
         decodeStrict s::Result (JSObject JSValue)
324
  let aobj = fromJSObject arr
325
  call <- fromObj aobj (strOfKey Method)::Result LuxiReq
326
  args <- fromObj aobj (strOfKey Args)
327
  return (LuxiCall call args)
328

    
329
-- | Converts Luxi call arguments into a 'LuxiOp' data structure.
330
--
331
-- This is currently hand-coded until we make it more uniform so that
332
-- it can be generated using TH.
333
decodeCall :: LuxiCall -> Result LuxiOp
334
decodeCall (LuxiCall call args) =
335
  case call of
336
    ReqQueryJobs -> do
337
              (jids, jargs) <- fromJVal args
338
              jids' <- case jids of
339
                         JSNull -> return []
340
                         _ -> fromJVal jids
341
              return $ QueryJobs jids' jargs
342
    ReqQueryInstances -> do
343
              (names, fields, locking) <- fromJVal args
344
              return $ QueryInstances names fields locking
345
    ReqQueryNodes -> do
346
              (names, fields, locking) <- fromJVal args
347
              return $ QueryNodes names fields locking
348
    ReqQueryGroups -> do
349
              (names, fields, locking) <- fromJVal args
350
              return $ QueryGroups names fields locking
351
    ReqQueryClusterInfo ->
352
              return QueryClusterInfo
353
    ReqQueryNetworks -> do
354
              (names, fields, locking) <- fromJVal args
355
              return $ QueryNetworks names fields locking
356
    ReqQuery -> do
357
              (what, fields, qfilter) <- fromJVal args
358
              return $ Query what fields qfilter
359
    ReqQueryFields -> do
360
              (what, fields) <- fromJVal args
361
              fields' <- case fields of
362
                           JSNull -> return []
363
                           _ -> fromJVal fields
364
              return $ QueryFields what fields'
365
    ReqSubmitJob -> do
366
              [ops1] <- fromJVal args
367
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
368
              return $ SubmitJob ops2
369
    ReqSubmitManyJobs -> do
370
              [ops1] <- fromJVal args
371
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
372
              return $ SubmitManyJobs ops2
373
    ReqWaitForJobChange -> do
374
              (jid, fields, pinfo, pidx, wtmout) <-
375
                -- No instance for 5-tuple, code copied from the
376
                -- json sources and adapted
377
                fromJResult "Parsing WaitForJobChange message" $
378
                case args of
379
                  JSArray [a, b, c, d, e] ->
380
                    (,,,,) `fmap`
381
                    J.readJSON a `ap`
382
                    J.readJSON b `ap`
383
                    J.readJSON c `ap`
384
                    J.readJSON d `ap`
385
                    J.readJSON e
386
                  _ -> J.Error "Not enough values"
387
              return $ WaitForJobChange jid fields pinfo pidx wtmout
388
    ReqArchiveJob -> do
389
              [jid] <- fromJVal args
390
              return $ ArchiveJob jid
391
    ReqAutoArchiveJobs -> do
392
              (age, tmout) <- fromJVal args
393
              return $ AutoArchiveJobs age tmout
394
    ReqQueryExports -> do
395
              (nodes, lock) <- fromJVal args
396
              return $ QueryExports nodes lock
397
    ReqQueryConfigValues -> do
398
              [fields] <- fromJVal args
399
              return $ QueryConfigValues fields
400
    ReqQueryTags -> do
401
              (kind, name) <- fromJVal args
402
              item <- tagObjectFrom kind name
403
              return $ QueryTags item
404
    ReqCancelJob -> do
405
              [jid] <- fromJVal args
406
              return $ CancelJob jid
407
    ReqChangeJobPriority -> do
408
              (jid, priority) <- fromJVal args
409
              return $ ChangeJobPriority jid priority
410
    ReqSetDrainFlag -> do
411
              [flag] <- fromJVal args
412
              return $ SetDrainFlag flag
413
    ReqSetWatcherPause -> do
414
              [duration] <- fromJVal args
415
              return $ SetWatcherPause duration
416

    
417
-- | Check that luxi responses contain the required keys and that the
418
-- call was successful.
419
validateResult :: String -> ErrorResult JSValue
420
validateResult s = do
421
  when (UTF8.replacement_char `elem` s) $
422
       fail "Failed to decode UTF-8, detected replacement char after decoding"
423
  oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
424
  let arr = J.fromJSObject oarr
425
  status <- fromObj arr (strOfKey Success)
426
  result <- fromObj arr (strOfKey Result)
427
  if status
428
    then return result
429
    else decodeError result
430

    
431
-- | Try to decode an error from the server response. This function
432
-- will always fail, since it's called only on the error path (when
433
-- status is False).
434
decodeError :: JSValue -> ErrorResult JSValue
435
decodeError val =
436
  case fromJVal val of
437
    Ok e -> Bad e
438
    Bad msg -> Bad $ GenericError msg
439

    
440
-- | Generic luxi method call.
441
callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
442
callMethod method s = do
443
  sendMsg s $ buildCall method
444
  result <- recvMsg s
445
  let rval = validateResult result
446
  return rval
447

    
448
-- | Parse job submission result.
449
parseSubmitJobResult :: JSValue -> ErrorResult JobId
450
parseSubmitJobResult (JSArray [JSBool True, v]) =
451
  case J.readJSON v of
452
    J.Error msg -> Bad $ LuxiError msg
453
    J.Ok v' -> Ok v'
454
parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
455
  Bad . LuxiError $ fromJSString x
456
parseSubmitJobResult v =
457
  Bad . LuxiError $ "Unknown result from the master daemon: " ++
458
      show (pp_value v)
459

    
460
-- | Specialized submitManyJobs call.
461
submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId])
462
submitManyJobs s jobs = do
463
  rval <- callMethod (SubmitManyJobs jobs) s
464
  -- map each result (status, payload) pair into a nice Result ADT
465
  return $ case rval of
466
             Bad x -> Bad x
467
             Ok (JSArray r) -> mapM parseSubmitJobResult r
468
             x -> Bad . LuxiError $
469
                  "Cannot parse response from Ganeti: " ++ show x
470

    
471
-- | Custom queryJobs call.
472
queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
473
queryJobsStatus s jids = do
474
  rval <- callMethod (QueryJobs jids ["status"]) s
475
  return $ case rval of
476
             Bad x -> Bad x
477
             Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
478
                       J.Ok vals -> if any null vals
479
                                    then Bad $
480
                                         LuxiError "Missing job status field"
481
                                    else Ok (map head vals)
482
                       J.Error x -> Bad $ LuxiError x