Statistics
| Branch: | Tag: | Revision:

root / htools / Ganeti / Luxi.hs @ 4cab6703

History | View | Annotate | Download (14.9 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the Ganeti LUXI interface.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Luxi
29
  ( LuxiOp(..)
30
  , LuxiReq(..)
31
  , Client
32
  , JobId
33
  , RecvResult(..)
34
  , TagObject(..)
35
  , strOfOp
36
  , getClient
37
  , getServer
38
  , acceptClient
39
  , closeClient
40
  , closeServer
41
  , callMethod
42
  , submitManyJobs
43
  , queryJobsStatus
44
  , buildCall
45
  , buildResponse
46
  , validateCall
47
  , decodeCall
48
  , recvMsg
49
  , recvMsgExt
50
  , sendMsg
51
  ) where
52

    
53
import Control.Exception (catch)
54
import Data.IORef
55
import Data.Ratio (numerator, denominator)
56
import qualified Data.ByteString as B
57
import qualified Data.ByteString.UTF8 as UTF8
58
import Data.Word (Word8)
59
import Control.Monad
60
import Prelude hiding (catch)
61
import Text.JSON (encodeStrict, decodeStrict)
62
import qualified Text.JSON as J
63
import Text.JSON.Types
64
import System.Directory (removeFile)
65
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
66
import System.IO.Error (isEOFError)
67
import System.Timeout
68
import qualified Network.Socket as S
69

    
70
import Ganeti.HTools.JSON
71
import Ganeti.HTools.Types
72
import Ganeti.HTools.Utils
73

    
74
import Ganeti.Constants
75
import Ganeti.Jobs (JobStatus)
76
import Ganeti.OpCodes (OpCode)
77
import qualified Ganeti.Query.Language as Qlang
78
import Ganeti.THH
79

    
80
-- * Utility functions
81

    
82
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
83
withTimeout :: Int -> String -> IO a -> IO a
84
withTimeout secs descr action = do
85
  result <- timeout (secs * 1000000) action
86
  case result of
87
    Nothing -> fail $ "Timeout in " ++ descr
88
    Just v -> return v
89

    
90
-- * Generic protocol functionality
91

    
92
-- | Result of receiving a message from the socket.
93
data RecvResult = RecvConnClosed    -- ^ Connection closed
94
                | RecvError String  -- ^ Any other error
95
                | RecvOk String     -- ^ Successfull receive
96
                  deriving (Show, Read, Eq)
97

    
98
-- | The Ganeti job type.
99
type JobId = Int
100

    
101
-- | Data type representing what items do the tag operations apply to.
102
$(declareSADT "TagObject"
103
  [ ("TagInstance", 'tagInstance)
104
  , ("TagNode",     'tagNode)
105
  , ("TagGroup",    'tagNodegroup)
106
  , ("TagCluster",  'tagCluster)
107
  ])
108
$(makeJSONInstance ''TagObject)
109

    
110
-- | Currently supported Luxi operations and JSON serialization.
111
$(genLuxiOp "LuxiOp"
112
  [ (luxiReqQuery,
113
    [ ("what",    [t| Qlang.ItemType |])
114
    , ("fields",  [t| [String]  |])
115
    , ("qfilter", [t| Qlang.Filter Qlang.FilterField |])
116
    ])
117
  , (luxiReqQueryFields,
118
    [ ("what",    [t| Qlang.ItemType |])
119
    , ("fields",  [t| [String]  |])
120
    ])
121
  , (luxiReqQueryNodes,
122
     [ ("names",  [t| [String] |])
123
     , ("fields", [t| [String] |])
124
     , ("lock",   [t| Bool     |])
125
     ])
126
  , (luxiReqQueryGroups,
127
     [ ("names",  [t| [String] |])
128
     , ("fields", [t| [String] |])
129
     , ("lock",   [t| Bool     |])
130
     ])
131
  , (luxiReqQueryInstances,
132
     [ ("names",  [t| [String] |])
133
     , ("fields", [t| [String] |])
134
     , ("lock",   [t| Bool     |])
135
     ])
136
  , (luxiReqQueryJobs,
137
     [ ("ids",    [t| [Int]    |])
138
     , ("fields", [t| [String] |])
139
     ])
140
  , (luxiReqQueryExports,
141
     [ ("nodes", [t| [String] |])
142
     , ("lock",  [t| Bool     |])
143
     ])
144
  , (luxiReqQueryConfigValues,
145
     [ ("fields", [t| [String] |]) ]
146
    )
147
  , (luxiReqQueryClusterInfo, [])
148
  , (luxiReqQueryTags,
149
     [ ("kind", [t| TagObject |])
150
     , ("name", [t| String |])
151
     ])
152
  , (luxiReqSubmitJob,
153
     [ ("job", [t| [OpCode] |]) ]
154
    )
155
  , (luxiReqSubmitManyJobs,
156
     [ ("ops", [t| [[OpCode]] |]) ]
157
    )
158
  , (luxiReqWaitForJobChange,
159
     [ ("job",      [t| Int     |])
160
     , ("fields",   [t| [String]|])
161
     , ("prev_job", [t| JSValue |])
162
     , ("prev_log", [t| JSValue |])
163
     , ("tmout",    [t| Int     |])
164
     ])
165
  , (luxiReqArchiveJob,
166
     [ ("job", [t| Int |]) ]
167
    )
168
  , (luxiReqAutoArchiveJobs,
169
     [ ("age",   [t| Int |])
170
     , ("tmout", [t| Int |])
171
     ])
172
  , (luxiReqCancelJob,
173
     [ ("job", [t| Int |]) ]
174
    )
175
  , (luxiReqSetDrainFlag,
176
     [ ("flag", [t| Bool |]) ]
177
    )
178
  , (luxiReqSetWatcherPause,
179
     [ ("duration", [t| Double |]) ]
180
    )
181
  ])
182

    
183
$(makeJSONInstance ''LuxiReq)
184

    
185
-- | The serialisation of LuxiOps into strings in messages.
186
$(genStrOfOp ''LuxiOp "strOfOp")
187

    
188
-- | Type holding the initial (unparsed) Luxi call.
189
data LuxiCall = LuxiCall LuxiReq JSValue
190

    
191
-- | The end-of-message separator.
192
eOM :: Word8
193
eOM = 3
194

    
195
-- | The end-of-message encoded as a ByteString.
196
bEOM :: B.ByteString
197
bEOM = B.singleton eOM
198

    
199
-- | Valid keys in the requests and responses.
200
data MsgKeys = Method
201
             | Args
202
             | Success
203
             | Result
204

    
205
-- | The serialisation of MsgKeys into strings in messages.
206
$(genStrOfKey ''MsgKeys "strOfKey")
207

    
208
-- | Luxi client encapsulation.
209
data Client = Client { socket :: Handle           -- ^ The socket of the client
210
                     , rbuf :: IORef B.ByteString -- ^ Already received buffer
211
                     }
212

    
213
-- | Connects to the master daemon and returns a luxi Client.
214
getClient :: String -> IO Client
215
getClient path = do
216
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
217
  withTimeout connTimeout "creating luxi connection" $
218
              S.connect s (S.SockAddrUnix path)
219
  rf <- newIORef B.empty
220
  h <- S.socketToHandle s ReadWriteMode
221
  return Client { socket=h, rbuf=rf }
222

    
223
-- | Creates and returns a server endpoint.
224
getServer :: FilePath -> IO S.Socket
225
getServer path = do
226
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
227
  S.bindSocket s (S.SockAddrUnix path)
228
  S.listen s 5 -- 5 is the max backlog
229
  return s
230

    
231
-- | Closes a server endpoint.
232
-- FIXME: this should be encapsulated into a nicer type.
233
closeServer :: FilePath -> S.Socket -> IO ()
234
closeServer path sock = do
235
  S.sClose sock
236
  removeFile path
237

    
238
-- | Accepts a client
239
acceptClient :: S.Socket -> IO Client
240
acceptClient s = do
241
  -- second return is the address of the client, which we ignore here
242
  (client_socket, _) <- S.accept s
243
  new_buffer <- newIORef B.empty
244
  handle <- S.socketToHandle client_socket ReadWriteMode
245
  return Client { socket=handle, rbuf=new_buffer }
246

    
247
-- | Closes the client socket.
248
closeClient :: Client -> IO ()
249
closeClient = hClose . socket
250

    
251
-- | Sends a message over a luxi transport.
252
sendMsg :: Client -> String -> IO ()
253
sendMsg s buf = withTimeout queryTimeout "sending luxi message" $ do
254
  let encoded = UTF8.fromString buf
255
      handle = socket s
256
  B.hPut handle encoded
257
  B.hPut handle bEOM
258
  hFlush handle
259

    
260
-- | Given a current buffer and the handle, it will read from the
261
-- network until we get a full message, and it will return that
262
-- message and the leftover buffer contents.
263
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
264
recvUpdate handle obuf = do
265
  nbuf <- withTimeout queryTimeout "reading luxi response" $ do
266
            _ <- hWaitForInput handle (-1)
267
            B.hGetNonBlocking handle 4096
268
  let (msg, remaining) = B.break (eOM ==) nbuf
269
      newbuf = B.append obuf msg
270
  if B.null remaining
271
    then recvUpdate handle newbuf
272
    else return (newbuf, B.tail remaining)
273

    
274
-- | Waits for a message over a luxi transport.
275
recvMsg :: Client -> IO String
276
recvMsg s = do
277
  cbuf <- readIORef $ rbuf s
278
  let (imsg, ibuf) = B.break (eOM ==) cbuf
279
  (msg, nbuf) <-
280
    if B.null ibuf      -- if old buffer didn't contain a full message
281
      then recvUpdate (socket s) cbuf   -- then we read from network
282
      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
283
  writeIORef (rbuf s) nbuf
284
  return $ UTF8.toString msg
285

    
286
-- | Extended wrapper over recvMsg.
287
recvMsgExt :: Client -> IO RecvResult
288
recvMsgExt s =
289
  catch (liftM RecvOk (recvMsg s)) $ \e ->
290
    if isEOFError e
291
      then return RecvConnClosed
292
      else return $ RecvError (show e)
293

    
294
-- | Serialize a request to String.
295
buildCall :: LuxiOp  -- ^ The method
296
          -> String  -- ^ The serialized form
297
buildCall lo =
298
  let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
299
           , (strOfKey Args, opToArgs lo)
300
           ]
301
      jo = toJSObject ja
302
  in encodeStrict jo
303

    
304
-- | Serialize the response to String.
305
buildResponse :: Bool    -- ^ Success
306
              -> JSValue -- ^ The arguments
307
              -> String  -- ^ The serialized form
308
buildResponse success args =
309
  let ja = [ (strOfKey Success, JSBool success)
310
           , (strOfKey Result, args)]
311
      jo = toJSObject ja
312
  in encodeStrict jo
313

    
314
-- | Check that luxi request contains the required keys and parse it.
315
validateCall :: String -> Result LuxiCall
316
validateCall s = do
317
  arr <- fromJResult "parsing top-level luxi message" $
318
         decodeStrict s::Result (JSObject JSValue)
319
  let aobj = fromJSObject arr
320
  call <- fromObj aobj (strOfKey Method)::Result LuxiReq
321
  args <- fromObj aobj (strOfKey Args)
322
  return (LuxiCall call args)
323

    
324
-- | Converts Luxi call arguments into a 'LuxiOp' data structure.
325
--
326
-- This is currently hand-coded until we make it more uniform so that
327
-- it can be generated using TH.
328
decodeCall :: LuxiCall -> Result LuxiOp
329
decodeCall (LuxiCall call args) =
330
  case call of
331
    ReqQueryJobs -> do
332
              (jid, jargs) <- fromJVal args
333
              rid <- mapM parseJobId jid
334
              let rargs = map fromJSString jargs
335
              return $ QueryJobs rid rargs
336
    ReqQueryInstances -> do
337
              (names, fields, locking) <- fromJVal args
338
              return $ QueryInstances names fields locking
339
    ReqQueryNodes -> do
340
              (names, fields, locking) <- fromJVal args
341
              return $ QueryNodes names fields locking
342
    ReqQueryGroups -> do
343
              (names, fields, locking) <- fromJVal args
344
              return $ QueryGroups names fields locking
345
    ReqQueryClusterInfo -> do
346
              return QueryClusterInfo
347
    ReqQuery -> do
348
              (what, fields, qfilter) <- fromJVal args
349
              return $ Query what fields qfilter
350
    ReqQueryFields -> do
351
              (what, fields) <- fromJVal args
352
              fields' <- case fields of
353
                           JSNull -> return []
354
                           _ -> fromJVal fields
355
              return $ QueryFields what fields'
356
    ReqSubmitJob -> do
357
              [ops1] <- fromJVal args
358
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
359
              return $ SubmitJob ops2
360
    ReqSubmitManyJobs -> do
361
              [ops1] <- fromJVal args
362
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
363
              return $ SubmitManyJobs ops2
364
    ReqWaitForJobChange -> do
365
              (jid, fields, pinfo, pidx, wtmout) <-
366
                -- No instance for 5-tuple, code copied from the
367
                -- json sources and adapted
368
                fromJResult "Parsing WaitForJobChange message" $
369
                case args of
370
                  JSArray [a, b, c, d, e] ->
371
                    (,,,,) `fmap`
372
                    J.readJSON a `ap`
373
                    J.readJSON b `ap`
374
                    J.readJSON c `ap`
375
                    J.readJSON d `ap`
376
                    J.readJSON e
377
                  _ -> J.Error "Not enough values"
378
              rid <- parseJobId jid
379
              return $ WaitForJobChange rid fields pinfo pidx wtmout
380
    ReqArchiveJob -> do
381
              [jid] <- fromJVal args
382
              rid <- parseJobId jid
383
              return $ ArchiveJob rid
384
    ReqAutoArchiveJobs -> do
385
              (age, tmout) <- fromJVal args
386
              return $ AutoArchiveJobs age tmout
387
    ReqQueryExports -> do
388
              (nodes, lock) <- fromJVal args
389
              return $ QueryExports nodes lock
390
    ReqQueryConfigValues -> do
391
              [fields] <- fromJVal args
392
              return $ QueryConfigValues fields
393
    ReqQueryTags -> do
394
              (kind, name) <- fromJVal args
395
              return $ QueryTags kind name
396
    ReqCancelJob -> do
397
              [job] <- fromJVal args
398
              rid <- parseJobId job
399
              return $ CancelJob rid
400
    ReqSetDrainFlag -> do
401
              [flag] <- fromJVal args
402
              return $ SetDrainFlag flag
403
    ReqSetWatcherPause -> do
404
              [duration] <- fromJVal args
405
              return $ SetWatcherPause duration
406

    
407
-- | Check that luxi responses contain the required keys and that the
408
-- call was successful.
409
validateResult :: String -> Result JSValue
410
validateResult s = do
411
  when (UTF8.replacement_char `elem` s) $
412
       fail "Failed to decode UTF-8, detected replacement char after decoding"
413
  oarr <- fromJResult "Parsing LUXI response"
414
          (decodeStrict s)::Result (JSObject JSValue)
415
  let arr = J.fromJSObject oarr
416
  status <- fromObj arr (strOfKey Success)::Result Bool
417
  let rkey = strOfKey Result
418
  if status
419
    then fromObj arr rkey
420
    else fromObj arr rkey >>= fail
421

    
422
-- | Generic luxi method call.
423
callMethod :: LuxiOp -> Client -> IO (Result JSValue)
424
callMethod method s = do
425
  sendMsg s $ buildCall method
426
  result <- recvMsg s
427
  let rval = validateResult result
428
  return rval
429

    
430
-- | Parses a job ID.
431
parseJobId :: JSValue -> Result JobId
432
parseJobId (JSString x) = tryRead "parsing job id" . fromJSString $ x
433
parseJobId (JSRational _ x) =
434
  if denominator x /= 1
435
    then Bad $ "Got fractional job ID from master daemon?! Value:" ++ show x
436
    -- FIXME: potential integer overflow here on 32-bit platforms
437
    else Ok . fromIntegral . numerator $ x
438
parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x
439

    
440
-- | Parse job submission result.
441
parseSubmitJobResult :: JSValue -> Result JobId
442
parseSubmitJobResult (JSArray [JSBool True, v]) = parseJobId v
443
parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
444
  Bad (fromJSString x)
445
parseSubmitJobResult v = Bad $ "Unknown result from the master daemon" ++
446
                         show v
447

    
448
-- | Specialized submitManyJobs call.
449
submitManyJobs :: Client -> [[OpCode]] -> IO (Result [JobId])
450
submitManyJobs s jobs = do
451
  rval <- callMethod (SubmitManyJobs jobs) s
452
  -- map each result (status, payload) pair into a nice Result ADT
453
  return $ case rval of
454
             Bad x -> Bad x
455
             Ok (JSArray r) -> mapM parseSubmitJobResult r
456
             x -> Bad ("Cannot parse response from Ganeti: " ++ show x)
457

    
458
-- | Custom queryJobs call.
459
queryJobsStatus :: Client -> [JobId] -> IO (Result [JobStatus])
460
queryJobsStatus s jids = do
461
  rval <- callMethod (QueryJobs jids ["status"]) s
462
  return $ case rval of
463
             Bad x -> Bad x
464
             Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
465
                       J.Ok vals -> if any null vals
466
                                    then Bad "Missing job status field"
467
                                    else Ok (map head vals)
468
                       J.Error x -> Bad x