1 {-# LANGUAGE TemplateHaskell #-}
3 {-| Implementation of the Ganeti LUXI interface.
9 Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
54 import Control.Exception (catch)
56 import Data.Ratio (numerator, denominator)
57 import qualified Data.ByteString as B
58 import qualified Data.ByteString.UTF8 as UTF8
59 import Data.Word (Word8)
61 import Prelude hiding (catch)
62 import Text.JSON (encodeStrict, decodeStrict)
63 import qualified Text.JSON as J
64 import Text.JSON.Pretty (pp_value)
65 import Text.JSON.Types
66 import System.Directory (removeFile)
67 import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
68 import System.IO.Error (isEOFError)
70 import qualified Network.Socket as S
72 import Ganeti.BasicTypes
73 import Ganeti.Constants
76 import Ganeti.Jobs (JobStatus)
77 import Ganeti.OpCodes (OpCode)
79 import qualified Ganeti.Query.Language as Qlang
82 -- * Utility functions
84 -- | Wrapper over System.Timeout.timeout that fails in the IO monad.
85 withTimeout :: Int -> String -> IO a -> IO a
86 withTimeout secs descr action = do
87 result <- timeout (secs * 1000000) action
89 Nothing -> fail $ "Timeout in " ++ descr
92 -- * Generic protocol functionality
94 -- | Result of receiving a message from the socket.
95 data RecvResult = RecvConnClosed -- ^ Connection closed
96 | RecvError String -- ^ Any other error
97 | RecvOk String -- ^ Successfull receive
98 deriving (Show, Read, Eq)
100 -- | The Ganeti job type.
103 -- | Data type representing what items do the tag operations apply to.
104 $(declareSADT "TagObject"
105 [ ("TagInstance", 'tagInstance)
106 , ("TagNode", 'tagNode)
107 , ("TagGroup", 'tagNodegroup)
108 , ("TagCluster", 'tagCluster)
110 $(makeJSONInstance ''TagObject)
112 -- | Currently supported Luxi operations and JSON serialization.
115 [ ("what", [t| Qlang.ItemType |])
116 , ("fields", [t| [String] |])
117 , ("qfilter", [t| Qlang.Filter Qlang.FilterField |])
119 , (luxiReqQueryFields,
120 [ ("what", [t| Qlang.ItemType |])
121 , ("fields", [t| [String] |])
123 , (luxiReqQueryNodes,
124 [ ("names", [t| [String] |])
125 , ("fields", [t| [String] |])
126 , ("lock", [t| Bool |])
128 , (luxiReqQueryGroups,
129 [ ("names", [t| [String] |])
130 , ("fields", [t| [String] |])
131 , ("lock", [t| Bool |])
133 , (luxiReqQueryInstances,
134 [ ("names", [t| [String] |])
135 , ("fields", [t| [String] |])
136 , ("lock", [t| Bool |])
139 [ ("ids", [t| [Int] |])
140 , ("fields", [t| [String] |])
142 , (luxiReqQueryExports,
143 [ ("nodes", [t| [String] |])
144 , ("lock", [t| Bool |])
146 , (luxiReqQueryConfigValues,
147 [ ("fields", [t| [String] |]) ]
149 , (luxiReqQueryClusterInfo, [])
151 [ ("kind", [t| TagObject |])
152 , ("name", [t| String |])
155 [ ("job", [t| [OpCode] |]) ]
157 , (luxiReqSubmitManyJobs,
158 [ ("ops", [t| [[OpCode]] |]) ]
160 , (luxiReqWaitForJobChange,
161 [ ("job", [t| Int |])
162 , ("fields", [t| [String]|])
163 , ("prev_job", [t| JSValue |])
164 , ("prev_log", [t| JSValue |])
165 , ("tmout", [t| Int |])
167 , (luxiReqArchiveJob,
168 [ ("job", [t| Int |]) ]
170 , (luxiReqAutoArchiveJobs,
171 [ ("age", [t| Int |])
172 , ("tmout", [t| Int |])
175 [ ("job", [t| Int |]) ]
177 , (luxiReqSetDrainFlag,
178 [ ("flag", [t| Bool |]) ]
180 , (luxiReqSetWatcherPause,
181 [ ("duration", [t| Double |]) ]
185 $(makeJSONInstance ''LuxiReq)
187 -- | List of all defined Luxi calls.
188 $(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
190 -- | The serialisation of LuxiOps into strings in messages.
191 $(genStrOfOp ''LuxiOp "strOfOp")
193 -- | Type holding the initial (unparsed) Luxi call.
194 data LuxiCall = LuxiCall LuxiReq JSValue
196 -- | The end-of-message separator.
200 -- | The end-of-message encoded as a ByteString.
202 bEOM = B.singleton eOM
204 -- | Valid keys in the requests and responses.
205 data MsgKeys = Method
210 -- | The serialisation of MsgKeys into strings in messages.
211 $(genStrOfKey ''MsgKeys "strOfKey")
213 -- | Luxi client encapsulation.
214 data Client = Client { socket :: Handle -- ^ The socket of the client
215 , rbuf :: IORef B.ByteString -- ^ Already received buffer
218 -- | Connects to the master daemon and returns a luxi Client.
219 getClient :: String -> IO Client
221 s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
222 withTimeout luxiDefCtmo "creating luxi connection" $
223 S.connect s (S.SockAddrUnix path)
224 rf <- newIORef B.empty
225 h <- S.socketToHandle s ReadWriteMode
226 return Client { socket=h, rbuf=rf }
228 -- | Creates and returns a server endpoint.
229 getServer :: FilePath -> IO S.Socket
231 s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
232 S.bindSocket s (S.SockAddrUnix path)
233 S.listen s 5 -- 5 is the max backlog
236 -- | Closes a server endpoint.
237 -- FIXME: this should be encapsulated into a nicer type.
238 closeServer :: FilePath -> S.Socket -> IO ()
239 closeServer path sock = do
243 -- | Accepts a client
244 acceptClient :: S.Socket -> IO Client
246 -- second return is the address of the client, which we ignore here
247 (client_socket, _) <- S.accept s
248 new_buffer <- newIORef B.empty
249 handle <- S.socketToHandle client_socket ReadWriteMode
250 return Client { socket=handle, rbuf=new_buffer }
252 -- | Closes the client socket.
253 closeClient :: Client -> IO ()
254 closeClient = hClose . socket
256 -- | Sends a message over a luxi transport.
257 sendMsg :: Client -> String -> IO ()
258 sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
259 let encoded = UTF8.fromString buf
261 B.hPut handle encoded
265 -- | Given a current buffer and the handle, it will read from the
266 -- network until we get a full message, and it will return that
267 -- message and the leftover buffer contents.
268 recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
269 recvUpdate handle obuf = do
270 nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
271 _ <- hWaitForInput handle (-1)
272 B.hGetNonBlocking handle 4096
273 let (msg, remaining) = B.break (eOM ==) nbuf
274 newbuf = B.append obuf msg
276 then recvUpdate handle newbuf
277 else return (newbuf, B.tail remaining)
279 -- | Waits for a message over a luxi transport.
280 recvMsg :: Client -> IO String
282 cbuf <- readIORef $ rbuf s
283 let (imsg, ibuf) = B.break (eOM ==) cbuf
285 if B.null ibuf -- if old buffer didn't contain a full message
286 then recvUpdate (socket s) cbuf -- then we read from network
287 else return (imsg, B.tail ibuf) -- else we return data from our buffer
288 writeIORef (rbuf s) nbuf
289 return $ UTF8.toString msg
291 -- | Extended wrapper over recvMsg.
292 recvMsgExt :: Client -> IO RecvResult
294 catch (liftM RecvOk (recvMsg s)) $ \e ->
295 return $ if isEOFError e
297 else RecvError (show e)
299 -- | Serialize a request to String.
300 buildCall :: LuxiOp -- ^ The method
301 -> String -- ^ The serialized form
303 let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
304 , (strOfKey Args, opToArgs lo)
309 -- | Serialize the response to String.
310 buildResponse :: Bool -- ^ Success
311 -> JSValue -- ^ The arguments
312 -> String -- ^ The serialized form
313 buildResponse success args =
314 let ja = [ (strOfKey Success, JSBool success)
315 , (strOfKey Result, args)]
319 -- | Check that luxi request contains the required keys and parse it.
320 validateCall :: String -> Result LuxiCall
322 arr <- fromJResult "parsing top-level luxi message" $
323 decodeStrict s::Result (JSObject JSValue)
324 let aobj = fromJSObject arr
325 call <- fromObj aobj (strOfKey Method)::Result LuxiReq
326 args <- fromObj aobj (strOfKey Args)
327 return (LuxiCall call args)
329 -- | Converts Luxi call arguments into a 'LuxiOp' data structure.
331 -- This is currently hand-coded until we make it more uniform so that
332 -- it can be generated using TH.
333 decodeCall :: LuxiCall -> Result LuxiOp
334 decodeCall (LuxiCall call args) =
337 (jid, jargs) <- fromJVal args
338 rid <- mapM parseJobId jid
339 let rargs = map fromJSString jargs
340 return $ QueryJobs rid rargs
341 ReqQueryInstances -> do
342 (names, fields, locking) <- fromJVal args
343 return $ QueryInstances names fields locking
345 (names, fields, locking) <- fromJVal args
346 return $ QueryNodes names fields locking
348 (names, fields, locking) <- fromJVal args
349 return $ QueryGroups names fields locking
350 ReqQueryClusterInfo ->
351 return QueryClusterInfo
353 (what, fields, qfilter) <- fromJVal args
354 return $ Query what fields qfilter
356 (what, fields) <- fromJVal args
357 fields' <- case fields of
360 return $ QueryFields what fields'
362 [ops1] <- fromJVal args
363 ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
364 return $ SubmitJob ops2
365 ReqSubmitManyJobs -> do
366 [ops1] <- fromJVal args
367 ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
368 return $ SubmitManyJobs ops2
369 ReqWaitForJobChange -> do
370 (jid, fields, pinfo, pidx, wtmout) <-
371 -- No instance for 5-tuple, code copied from the
372 -- json sources and adapted
373 fromJResult "Parsing WaitForJobChange message" $
375 JSArray [a, b, c, d, e] ->
382 _ -> J.Error "Not enough values"
383 rid <- parseJobId jid
384 return $ WaitForJobChange rid fields pinfo pidx wtmout
386 [jid] <- fromJVal args
387 rid <- parseJobId jid
388 return $ ArchiveJob rid
389 ReqAutoArchiveJobs -> do
390 (age, tmout) <- fromJVal args
391 return $ AutoArchiveJobs age tmout
392 ReqQueryExports -> do
393 (nodes, lock) <- fromJVal args
394 return $ QueryExports nodes lock
395 ReqQueryConfigValues -> do
396 [fields] <- fromJVal args
397 return $ QueryConfigValues fields
399 (kind, name) <- fromJVal args
400 return $ QueryTags kind name
402 [job] <- fromJVal args
403 rid <- parseJobId job
404 return $ CancelJob rid
405 ReqSetDrainFlag -> do
406 [flag] <- fromJVal args
407 return $ SetDrainFlag flag
408 ReqSetWatcherPause -> do
409 [duration] <- fromJVal args
410 return $ SetWatcherPause duration
412 -- | Check that luxi responses contain the required keys and that the
413 -- call was successful.
414 validateResult :: String -> ErrorResult JSValue
415 validateResult s = do
416 when (UTF8.replacement_char `elem` s) $
417 fail "Failed to decode UTF-8, detected replacement char after decoding"
418 oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
419 let arr = J.fromJSObject oarr
420 status <- fromObj arr (strOfKey Success)
421 result <- fromObj arr (strOfKey Result)
424 else decodeError result
426 -- | Try to decode an error from the server response. This function
427 -- will always fail, since it's called only on the error path (when
429 decodeError :: JSValue -> ErrorResult JSValue
433 Bad msg -> Bad $ GenericError msg
435 -- | Generic luxi method call.
436 callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
437 callMethod method s = do
438 sendMsg s $ buildCall method
440 let rval = validateResult result
443 -- | Parses a job ID.
444 parseJobId :: JSValue -> Result JobId
445 parseJobId (JSString x) = tryRead "parsing job id" . fromJSString $ x
446 parseJobId (JSRational _ x) =
447 if denominator x /= 1
448 then Bad $ "Got fractional job ID from master daemon?! Value:" ++ show x
449 -- FIXME: potential integer overflow here on 32-bit platforms
450 else Ok . fromIntegral . numerator $ x
451 parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x
453 -- | Parse job submission result.
454 parseSubmitJobResult :: JSValue -> ErrorResult JobId
455 parseSubmitJobResult (JSArray [JSBool True, v]) =
457 Bad msg -> Bad $ LuxiError msg
459 parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
460 Bad . LuxiError $ fromJSString x
461 parseSubmitJobResult v =
462 Bad . LuxiError $ "Unknown result from the master daemon: " ++
465 -- | Specialized submitManyJobs call.
466 submitManyJobs :: Client -> [[OpCode]] -> IO (ErrorResult [JobId])
467 submitManyJobs s jobs = do
468 rval <- callMethod (SubmitManyJobs jobs) s
469 -- map each result (status, payload) pair into a nice Result ADT
470 return $ case rval of
472 Ok (JSArray r) -> mapM parseSubmitJobResult r
473 x -> Bad . LuxiError $
474 "Cannot parse response from Ganeti: " ++ show x
476 -- | Custom queryJobs call.
477 queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
478 queryJobsStatus s jids = do
479 rval <- callMethod (QueryJobs jids ["status"]) s
480 return $ case rval of
482 Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
483 J.Ok vals -> if any null vals
485 LuxiError "Missing job status field"
486 else Ok (map head vals)
487 J.Error x -> Bad $ LuxiError x