Statistics
| Branch: | Tag: | Revision:

root / htools / Ganeti / Luxi.hs @ 62377cf5

History | View | Annotate | Download (14.5 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the Ganeti LUXI interface.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Luxi
29
  ( LuxiOp(..)
30
  , LuxiReq(..)
31
  , Client
32
  , JobId
33
  , RecvResult(..)
34
  , TagObject(..)
35
  , strOfOp
36
  , getClient
37
  , getServer
38
  , acceptClient
39
  , closeClient
40
  , closeServer
41
  , callMethod
42
  , submitManyJobs
43
  , queryJobsStatus
44
  , buildCall
45
  , buildResponse
46
  , validateCall
47
  , decodeCall
48
  , recvMsg
49
  , recvMsgExt
50
  , sendMsg
51
  ) where
52

    
53
import Control.Exception (catch)
54
import Data.IORef
55
import Data.Ratio (numerator, denominator)
56
import qualified Data.ByteString as B
57
import qualified Data.ByteString.UTF8 as UTF8
58
import Data.Word (Word8)
59
import Control.Monad
60
import Prelude hiding (catch)
61
import Text.JSON (encodeStrict, decodeStrict)
62
import qualified Text.JSON as J
63
import Text.JSON.Types
64
import System.Directory (removeFile)
65
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
66
import System.IO.Error (isEOFError)
67
import System.Timeout
68
import qualified Network.Socket as S
69

    
70
import Ganeti.HTools.JSON
71
import Ganeti.HTools.Types
72
import Ganeti.HTools.Utils
73

    
74
import Ganeti.Constants
75
import Ganeti.Jobs (JobStatus)
76
import Ganeti.OpCodes (OpCode)
77
import qualified Ganeti.Qlang as Qlang
78
import Ganeti.THH
79

    
80
-- * Utility functions
81

    
82
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
83
withTimeout :: Int -> String -> IO a -> IO a
84
withTimeout secs descr action = do
85
  result <- timeout (secs * 1000000) action
86
  case result of
87
    Nothing -> fail $ "Timeout in " ++ descr
88
    Just v -> return v
89

    
90
-- * Generic protocol functionality
91

    
92
-- | Result of receiving a message from the socket.
93
data RecvResult = RecvConnClosed    -- ^ Connection closed
94
                | RecvError String  -- ^ Any other error
95
                | RecvOk String     -- ^ Successfull receive
96
                  deriving (Show, Read, Eq)
97

    
98
-- | The Ganeti job type.
99
type JobId = Int
100

    
101
-- | Data type representing what items do the tag operations apply to.
102
$(declareSADT "TagObject"
103
  [ ("TagInstance", 'tagInstance)
104
  , ("TagNode",     'tagNode)
105
  , ("TagGroup",    'tagNodegroup)
106
  , ("TagCluster",  'tagCluster)
107
  ])
108
$(makeJSONInstance ''TagObject)
109

    
110
-- | Currently supported Luxi operations and JSON serialization.
111
$(genLuxiOp "LuxiOp"
112
  [(luxiReqQuery,
113
    [ ("what",    [t| Qlang.ItemType |])
114
    , ("fields",  [t| [String]  |])
115
    , ("qfilter", [t| Qlang.Filter |])
116
    ])
117
  , (luxiReqQueryNodes,
118
     [ ("names",  [t| [String] |])
119
     , ("fields", [t| [String] |])
120
     , ("lock",   [t| Bool     |])
121
     ])
122
  , (luxiReqQueryGroups,
123
     [ ("names",  [t| [String] |])
124
     , ("fields", [t| [String] |])
125
     , ("lock",   [t| Bool     |])
126
     ])
127
  , (luxiReqQueryInstances,
128
     [ ("names",  [t| [String] |])
129
     , ("fields", [t| [String] |])
130
     , ("lock",   [t| Bool     |])
131
     ])
132
  , (luxiReqQueryJobs,
133
     [ ("ids",    [t| [Int]    |])
134
     , ("fields", [t| [String] |])
135
     ])
136
  , (luxiReqQueryExports,
137
     [ ("nodes", [t| [String] |])
138
     , ("lock",  [t| Bool     |])
139
     ])
140
  , (luxiReqQueryConfigValues,
141
     [ ("fields", [t| [String] |]) ]
142
    )
143
  , (luxiReqQueryClusterInfo, [])
144
  , (luxiReqQueryTags,
145
     [ ("kind", [t| TagObject |])
146
     , ("name", [t| String |])
147
     ])
148
  , (luxiReqSubmitJob,
149
     [ ("job", [t| [OpCode] |]) ]
150
    )
151
  , (luxiReqSubmitManyJobs,
152
     [ ("ops", [t| [[OpCode]] |]) ]
153
    )
154
  , (luxiReqWaitForJobChange,
155
     [ ("job",      [t| Int     |])
156
     , ("fields",   [t| [String]|])
157
     , ("prev_job", [t| JSValue |])
158
     , ("prev_log", [t| JSValue |])
159
     , ("tmout",    [t| Int     |])
160
     ])
161
  , (luxiReqArchiveJob,
162
     [ ("job", [t| Int |]) ]
163
    )
164
  , (luxiReqAutoArchiveJobs,
165
     [ ("age",   [t| Int |])
166
     , ("tmout", [t| Int |])
167
     ])
168
  , (luxiReqCancelJob,
169
     [ ("job", [t| Int |]) ]
170
    )
171
  , (luxiReqSetDrainFlag,
172
     [ ("flag", [t| Bool |]) ]
173
    )
174
  , (luxiReqSetWatcherPause,
175
     [ ("duration", [t| Double |]) ]
176
    )
177
  ])
178

    
179
$(makeJSONInstance ''LuxiReq)
180

    
181
-- | The serialisation of LuxiOps into strings in messages.
182
$(genStrOfOp ''LuxiOp "strOfOp")
183

    
184
-- | Type holding the initial (unparsed) Luxi call.
185
data LuxiCall = LuxiCall LuxiReq JSValue
186

    
187
-- | The end-of-message separator.
188
eOM :: Word8
189
eOM = 3
190

    
191
-- | The end-of-message encoded as a ByteString.
192
bEOM :: B.ByteString
193
bEOM = B.singleton eOM
194

    
195
-- | Valid keys in the requests and responses.
196
data MsgKeys = Method
197
             | Args
198
             | Success
199
             | Result
200

    
201
-- | The serialisation of MsgKeys into strings in messages.
202
$(genStrOfKey ''MsgKeys "strOfKey")
203

    
204
-- | Luxi client encapsulation.
205
data Client = Client { socket :: Handle           -- ^ The socket of the client
206
                     , rbuf :: IORef B.ByteString -- ^ Already received buffer
207
                     }
208

    
209
-- | Connects to the master daemon and returns a luxi Client.
210
getClient :: String -> IO Client
211
getClient path = do
212
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
213
  withTimeout connTimeout "creating luxi connection" $
214
              S.connect s (S.SockAddrUnix path)
215
  rf <- newIORef B.empty
216
  h <- S.socketToHandle s ReadWriteMode
217
  return Client { socket=h, rbuf=rf }
218

    
219
-- | Creates and returns a server endpoint.
220
getServer :: FilePath -> IO S.Socket
221
getServer path = do
222
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
223
  S.bindSocket s (S.SockAddrUnix path)
224
  S.listen s 5 -- 5 is the max backlog
225
  return s
226

    
227
-- | Closes a server endpoint.
228
-- FIXME: this should be encapsulated into a nicer type.
229
closeServer :: FilePath -> S.Socket -> IO ()
230
closeServer path sock = do
231
  S.sClose sock
232
  removeFile path
233

    
234
-- | Accepts a client
235
acceptClient :: S.Socket -> IO Client
236
acceptClient s = do
237
  -- second return is the address of the client, which we ignore here
238
  (client_socket, _) <- S.accept s
239
  new_buffer <- newIORef B.empty
240
  handle <- S.socketToHandle client_socket ReadWriteMode
241
  return Client { socket=handle, rbuf=new_buffer }
242

    
243
-- | Closes the client socket.
244
closeClient :: Client -> IO ()
245
closeClient = hClose . socket
246

    
247
-- | Sends a message over a luxi transport.
248
sendMsg :: Client -> String -> IO ()
249
sendMsg s buf = withTimeout queryTimeout "sending luxi message" $ do
250
  let encoded = UTF8.fromString buf
251
      handle = socket s
252
  B.hPut handle encoded
253
  B.hPut handle bEOM
254
  hFlush handle
255

    
256
-- | Given a current buffer and the handle, it will read from the
257
-- network until we get a full message, and it will return that
258
-- message and the leftover buffer contents.
259
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
260
recvUpdate handle obuf = do
261
  nbuf <- withTimeout queryTimeout "reading luxi response" $ do
262
            _ <- hWaitForInput handle (-1)
263
            B.hGetNonBlocking handle 4096
264
  let (msg, remaining) = B.break (eOM ==) nbuf
265
      newbuf = B.append obuf msg
266
  if B.null remaining
267
    then recvUpdate handle newbuf
268
    else return (newbuf, B.tail remaining)
269

    
270
-- | Waits for a message over a luxi transport.
271
recvMsg :: Client -> IO String
272
recvMsg s = do
273
  cbuf <- readIORef $ rbuf s
274
  let (imsg, ibuf) = B.break (eOM ==) cbuf
275
  (msg, nbuf) <-
276
    if B.null ibuf      -- if old buffer didn't contain a full message
277
      then recvUpdate (socket s) cbuf   -- then we read from network
278
      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
279
  writeIORef (rbuf s) nbuf
280
  return $ UTF8.toString msg
281

    
282
-- | Extended wrapper over recvMsg.
283
recvMsgExt :: Client -> IO RecvResult
284
recvMsgExt s =
285
  catch (liftM RecvOk (recvMsg s)) $ \e ->
286
    if isEOFError e
287
      then return RecvConnClosed
288
      else return $ RecvError (show e)
289

    
290
-- | Serialize a request to String.
291
buildCall :: LuxiOp  -- ^ The method
292
          -> String  -- ^ The serialized form
293
buildCall lo =
294
  let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
295
           , (strOfKey Args, opToArgs lo)
296
           ]
297
      jo = toJSObject ja
298
  in encodeStrict jo
299

    
300
-- | Serialize the response to String.
301
buildResponse :: Bool    -- ^ Success
302
              -> JSValue -- ^ The arguments
303
              -> String  -- ^ The serialized form
304
buildResponse success args =
305
  let ja = [ (strOfKey Success, JSBool success)
306
           , (strOfKey Result, args)]
307
      jo = toJSObject ja
308
  in encodeStrict jo
309

    
310
-- | Check that luxi request contains the required keys and parse it.
311
validateCall :: String -> Result LuxiCall
312
validateCall s = do
313
  arr <- fromJResult "parsing top-level luxi message" $
314
         decodeStrict s::Result (JSObject JSValue)
315
  let aobj = fromJSObject arr
316
  call <- fromObj aobj (strOfKey Method)::Result LuxiReq
317
  args <- fromObj aobj (strOfKey Args)
318
  return (LuxiCall call args)
319

    
320
-- | Converts Luxi call arguments into a 'LuxiOp' data structure.
321
--
322
-- This is currently hand-coded until we make it more uniform so that
323
-- it can be generated using TH.
324
decodeCall :: LuxiCall -> Result LuxiOp
325
decodeCall (LuxiCall call args) =
326
  case call of
327
    ReqQueryJobs -> do
328
              (jid, jargs) <- fromJVal args
329
              rid <- mapM parseJobId jid
330
              let rargs = map fromJSString jargs
331
              return $ QueryJobs rid rargs
332
    ReqQueryInstances -> do
333
              (names, fields, locking) <- fromJVal args
334
              return $ QueryInstances names fields locking
335
    ReqQueryNodes -> do
336
              (names, fields, locking) <- fromJVal args
337
              return $ QueryNodes names fields locking
338
    ReqQueryGroups -> do
339
              (names, fields, locking) <- fromJVal args
340
              return $ QueryGroups names fields locking
341
    ReqQueryClusterInfo -> do
342
              return QueryClusterInfo
343
    ReqQuery -> do
344
              (what, fields, qfilter) <- fromJVal args
345
              return $ Query what fields qfilter
346
    ReqSubmitJob -> do
347
              [ops1] <- fromJVal args
348
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
349
              return $ SubmitJob ops2
350
    ReqSubmitManyJobs -> do
351
              [ops1] <- fromJVal args
352
              ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
353
              return $ SubmitManyJobs ops2
354
    ReqWaitForJobChange -> do
355
              (jid, fields, pinfo, pidx, wtmout) <-
356
                -- No instance for 5-tuple, code copied from the
357
                -- json sources and adapted
358
                fromJResult "Parsing WaitForJobChange message" $
359
                case args of
360
                  JSArray [a, b, c, d, e] ->
361
                    (,,,,) `fmap`
362
                    J.readJSON a `ap`
363
                    J.readJSON b `ap`
364
                    J.readJSON c `ap`
365
                    J.readJSON d `ap`
366
                    J.readJSON e
367
                  _ -> J.Error "Not enough values"
368
              rid <- parseJobId jid
369
              return $ WaitForJobChange rid fields pinfo pidx wtmout
370
    ReqArchiveJob -> do
371
              [jid] <- fromJVal args
372
              rid <- parseJobId jid
373
              return $ ArchiveJob rid
374
    ReqAutoArchiveJobs -> do
375
              (age, tmout) <- fromJVal args
376
              return $ AutoArchiveJobs age tmout
377
    ReqQueryExports -> do
378
              (nodes, lock) <- fromJVal args
379
              return $ QueryExports nodes lock
380
    ReqQueryConfigValues -> do
381
              [fields] <- fromJVal args
382
              return $ QueryConfigValues fields
383
    ReqQueryTags -> do
384
              (kind, name) <- fromJVal args
385
              return $ QueryTags kind name
386
    ReqCancelJob -> do
387
              [job] <- fromJVal args
388
              rid <- parseJobId job
389
              return $ CancelJob rid
390
    ReqSetDrainFlag -> do
391
              [flag] <- fromJVal args
392
              return $ SetDrainFlag flag
393
    ReqSetWatcherPause -> do
394
              [duration] <- fromJVal args
395
              return $ SetWatcherPause duration
396

    
397
-- | Check that luxi responses contain the required keys and that the
398
-- call was successful.
399
validateResult :: String -> Result JSValue
400
validateResult s = do
401
  when (UTF8.replacement_char `elem` s) $
402
       fail "Failed to decode UTF-8, detected replacement char after decoding"
403
  oarr <- fromJResult "Parsing LUXI response"
404
          (decodeStrict s)::Result (JSObject JSValue)
405
  let arr = J.fromJSObject oarr
406
  status <- fromObj arr (strOfKey Success)::Result Bool
407
  let rkey = strOfKey Result
408
  if status
409
    then fromObj arr rkey
410
    else fromObj arr rkey >>= fail
411

    
412
-- | Generic luxi method call.
413
callMethod :: LuxiOp -> Client -> IO (Result JSValue)
414
callMethod method s = do
415
  sendMsg s $ buildCall method
416
  result <- recvMsg s
417
  let rval = validateResult result
418
  return rval
419

    
420
-- | Parses a job ID.
421
parseJobId :: JSValue -> Result JobId
422
parseJobId (JSString x) = tryRead "parsing job id" . fromJSString $ x
423
parseJobId (JSRational _ x) =
424
  if denominator x /= 1
425
    then Bad $ "Got fractional job ID from master daemon?! Value:" ++ show x
426
    -- FIXME: potential integer overflow here on 32-bit platforms
427
    else Ok . fromIntegral . numerator $ x
428
parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x
429

    
430
-- | Parse job submission result.
431
parseSubmitJobResult :: JSValue -> Result JobId
432
parseSubmitJobResult (JSArray [JSBool True, v]) = parseJobId v
433
parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
434
  Bad (fromJSString x)
435
parseSubmitJobResult v = Bad $ "Unknown result from the master daemon" ++
436
                         show v
437

    
438
-- | Specialized submitManyJobs call.
439
submitManyJobs :: Client -> [[OpCode]] -> IO (Result [JobId])
440
submitManyJobs s jobs = do
441
  rval <- callMethod (SubmitManyJobs jobs) s
442
  -- map each result (status, payload) pair into a nice Result ADT
443
  return $ case rval of
444
             Bad x -> Bad x
445
             Ok (JSArray r) -> mapM parseSubmitJobResult r
446
             x -> Bad ("Cannot parse response from Ganeti: " ++ show x)
447

    
448
-- | Custom queryJobs call.
449
queryJobsStatus :: Client -> [JobId] -> IO (Result [JobStatus])
450
queryJobsStatus s jids = do
451
  rval <- callMethod (QueryJobs jids ["status"]) s
452
  return $ case rval of
453
             Bad x -> Bad x
454
             Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
455
                       J.Ok vals -> if any null vals
456
                                    then Bad "Missing job status field"
457
                                    else Ok (map head vals)
458
                       J.Error x -> Bad x