Statistics
| Branch: | Tag: | Revision:

root / htools / Ganeti / Luxi.hs @ 1a865afe

History | View | Annotate | Download (15.8 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the Ganeti LUXI interface.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Luxi
29
  ( LuxiOp(..)
30
  , LuxiReq(..)
31
  , Client
32
  , JobId
33
  , RecvResult(..)
34
  , strOfOp
35
  , getClient
36
  , getServer
37
  , acceptClient
38
  , closeClient
39
  , closeServer
40
  , callMethod
41
  , submitManyJobs
42
  , queryJobsStatus
43
  , buildCall
44
  , buildResponse
45
  , validateCall
46
  , decodeCall
47
  , recvMsg
48
  , recvMsgExt
49
  , sendMsg
50
  , allLuxiCalls
51
  ) where
52

    
53
import Control.Exception (catch)
54
import Data.IORef
55
import Data.Ratio (numerator, denominator)
56
import qualified Data.ByteString as B
57
import qualified Data.ByteString.UTF8 as UTF8
58
import Data.Word (Word8)
59
import Control.Monad
60
import Text.JSON (encodeStrict, decodeStrict)
61
import qualified Text.JSON as J
62
import Text.JSON.Pretty (pp_value)
63
import Text.JSON.Types
64
import System.Directory (removeFile)
65
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
66
import System.IO.Error (isEOFError)
67
import System.Timeout
68
import qualified Network.Socket as S
69

    
70
import Ganeti.BasicTypes
71
import Ganeti.Constants
72
import Ganeti.Errors
73
import Ganeti.JSON
74
import Ganeti.Jobs (JobStatus)
75
import Ganeti.OpCodes
76
import Ganeti.Utils
77
import qualified Ganeti.Query.Language as Qlang
78
import Ganeti.THH
79

    
80
-- * Utility functions
81

    
82
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
83
withTimeout :: Int -> String -> IO a -> IO a
84
withTimeout secs descr action = do
85
  result <- timeout (secs * 1000000) action
86
  case result of
87
    Nothing -> fail $ "Timeout in " ++ descr
88
    Just v -> return v
89

    
90
-- * Generic protocol functionality
91

    
92
-- | Result of receiving a message from the socket.
93
data RecvResult = RecvConnClosed    -- ^ Connection closed
94
                | RecvError String  -- ^ Any other error
95
                | RecvOk String     -- ^ Successfull receive
96
                  deriving (Show, Read, Eq)
97

    
98
-- | The Ganeti job type.
99
type JobId = Int
100

    
101
-- | Currently supported Luxi operations and JSON serialization.
102
$(genLuxiOp "LuxiOp"
103
  [ (luxiReqQuery,
104
    [ simpleField "what"    [t| Qlang.ItemType |]
105
    , simpleField "fields"  [t| [String]  |]
106
    , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
107
    ])
108
  , (luxiReqQueryFields,
109
    [ simpleField "what"    [t| Qlang.ItemType |]
110
    , simpleField "fields"  [t| [String]  |]
111
    ])
112
  , (luxiReqQueryNodes,
113
     [ simpleField "names"  [t| [String] |]
114
     , simpleField "fields" [t| [String] |]
115
     , simpleField "lock"   [t| Bool     |]
116
     ])
117
  , (luxiReqQueryGroups,
118
     [ simpleField "names"  [t| [String] |]
119
     , simpleField "fields" [t| [String] |]
120
     , simpleField "lock"   [t| Bool     |]
121
     ])
122
  , (luxiReqQueryInstances,
123
     [ simpleField "names"  [t| [String] |]
124
     , simpleField "fields" [t| [String] |]
125
     , simpleField "lock"   [t| Bool     |]
126
     ])
127
  , (luxiReqQueryJobs,
128
     [ simpleField "ids"    [t| [Int]    |]
129
     , simpleField "fields" [t| [String] |]
130
     ])
131
  , (luxiReqQueryExports,
132
     [ simpleField "nodes" [t| [String] |]
133
     , simpleField "lock"  [t| Bool     |]
134
     ])
135
  , (luxiReqQueryConfigValues,
136
     [ simpleField "fields" [t| [String] |] ]
137
    )
138
  , (luxiReqQueryClusterInfo, [])
139
  , (luxiReqQueryTags,
140
     [ customField 'decodeTagObject 'encodeTagObject $
141
       simpleField "kind" [t| TagObject |]
142
     ])
143
  , (luxiReqSubmitJob,
144
     [ simpleField "job" [t| [OpCode] |] ]
145
    )
146
  , (luxiReqSubmitManyJobs,
147
     [ simpleField "ops" [t| [[OpCode]] |] ]
148
    )
149
  , (luxiReqWaitForJobChange,
150
     [ simpleField "job"      [t| Int     |]
151
     , simpleField "fields"   [t| [String]|]
152
     , simpleField "prev_job" [t| JSValue |]
153
     , simpleField "prev_log" [t| JSValue |]
154
     , simpleField "tmout"    [t| Int     |]
155
     ])
156
  , (luxiReqArchiveJob,
157
     [ simpleField "job" [t| Int |] ]
158
    )
159
  , (luxiReqAutoArchiveJobs,
160
     [ simpleField "age"   [t| Int |]
161
     , simpleField "tmout" [t| Int |]
162
     ])
163
  , (luxiReqCancelJob,
164
     [ simpleField "job" [t| Int |] ]
165
    )
166
  , (luxiReqChangeJobPriority,
167
     [ simpleField "job" [t| Int |]
168
     , simpleField "priority" [t| Int |] ]
169
    )
170
  , (luxiReqSetDrainFlag,
171
     [ simpleField "flag" [t| Bool |] ]
172
    )
173
  , (luxiReqSetWatcherPause,
174
     [ simpleField "duration" [t| Double |] ]
175
    )
176
  ])
177

    
178
$(makeJSONInstance ''LuxiReq)
179

    
180
-- | List of all defined Luxi calls.
181
$(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
182

    
183
-- | The serialisation of LuxiOps into strings in messages.
184
$(genStrOfOp ''LuxiOp "strOfOp")
185

    
186
-- | Type holding the initial (unparsed) Luxi call.
187
data LuxiCall = LuxiCall LuxiReq JSValue
188

    
189
-- | The end-of-message separator.
190
eOM :: Word8
191
eOM = 3
192

    
193
-- | The end-of-message encoded as a ByteString.
194
bEOM :: B.ByteString
195
bEOM = B.singleton eOM
196

    
197
-- | Valid keys in the requests and responses.
198
data MsgKeys = Method
199
             | Args
200
             | Success
201
             | Result
202

    
203
-- | The serialisation of MsgKeys into strings in messages.
204
$(genStrOfKey ''MsgKeys "strOfKey")
205

    
206
-- | Luxi client encapsulation.
207
data Client = Client { socket :: Handle           -- ^ The socket of the client
208
                     , rbuf :: IORef B.ByteString -- ^ Already received buffer
209
                     }
210

    
211
-- | Connects to the master daemon and returns a luxi Client.
212
getClient :: String -> IO Client
213
getClient path = do
214
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
215
  withTimeout luxiDefCtmo "creating luxi connection" $
216
              S.connect s (S.SockAddrUnix path)
217
  rf <- newIORef B.empty
218
  h <- S.socketToHandle s ReadWriteMode
219
  return Client { socket=h, rbuf=rf }
220

    
221
-- | Creates and returns a server endpoint.
222
getServer :: FilePath -> IO S.Socket
223
getServer path = do
224
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
225
  S.bindSocket s (S.SockAddrUnix path)
226
  S.listen s 5 -- 5 is the max backlog
227
  return s
228

    
229
-- | Closes a server endpoint.
230
-- FIXME: this should be encapsulated into a nicer type.
231
closeServer :: FilePath -> S.Socket -> IO ()
232
closeServer path sock = do
233
  S.sClose sock
234
  removeFile path
235

    
236
-- | Accepts a client
237
acceptClient :: S.Socket -> IO Client
238
acceptClient s = do
239
  -- second return is the address of the client, which we ignore here
240
  (client_socket, _) <- S.accept s
241
  new_buffer <- newIORef B.empty
242
  handle <- S.socketToHandle client_socket ReadWriteMode
243
  return Client { socket=handle, rbuf=new_buffer }
244

    
245
-- | Closes the client socket.
246
closeClient :: Client -> IO ()
247
closeClient = hClose . socket
248

    
249
-- | Sends a message over a luxi transport.
250
sendMsg :: Client -> String -> IO ()
251
sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
252
  let encoded = UTF8.fromString buf
253
      handle = socket s
254
  B.hPut handle encoded
255
  B.hPut handle bEOM
256
  hFlush handle
257

    
258
-- | Given a current buffer and the handle, it will read from the
259
-- network until we get a full message, and it will return that
260
-- message and the leftover buffer contents.
261
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
262
recvUpdate handle obuf = do
263
  nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
264
            _ <- hWaitForInput handle (-1)
265
            B.hGetNonBlocking handle 4096
266
  let (msg, remaining) = B.break (eOM ==) nbuf
267
      newbuf = B.append obuf msg
268
  if B.null remaining
269
    then recvUpdate handle newbuf
270
    else return (newbuf, B.tail remaining)
271

    
272
-- | Waits for a message over a luxi transport.
273
recvMsg :: Client -> IO String
274
recvMsg s = do
275
  cbuf <- readIORef $ rbuf s
276
  let (imsg, ibuf) = B.break (eOM ==) cbuf
277
  (msg, nbuf) <-
278
    if B.null ibuf      -- if old buffer didn't contain a full message
279
      then recvUpdate (socket s) cbuf   -- then we read from network
280
      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
281
  writeIORef (rbuf s) nbuf
282
  return $ UTF8.toString msg
283

    
284
-- | Extended wrapper over recvMsg.
285
recvMsgExt :: Client -> IO RecvResult
286
recvMsgExt s =
287
  Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
288
    return $ if isEOFError e
289
               then RecvConnClosed
290
               else RecvError (show e)
291

    
292
-- | Serialize a request to String.
293
buildCall :: LuxiOp  -- ^ The method
294
          -> String  -- ^ The serialized form
295
buildCall lo =
296
  let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
297
           , (strOfKey Args, opToArgs lo)
298
           ]
299
      jo = toJSObject ja
300
  in encodeStrict jo
301

    
302
-- | Serialize the response to String.
303
buildResponse :: Bool    -- ^ Success
304
              -> JSValue -- ^ The arguments
305
              -> String  -- ^ The serialized form
306
buildResponse success args =
307
  let ja = [ (strOfKey Success, JSBool success)
308
           , (strOfKey Result, args)]
309
      jo = toJSObject ja
310
  in encodeStrict jo
311

    
312
-- | Check that luxi request contains the required keys and parse it.
313
validateCall :: String -> Result LuxiCall
314
validateCall s = do
315
  arr <- fromJResult "parsing top-level luxi message" $
316
         decodeStrict s::Result (JSObject JSValue)
317
  let aobj = fromJSObject arr
318
  call <- fromObj aobj (strOfKey Method)::Result LuxiReq
319
  args <- fromObj aobj (strOfKey Args)
320
  return (LuxiCall call args)
321

    
322
-- | Converts Luxi call arguments into a 'LuxiOp' data structure.
323
--
324
-- This is currently hand-coded until we make it more uniform so that
325
-- it can be generated using TH.
326
decodeCall :: LuxiCall -> Result LuxiOp
327
decodeCall (LuxiCall call args) =
328
  case call of
329
    ReqQueryJobs -> do
330
              (jid, jargs) <- fromJVal args
331
              rid <- mapM parseJobId jid
332
              let rargs = map fromJSString jargs
333
              return $ QueryJobs rid rargs
334
    ReqQueryInstances -> do
335
              (names, fields, locking) <- fromJVal args
336
              return $ QueryInstances names fields locking
337
    ReqQueryNodes -> do
338
              (names, fields, locking) <- fromJVal args
339
              return $ QueryNodes names fields locking
340
    ReqQueryGroups -> do
341
              (names, fields, locking) <- fromJVal args
342
              return $ QueryGroups names fields locking
343
    ReqQueryClusterInfo ->
344
              return QueryClusterInfo
345
    ReqQuery -> do
346
              (what, fields, qfilter) <- fromJVal args
347
              return $ Query what fields qfilter
348
    ReqQueryFields -> do
349
              (what, fields) <- fromJVal args
350
              fields' <- case fields of
351
                           JSNull -> return []
352
                           _ -> fromJVal fields
353
              return $ QueryFields what fields'
354
    ReqSubmitJob -> do
355
              [ops1] <- fromJVal args
356
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
357
              return $ SubmitJob ops2
358
    ReqSubmitManyJobs -> do
359
              [ops1] <- fromJVal args
360
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
361
              return $ SubmitManyJobs ops2
362
    ReqWaitForJobChange -> do
363
              (jid, fields, pinfo, pidx, wtmout) <-
364
                -- No instance for 5-tuple, code copied from the
365
                -- json sources and adapted
366
                fromJResult "Parsing WaitForJobChange message" $
367
                case args of
368
                  JSArray [a, b, c, d, e] ->
369
                    (,,,,) `fmap`
370
                    J.readJSON a `ap`
371
                    J.readJSON b `ap`
372
                    J.readJSON c `ap`
373
                    J.readJSON d `ap`
374
                    J.readJSON e
375
                  _ -> J.Error "Not enough values"
376
              rid <- parseJobId jid
377
              return $ WaitForJobChange rid fields pinfo pidx wtmout
378
    ReqArchiveJob -> do
379
              [jid] <- fromJVal args
380
              rid <- parseJobId jid
381
              return $ ArchiveJob rid
382
    ReqAutoArchiveJobs -> do
383
              (age, tmout) <- fromJVal args
384
              return $ AutoArchiveJobs age tmout
385
    ReqQueryExports -> do
386
              (nodes, lock) <- fromJVal args
387
              return $ QueryExports nodes lock
388
    ReqQueryConfigValues -> do
389
              [fields] <- fromJVal args
390
              return $ QueryConfigValues fields
391
    ReqQueryTags -> do
392
              (kind, name) <- fromJVal args
393
              item <- tagObjectFrom kind name
394
              return $ QueryTags item
395
    ReqCancelJob -> do
396
              [job] <- fromJVal args
397
              rid <- parseJobId job
398
              return $ CancelJob rid
399
    ReqChangeJobPriority -> do
400
              (job, priority) <- fromJVal args
401
              rid <- parseJobId job
402
              return $ ChangeJobPriority rid priority
403
    ReqSetDrainFlag -> do
404
              [flag] <- fromJVal args
405
              return $ SetDrainFlag flag
406
    ReqSetWatcherPause -> do
407
              [duration] <- fromJVal args
408
              return $ SetWatcherPause duration
409

    
410
-- | Check that luxi responses contain the required keys and that the
411
-- call was successful.
412
validateResult :: String -> ErrorResult JSValue
413
validateResult s = do
414
  when (UTF8.replacement_char `elem` s) $
415
       fail "Failed to decode UTF-8, detected replacement char after decoding"
416
  oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
417
  let arr = J.fromJSObject oarr
418
  status <- fromObj arr (strOfKey Success)
419
  result <- fromObj arr (strOfKey Result)
420
  if status
421
    then return result
422
    else decodeError result
423

    
424
-- | Try to decode an error from the server response. This function
425
-- will always fail, since it's called only on the error path (when
426
-- status is False).
427
decodeError :: JSValue -> ErrorResult JSValue
428
decodeError val =
429
  case fromJVal val of
