1 {-# LANGUAGE TemplateHaskell #-}
3 {-| Implementation of the Ganeti LUXI interface.
9 Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
55 import Control.Exception (catch)
57 import qualified Data.ByteString as B
58 import qualified Data.ByteString.Lazy as BL
59 import qualified Data.ByteString.UTF8 as UTF8
60 import qualified Data.ByteString.Lazy.UTF8 as UTF8L
61 import Data.Word (Word8)
63 import Text.JSON (encodeStrict, decodeStrict)
64 import qualified Text.JSON as J
65 import Text.JSON.Pretty (pp_value)
66 import Text.JSON.Types
67 import System.Directory (removeFile)
68 import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
69 import System.IO.Error (isEOFError)
71 import qualified Network.Socket as S
73 import Ganeti.BasicTypes
74 import Ganeti.Constants
77 import Ganeti.OpParams (pTagsObject)
79 import qualified Ganeti.Query.Language as Qlang
83 -- * Utility functions
-- | Wrapper over 'System.Timeout.timeout' that fails in the IO monad.
--
-- Runs @action@ with a limit of @secs@ seconds (multiplied up to the
-- microseconds that 'timeout' takes). If the limit expires, the call
-- fails with a message naming the operation via @descr@; otherwise the
-- action's own result is returned.
withTimeout :: Int -> String -> IO a -> IO a
withTimeout secs descr action =
  timeout (secs * 1000000) action >>=
    maybe (fail $ "Timeout in " ++ descr) return
93 -- * Generic protocol functionality
95 -- | Result of receiving a message from the socket.
96 data RecvResult = RecvConnClosed -- ^ Connection closed
97 | RecvError String -- ^ Any other error
98 | RecvOk String -- ^ Successful receive
101 -- | Currently supported Luxi operations and JSON serialization.
104 [ simpleField "what" [t| Qlang.ItemType |]
105 , simpleField "fields" [t| [String] |]
106 , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
108 , (luxiReqQueryFields,
109 [ simpleField "what" [t| Qlang.ItemType |]
110 , simpleField "fields" [t| [String] |]
112 , (luxiReqQueryNodes,
113 [ simpleField "names" [t| [String] |]
114 , simpleField "fields" [t| [String] |]
115 , simpleField "lock" [t| Bool |]
117 , (luxiReqQueryGroups,
118 [ simpleField "names" [t| [String] |]
119 , simpleField "fields" [t| [String] |]
120 , simpleField "lock" [t| Bool |]
122 , (luxiReqQueryNetworks,
123 [ simpleField "names" [t| [String] |]
124 , simpleField "fields" [t| [String] |]
125 , simpleField "lock" [t| Bool |]
127 , (luxiReqQueryInstances,
128 [ simpleField "names" [t| [String] |]
129 , simpleField "fields" [t| [String] |]
130 , simpleField "lock" [t| Bool |]
133 [ simpleField "ids" [t| [JobId] |]
134 , simpleField "fields" [t| [String] |]
136 , (luxiReqQueryExports,
137 [ simpleField "nodes" [t| [String] |]
138 , simpleField "lock" [t| Bool |]
140 , (luxiReqQueryConfigValues,
141 [ simpleField "fields" [t| [String] |] ]
143 , (luxiReqQueryClusterInfo, [])
147 [ simpleField "job" [t| [MetaOpCode] |] ]
149 , (luxiReqSubmitManyJobs,
150 [ simpleField "ops" [t| [[MetaOpCode]] |] ]
152 , (luxiReqWaitForJobChange,
153 [ simpleField "job" [t| JobId |]
154 , simpleField "fields" [t| [String]|]
155 , simpleField "prev_job" [t| JSValue |]
156 , simpleField "prev_log" [t| JSValue |]
157 , simpleField "tmout" [t| Int |]
159 , (luxiReqArchiveJob,
160 [ simpleField "job" [t| JobId |] ]
162 , (luxiReqAutoArchiveJobs,
163 [ simpleField "age" [t| Int |]
164 , simpleField "tmout" [t| Int |]
167 [ simpleField "job" [t| JobId |] ]
169 , (luxiReqChangeJobPriority,
170 [ simpleField "job" [t| JobId |]
171 , simpleField "priority" [t| Int |] ]
173 , (luxiReqSetDrainFlag,
174 [ simpleField "flag" [t| Bool |] ]
176 , (luxiReqSetWatcherPause,
177 [ simpleField "duration" [t| Double |] ]
181 $(makeJSONInstance ''LuxiReq)
183 -- | List of all defined Luxi calls.
184 $(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
186 -- | The serialisation of LuxiOps into strings in messages.
187 $(genStrOfOp ''LuxiOp "strOfOp")
189 -- | Type holding the initial (unparsed) Luxi call: the request kind plus its still-undecoded JSON arguments.
190 data LuxiCall = LuxiCall LuxiReq JSValue
192 -- | The end-of-message separator.
196 -- | The end-of-message encoded as a ByteString.
198 bEOM = B.singleton eOM
200 -- | Valid keys in the requests and responses.
201 data MsgKeys = Method
206 -- | The serialisation of MsgKeys into strings in messages.
207 $(genStrOfKey ''MsgKeys "strOfKey")
209 -- | Luxi client encapsulation.
210 data Client = Client { socket :: Handle -- ^ The socket of the client
211 , rbuf :: IORef B.ByteString -- ^ Already received buffer
-- | Connects to the master daemon and returns a luxi 'Client'.
--
-- @path@ is the filesystem path of the UNIX socket to connect to. The
-- connection attempt is bounded by 'luxiDefCtmo' through 'withTimeout';
-- the returned client starts with an empty receive buffer.
getClient :: String -> IO Client
getClient path = do
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
  withTimeout luxiDefCtmo "creating luxi connection" $
    S.connect s (S.SockAddrUnix path)
  rf <- newIORef B.empty
  h <- S.socketToHandle s ReadWriteMode
  return Client { socket=h, rbuf=rf }
-- | Creates and returns a server endpoint.
--
-- A UNIX stream socket is bound to @path@ and put into listening mode;
-- the listening socket itself is handed back to the caller.
getServer :: FilePath -> IO S.Socket
getServer path = do
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
  S.bindSocket s (S.SockAddrUnix path)
  S.listen s 5 -- 5 is the max backlog
  return s
232 -- | Closes a server endpoint.
233 -- FIXME: this should be encapsulated into a nicer type.
234 closeServer :: FilePath -> S.Socket -> IO ()
235 closeServer path sock = do
-- | Accepts a client connection on a listening server socket.
--
-- The accepted socket is wrapped in a read\/write 'Handle' and paired
-- with a fresh, empty receive buffer.
acceptClient :: S.Socket -> IO Client
acceptClient s = do
  -- second return is the address of the client, which we ignore here
  (client_socket, _) <- S.accept s
  new_buffer <- newIORef B.empty
  handle <- S.socketToHandle client_socket ReadWriteMode
  return Client { socket=handle, rbuf=new_buffer }
-- | Shuts down a luxi client by closing the handle it wraps.
closeClient :: Client -> IO ()
closeClient client = hClose (socket client)
252 -- | Sends a message over a luxi transport.
253 sendMsg :: Client -> String -> IO ()
254 sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
255 let encoded = UTF8L.fromString buf
257 BL.hPut handle encoded
-- | Given a current buffer and the handle, it will read from the
-- network until we get a full message, and it will return that
-- message and the leftover buffer contents.
--
-- Each round waits for input and then takes up to 4096 bytes, the whole
-- step being bounded by 'luxiDefRwto'. Bytes up to the first 'eOM' are
-- appended to the accumulated buffer. When no 'eOM' was seen the
-- function recurses; otherwise the separator is dropped and whatever
-- followed it is returned as the leftover.
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
recvUpdate handle obuf = do
  nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
            _ <- hWaitForInput handle (-1)
            B.hGetNonBlocking handle 4096
  let (msg, remaining) = B.break (eOM ==) nbuf
      newbuf = B.append obuf msg
  case B.uncons remaining of
    -- no separator in this chunk: keep accumulating
    Nothing -> recvUpdate handle newbuf
    -- separator found: drop it and hand back what came after it
    Just (_, leftover) -> return (newbuf, leftover)
-- | Waits for a message over a luxi transport.
--
-- Data already buffered in the client is consulted first; the network
-- is read only when that buffer does not hold a complete message. The
-- bytes left over after the message are stored back in the client's
-- buffer, and the message itself is returned decoded from UTF-8.
recvMsg :: Client -> IO String
recvMsg s = do
  cbuf <- readIORef $ rbuf s
  let (imsg, ibuf) = B.break (eOM ==) cbuf
  (msg, nbuf) <-
    case B.uncons ibuf of
      -- old buffer didn't contain a full message: read from the network
      Nothing -> recvUpdate (socket s) cbuf
      -- a full message was buffered: drop the separator, keep the rest
      Just (_, rest) -> return (imsg, rest)
  writeIORef (rbuf s) nbuf
  return $ UTF8.toString msg
-- | Extended wrapper over 'recvMsg'.
--
-- I\/O errors raised while receiving are turned into values: an
-- end-of-file error becomes 'RecvConnClosed', any other one becomes
-- 'RecvError' carrying the shown exception. A successful receive is
-- wrapped in 'RecvOk'.
recvMsgExt :: Client -> IO RecvResult
recvMsgExt s =
  Control.Exception.catch (fmap RecvOk (recvMsg s)) $ \e ->
    return $ if isEOFError e
               then RecvConnClosed
               else RecvError (show e)
-- | Serialize a request to String.
--
-- The result is a JSON object holding the method name (from 'strOfOp')
-- and its arguments (from 'opToArgs') under the 'Method' and 'Args'
-- keys.
buildCall :: LuxiOp  -- ^ The method
          -> String  -- ^ The serialized form
buildCall lo =
  let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
           , (strOfKey Args, opToArgs lo)
           ]
  in encodeStrict (toJSObject ja)
-- | Serialize the response to String.
--
-- The result is a JSON object holding the success flag and the payload
-- under the 'Success' and 'Result' keys.
buildResponse :: Bool    -- ^ Success
              -> JSValue -- ^ The arguments
              -> String  -- ^ The serialized form
buildResponse success args =
  let ja = [ (strOfKey Success, JSBool success)
           , (strOfKey Result, args)]
  in encodeStrict (toJSObject ja)
-- | Check that luxi request contains the required keys and parse it.
--
-- The string is decoded as a top-level JSON object, from which the
-- 'Method' key is read as a 'LuxiReq' and the 'Args' key is kept as a
-- raw JSON value. Any failure along the way is reported through the
-- 'Result' monad.
validateCall :: String -> Result LuxiCall
validateCall s = do
  arr <- fromJResult "parsing top-level luxi message" $
         decodeStrict s::Result (JSObject JSValue)
  let aobj = fromJSObject arr
  call <- fromObj aobj (strOfKey Method)::Result LuxiReq
  args <- fromObj aobj (strOfKey Args)
  return (LuxiCall call args)
325 -- | Converts Luxi call arguments into a 'LuxiOp' data structure.
327 -- This is currently hand-coded until we make it more uniform so that
328 -- it can be generated using TH.
329 decodeCall :: LuxiCall -> Result LuxiOp
330 decodeCall (LuxiCall call args) =
333 (jids, jargs) <- fromJVal args
334 jids' <- case jids of
337 return $ QueryJobs jids' jargs
338 ReqQueryInstances -> do
339 (names, fields, locking) <- fromJVal args
340 return $ QueryInstances names fields locking
342 (names, fields, locking) <- fromJVal args
343 return $ QueryNodes names fields locking
345 (names, fields, locking) <- fromJVal args
346 return $ QueryGroups names fields locking
347 ReqQueryClusterInfo ->
348 return QueryClusterInfo
349 ReqQueryNetworks -> do
350 (names, fields, locking) <- fromJVal args
351 return $ QueryNetworks names fields locking
353 (what, fields, qfilter) <- fromJVal args
354 return $ Query what fields qfilter
356 (what, fields) <- fromJVal args
357 fields' <- case fields of
360 return $ QueryFields what fields'
362 [ops1] <- fromJVal args
363 ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
364 return $ SubmitJob ops2
365 ReqSubmitManyJobs -> do
366 [ops1] <- fromJVal args
367 ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
368 return $ SubmitManyJobs ops2
369 ReqWaitForJobChange -> do
370 (jid, fields, pinfo, pidx, wtmout) <-
371 -- No instance for 5-tuple, code copied from the
372 -- json sources and adapted
373 fromJResult "Parsing WaitForJobChange message" $
375 JSArray [a, b, c, d, e] ->
382 _ -> J.Error "Not enough values"
383 return $ WaitForJobChange jid fields pinfo pidx wtmout
385 [jid] <- fromJVal args
386 return $ ArchiveJob jid
387 ReqAutoArchiveJobs -> do
388 (age, tmout) <- fromJVal args
389 return $ AutoArchiveJobs age tmout
390 ReqQueryExports -> do
391 (nodes, lock) <- fromJVal args
392 return $ QueryExports nodes lock
393 ReqQueryConfigValues -> do
394 [fields] <- fromJVal args
395 return $ QueryConfigValues fields
397 (kind, name) <- fromJVal args
398 item <- tagObjectFrom kind name
399 return $ QueryTags item
401 [jid] <- fromJVal args
402 return $ CancelJob jid
403 ReqChangeJobPriority -> do
404 (jid, priority) <- fromJVal args
405 return $ ChangeJobPriority jid priority
406 ReqSetDrainFlag -> do
407 [flag] <- fromJVal args
408 return $ SetDrainFlag flag
409 ReqSetWatcherPause -> do
410 [duration] <- fromJVal args
411 return $ SetWatcherPause duration
-- | Check that luxi responses contain the required keys and that the
-- call was successful.
--
-- A string containing 'UTF8.replacement_char' is rejected up front as a
-- UTF-8 decoding failure. Otherwise the response is parsed as a JSON
-- object and its 'Success' and 'Result' keys are extracted: on success
-- the result payload is returned, on failure it is handed to
-- 'decodeError'.
validateResult :: String -> ErrorResult JSValue
validateResult s = do
  when (UTF8.replacement_char `elem` s) $
    fail "Failed to decode UTF-8, detected replacement char after decoding"
  oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
  let arr = J.fromJSObject oarr
  status <- fromObj arr (strOfKey Success)
  result <- fromObj arr (strOfKey Result)
  if status
    then return result
    else decodeError result
427 -- | Try to decode an error from the server response. This function
428 -- will always fail, since it's called only on the error path (when the response's success flag is false).
430 decodeError :: JSValue -> ErrorResult JSValue
434 Bad msg -> Bad $ GenericError msg
-- | Generic luxi method call.
--
-- Serializes @method@ with 'buildCall', sends it over the client, then
-- waits for one reply message and runs it through 'validateResult'.
callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
callMethod method s = do
  sendMsg s $ buildCall method
  fmap validateResult (recvMsg s)
444 -- | Parse job submission result.
445 parseSubmitJobResult :: JSValue -> ErrorResult JobId
446 parseSubmitJobResult (JSArray [JSBool True, v]) =
448 J.Error msg -> Bad $ LuxiError msg
450 parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
451 Bad . LuxiError $ fromJSString x
452 parseSubmitJobResult v =
453 Bad . LuxiError $ "Unknown result from the master daemon: " ++
-- | Specialized submitManyJobs call.
--
-- Sends a 'SubmitManyJobs' request and interprets the reply: a failed
-- call is passed through unchanged, a JSON array has every element
-- parsed with 'parseSubmitJobResult' (the first failure aborts the whole
-- result), and any other payload is reported as unparseable.
submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId])
submitManyJobs s jobs = do
  rval <- callMethod (SubmitManyJobs jobs) s
  -- map each result (status, payload) pair into a nice Result ADT
  return $ case rval of
             Bad x -> Bad x
             Ok (JSArray r) -> mapM parseSubmitJobResult r
             x -> Bad . LuxiError $
                  "Cannot parse response from Ganeti: " ++ show x
-- | Custom queryJobs call.
--
-- Asks only for the @status@ field of the given jobs. The reply is read
-- as one list of fields per job, and the first field of each list is
-- taken as that job's status. A failed call is passed through
-- unchanged; a reply that cannot be read, or a job with an empty field
-- list, yields a 'LuxiError'.
queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
queryJobsStatus s jids = do
  rval <- callMethod (QueryJobs jids ["status"]) s
  return $ case rval of
             Bad x -> Bad x
             Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
                       J.Ok vals -> mapM firstStatus vals
                       J.Error x -> Bad $ LuxiError x
  where
    -- total replacement for 'head': an empty row is an explicit error
    firstStatus (st:_) = Ok st
    firstStatus [] = Bad $ LuxiError "Missing job status field"