430
    Ok e -> Bad e
431
    Bad msg -> Bad $ GenericError msg
432

    
433
-- | Generic luxi method call.
434
callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
435
callMethod method s = do
436
  sendMsg s $ buildCall method
437
  result <- recvMsg s
438
  let rval = validateResult result
439
  return rval
440

    
441
-- | Parses a job ID.
442
parseJobId :: JSValue -> Result JobId
443
parseJobId (JSString x) = tryRead "parsing job id" . fromJSString $ x
444
parseJobId (JSRational _ x) =
445
  if denominator x /= 1
446
    then Bad $ "Got fractional job ID from master daemon?! Value:" ++ show x
447
    -- FIXME: potential integer overflow here on 32-bit platforms
448
    else Ok . fromIntegral . numerator $ x
449
parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x
450

    
451
-- | Parse job submission result.
452
parseSubmitJobResult :: JSValue -> ErrorResult JobId
453
parseSubmitJobResult (JSArray [JSBool True, v]) =
454
  case parseJobId v of
455
    Bad msg -> Bad $ LuxiError msg
456
    Ok v' -> Ok v'
457
parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
458
  Bad . LuxiError $ fromJSString x
459
parseSubmitJobResult v =
460
  Bad . LuxiError $ "Unknown result from the master daemon: " ++
461
      show (pp_value v)
462

    
463
-- | Specialized submitManyJobs call.
464
submitManyJobs :: Client -> [[OpCode]] -> IO (ErrorResult [JobId])
465
submitManyJobs s jobs = do
466
  rval <- callMethod (SubmitManyJobs jobs) s
467
  -- map each result (status, payload) pair into a nice Result ADT
468
  return $ case rval of
469
             Bad x -> Bad x
470
             Ok (JSArray r) -> mapM parseSubmitJobResult r
471
             x -> Bad . LuxiError $
472
                  "Cannot parse response from Ganeti: " ++ show x
473

    
474
-- | Custom queryJobs call.
475
queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
476
queryJobsStatus s jids = do
477
  rval <- callMethod (QueryJobs jids ["status"]) s
478
  return $ case rval of
479
             Bad x -> Bad x
480
             Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
481
                       J.Ok vals -> if any null vals
482
                                    then Bad $
483
                                         LuxiError "Missing job status field"
484
                                    else Ok (map head vals)
485
                       J.Error x -> Bad $ LuxiError x