1 {-# LANGUAGE TemplateHaskell #-}
3 {-| Implementation of the Ganeti LUXI interface.
9 Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
55 import Control.Exception (catch)
57 import qualified Data.ByteString as B
58 import qualified Data.ByteString.Lazy as BL
59 import qualified Data.ByteString.UTF8 as UTF8
60 import qualified Data.ByteString.Lazy.UTF8 as UTF8L
61 import Data.Word (Word8)
63 import Text.JSON (encodeStrict, decodeStrict)
64 import qualified Text.JSON as J
65 import Text.JSON.Pretty (pp_value)
66 import Text.JSON.Types
67 import System.Directory (removeFile)
68 import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
69 import System.IO.Error (isEOFError)
71 import qualified Network.Socket as S
73 import Ganeti.BasicTypes
74 import Ganeti.Constants
77 import Ganeti.OpParams (pTagsObject)
79 import qualified Ganeti.Query.Language as Qlang
83 -- * Utility functions
-- | Wrapper over 'System.Timeout.timeout' that fails in the IO monad.
--
-- Runs @action@ with a limit of @secs@ seconds. When the action
-- finishes in time its result is returned unchanged; otherwise the
-- call fails (via 'fail') with the message @"Timeout in " ++ descr@.
withTimeout :: Int -> String -> IO a -> IO a
withTimeout secs descr action = do
  -- 'timeout' expects microseconds, hence the scaling
  result <- timeout (secs * 1000000) action
  case result of
    Nothing -> fail $ "Timeout in " ++ descr
    Just v -> return v
93 -- * Generic protocol functionality
-- | Outcome of trying to receive one message from the socket.
data RecvResult
  = RecvConnClosed    -- ^ Connection closed
  | RecvError String  -- ^ Any other error
  | RecvOk String     -- ^ Successful receive
101 -- | Currently supported Luxi operations and JSON serialization.
104 [ simpleField "what" [t| Qlang.ItemType |]
105 , simpleField "fields" [t| [String] |]
106 , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
108 , (luxiReqQueryFields,
109 [ simpleField "what" [t| Qlang.ItemType |]
110 , simpleField "fields" [t| [String] |]
112 , (luxiReqQueryNodes,
113 [ simpleField "names" [t| [String] |]
114 , simpleField "fields" [t| [String] |]
115 , simpleField "lock" [t| Bool |]
117 , (luxiReqQueryGroups,
118 [ simpleField "names" [t| [String] |]
119 , simpleField "fields" [t| [String] |]
120 , simpleField "lock" [t| Bool |]
122 , (luxiReqQueryInstances,
123 [ simpleField "names" [t| [String] |]
124 , simpleField "fields" [t| [String] |]
125 , simpleField "lock" [t| Bool |]
128 [ simpleField "ids" [t| [JobId] |]
129 , simpleField "fields" [t| [String] |]
131 , (luxiReqQueryExports,
132 [ simpleField "nodes" [t| [String] |]
133 , simpleField "lock" [t| Bool |]
135 , (luxiReqQueryConfigValues,
136 [ simpleField "fields" [t| [String] |] ]
138 , (luxiReqQueryClusterInfo, [])
142 [ simpleField "job" [t| [MetaOpCode] |] ]
144 , (luxiReqSubmitManyJobs,
145 [ simpleField "ops" [t| [[MetaOpCode]] |] ]
147 , (luxiReqWaitForJobChange,
148 [ simpleField "job" [t| JobId |]
149 , simpleField "fields" [t| [String]|]
150 , simpleField "prev_job" [t| JSValue |]
151 , simpleField "prev_log" [t| JSValue |]
152 , simpleField "tmout" [t| Int |]
154 , (luxiReqArchiveJob,
155 [ simpleField "job" [t| JobId |] ]
157 , (luxiReqAutoArchiveJobs,
158 [ simpleField "age" [t| Int |]
159 , simpleField "tmout" [t| Int |]
162 [ simpleField "job" [t| JobId |] ]
164 , (luxiReqChangeJobPriority,
165 [ simpleField "job" [t| JobId |]
166 , simpleField "priority" [t| Int |] ]
168 , (luxiReqSetDrainFlag,
169 [ simpleField "flag" [t| Bool |] ]
171 , (luxiReqSetWatcherPause,
172 [ simpleField "duration" [t| Double |] ]
176 $(makeJSONInstance ''LuxiReq)
178 -- | List of all defined Luxi calls.
179 $(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
181 -- | The serialisation of LuxiOps into strings in messages.
182 $(genStrOfOp ''LuxiOp "strOfOp")
-- | A Luxi call as first read off the wire: the request kind together
-- with its arguments, which are still plain (unparsed) JSON.
data LuxiCall
  = LuxiCall
      LuxiReq  -- ^ The request kind
      JSValue  -- ^ The raw, not yet decoded arguments
187 -- | The end-of-message separator.
-- | The end-of-message separator ('eOM') encoded as a one-byte strict
-- 'B.ByteString'.
bEOM :: B.ByteString
bEOM = B.singleton eOM
-- | Valid keys in the requests and responses.
data MsgKeys
  = Method   -- ^ Request key holding the name of the call
  | Args     -- ^ Request key holding the arguments of the call
  | Success  -- ^ Response key holding the success flag
  | Result   -- ^ Response key holding the payload of the response
201 -- | The serialisation of MsgKeys into strings in messages.
202 $(genStrOfKey ''MsgKeys "strOfKey")
-- | Luxi client encapsulation.
--
-- Pairs the connection handle with a mutable buffer holding bytes that
-- were already read from the handle but not yet returned as a message.
data Client = Client
  { socket :: Handle              -- ^ The socket of the client
  , rbuf :: IORef B.ByteString    -- ^ Already received buffer
  }
-- | Connects to the master daemon and returns a luxi Client.
--
-- Opens a Unix-domain stream socket and connects it to @path@; the
-- connect step is bounded by 'luxiDefCtmo' (passed to 'withTimeout' as
-- a number of seconds). The returned client starts with an empty
-- receive buffer.
getClient :: String -> IO Client
getClient path = do
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
  withTimeout luxiDefCtmo "creating luxi connection" $
    S.connect s (S.SockAddrUnix path)
  rf <- newIORef B.empty
  h <- S.socketToHandle s ReadWriteMode
  return Client { socket=h, rbuf=rf }
-- | Creates and returns a server endpoint.
--
-- Opens a Unix-domain stream socket, binds it to @path@ and puts it
-- into listening mode; the listening socket itself is the result.
getServer :: FilePath -> IO S.Socket
getServer path = do
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
  S.bindSocket s (S.SockAddrUnix path)
  S.listen s 5 -- 5 is the max backlog
  return s
227 -- | Closes a server endpoint.
228 -- FIXME: this should be encapsulated into a nicer type.
229 closeServer :: FilePath -> S.Socket -> IO ()
230 closeServer path sock = do
-- | Accepts a client.
--
-- Waits for one incoming connection on the listening socket @s@ and
-- wraps it into a 'Client' with an empty receive buffer.
acceptClient :: S.Socket -> IO Client
acceptClient s = do
  -- second return is the address of the client, which we ignore here
  (client_socket, _) <- S.accept s
  new_buffer <- newIORef B.empty
  handle <- S.socketToHandle client_socket ReadWriteMode
  return Client { socket=handle, rbuf=new_buffer }
-- | Closes the handle that backs the given client.
closeClient :: Client -> IO ()
closeClient client = hClose (socket client)
-- | Sends a message over a luxi transport.
--
-- The message is UTF-8 encoded and written to the client's handle,
-- followed by the end-of-message marker ('bEOM') on which the
-- receiving side splits the stream. The whole operation is bounded by
-- 'luxiDefRwto' through 'withTimeout'.
sendMsg :: Client -> String -> IO ()
sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
  let encoded = UTF8L.fromString buf
      handle = socket s
  BL.hPut handle encoded
  B.hPut handle bEOM
  -- push the data out now; the peer cannot answer before it has
  -- received the complete message
  hFlush handle
-- | Given a current buffer and the handle, it will read from the
-- network until we get a full message, and it will return that
-- message and the leftover buffer contents.
--
-- Data is read in chunks of at most 4096 bytes and appended to @obuf@
-- until a chunk contains the end-of-message marker ('eOM'). The first
-- component of the result is everything up to (excluding) the marker,
-- the second is whatever followed the marker in the last chunk. Each
-- read round is bounded by 'luxiDefRwto' through 'withTimeout'.
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
recvUpdate handle obuf = do
  nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
            -- a negative timeout makes hWaitForInput wait indefinitely;
            -- the enclosing withTimeout is what limits the wait
            _ <- hWaitForInput handle (-1)
            B.hGetNonBlocking handle 4096
  let (msg, remaining) = B.break (eOM ==) nbuf
      newbuf = B.append obuf msg
  if B.null remaining -- no marker in this chunk, message not complete yet
    then recvUpdate handle newbuf
    else return (newbuf, B.tail remaining) -- tail drops the marker itself
-- | Waits for a message over a luxi transport.
--
-- A complete message already present in the client's buffer is served
-- from there; otherwise more data is read from the handle. The buffer
-- is then updated with the bytes left over after the message, and the
-- message is returned decoded from UTF-8.
recvMsg :: Client -> IO String
recvMsg s = do
  cbuf <- readIORef $ rbuf s
  let (imsg, ibuf) = B.break (eOM ==) cbuf
  (msg, nbuf) <-
    if B.null ibuf      -- if old buffer didn't contain a full message
      then recvUpdate (socket s) cbuf   -- then we read from network
      else return (imsg, B.tail ibuf)   -- else we return data from our buffer
  writeIORef (rbuf s) nbuf
  return $ UTF8.toString msg
-- | Extended wrapper over recvMsg.
--
-- Instead of letting I/O errors escape, the outcome is reported as a
-- 'RecvResult': 'RecvOk' with the message on success, 'RecvConnClosed'
-- when the error is an end-of-file, and 'RecvError' with the shown
-- error for anything else.
recvMsgExt :: Client -> IO RecvResult
recvMsgExt s =
  Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
    return $ if isEOFError e
               then RecvConnClosed
               else RecvError (show e)
-- | Serialize a request to String.
--
-- The result is a JSON object with two entries: the 'Method' key
-- mapped to the name of the operation ('strOfOp') and the 'Args' key
-- mapped to its arguments ('opToArgs').
buildCall :: LuxiOp  -- ^ The method
          -> String  -- ^ The serialized form
buildCall lo =
  let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
           , (strOfKey Args, opToArgs lo)
           ]
      jo = toJSObject ja
  in encodeStrict jo
-- | Serialize the response to String.
--
-- The result is a JSON object with two entries: the 'Success' key
-- mapped to the boolean flag and the 'Result' key mapped to @args@.
buildResponse :: Bool    -- ^ Success
              -> JSValue -- ^ The arguments
              -> String  -- ^ The serialized form
buildResponse success args =
  let ja = [ (strOfKey Success, JSBool success)
           , (strOfKey Result, args)]
      jo = toJSObject ja
  in encodeStrict jo
-- | Check that luxi request contains the required keys and parse it.
--
-- The input must decode to a JSON object that has both the 'Method'
-- key (readable as a 'LuxiReq') and the 'Args' key; the arguments are
-- kept as raw JSON inside the resulting 'LuxiCall'.
validateCall :: String -> Result LuxiCall
validateCall s = do
  arr <- fromJResult "parsing top-level luxi message" $
         decodeStrict s::Result (JSObject JSValue)
  let aobj = fromJSObject arr
  call <- fromObj aobj (strOfKey Method)::Result LuxiReq
  args <- fromObj aobj (strOfKey Args)
  return (LuxiCall call args)
320 -- | Converts Luxi call arguments into a 'LuxiOp' data structure.
322 -- This is currently hand-coded until we make it more uniform so that
323 -- it can be generated using TH.
324 decodeCall :: LuxiCall -> Result LuxiOp
325 decodeCall (LuxiCall call args) =
328 (jids, jargs) <- fromJVal args
329 jids' <- case jids of
332 return $ QueryJobs jids' jargs
333 ReqQueryInstances -> do
334 (names, fields, locking) <- fromJVal args
335 return $ QueryInstances names fields locking
337 (names, fields, locking) <- fromJVal args
338 return $ QueryNodes names fields locking
340 (names, fields, locking) <- fromJVal args
341 return $ QueryGroups names fields locking
342 ReqQueryClusterInfo ->
343 return QueryClusterInfo
345 (what, fields, qfilter) <- fromJVal args
346 return $ Query what fields qfilter
348 (what, fields) <- fromJVal args
349 fields' <- case fields of
352 return $ QueryFields what fields'
354 [ops1] <- fromJVal args
355 ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
356 return $ SubmitJob ops2
357 ReqSubmitManyJobs -> do
358 [ops1] <- fromJVal args
359 ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
360 return $ SubmitManyJobs ops2
361 ReqWaitForJobChange -> do
362 (jid, fields, pinfo, pidx, wtmout) <-
363 -- No instance for 5-tuple, code copied from the
364 -- json sources and adapted
365 fromJResult "Parsing WaitForJobChange message" $
367 JSArray [a, b, c, d, e] ->
374 _ -> J.Error "Not enough values"
375 return $ WaitForJobChange jid fields pinfo pidx wtmout
377 [jid] <- fromJVal args
378 return $ ArchiveJob jid
379 ReqAutoArchiveJobs -> do
380 (age, tmout) <- fromJVal args
381 return $ AutoArchiveJobs age tmout
382 ReqQueryExports -> do
383 (nodes, lock) <- fromJVal args
384 return $ QueryExports nodes lock
385 ReqQueryConfigValues -> do
386 [fields] <- fromJVal args
387 return $ QueryConfigValues fields
389 (kind, name) <- fromJVal args
390 item <- tagObjectFrom kind name
391 return $ QueryTags item
393 [jid] <- fromJVal args
394 return $ CancelJob jid
395 ReqChangeJobPriority -> do
396 (jid, priority) <- fromJVal args
397 return $ ChangeJobPriority jid priority
398 ReqSetDrainFlag -> do
399 [flag] <- fromJVal args
400 return $ SetDrainFlag flag
401 ReqSetWatcherPause -> do
402 [duration] <- fromJVal args
403 return $ SetWatcherPause duration
-- | Check that luxi responses contain the required keys and that the
-- call was successful.
--
-- Fails early when the string contains the UTF-8 replacement
-- character. Otherwise the response must be a JSON object with the
-- 'Success' and 'Result' keys: on success the result value is
-- returned, on failure it is handed to 'decodeError'.
validateResult :: String -> ErrorResult JSValue
validateResult s = do
  when (UTF8.replacement_char `elem` s) $
       fail "Failed to decode UTF-8, detected replacement char after decoding"
  oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
  let arr = J.fromJSObject oarr
  status <- fromObj arr (strOfKey Success)
  result <- fromObj arr (strOfKey Result)
  if status
    then return result
    else decodeError result
419 -- | Try to decode an error from the server response. This function
420 -- will always fail, since it's called only on the error path (when
422 decodeError :: JSValue -> ErrorResult JSValue
426 Bad msg -> Bad $ GenericError msg
-- | Generic luxi method call.
--
-- Serializes @method@, sends it over the client, waits for a single
-- reply and returns that reply after validation.
callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
callMethod method s = do
  sendMsg s $ buildCall method
  result <- recvMsg s
  let rval = validateResult result
  return rval
436 -- | Parse job submission result.
437 parseSubmitJobResult :: JSValue -> ErrorResult JobId
438 parseSubmitJobResult (JSArray [JSBool True, v]) =
440 J.Error msg -> Bad $ LuxiError msg
442 parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
443 Bad . LuxiError $ fromJSString x
444 parseSubmitJobResult v =
445 Bad . LuxiError $ "Unknown result from the master daemon: " ++
-- | Specialized submitManyJobs call.
--
-- Submits all @jobs@ in one request. A failure of the call itself is
-- passed through unchanged; a reply that is a JSON array is parsed
-- entry by entry with 'parseSubmitJobResult' (the first bad entry makes
-- the whole result bad); any other reply is reported as unparseable.
submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId])
submitManyJobs s jobs = do
  rval <- callMethod (SubmitManyJobs jobs) s
  -- map each result (status, payload) pair into a nice Result ADT
  return $ case rval of
             -- keep the original error instead of re-labelling it as a
             -- parse failure in the catch-all branch below
             Bad x -> Bad x
             Ok (JSArray r) -> mapM parseSubmitJobResult r
             x -> Bad . LuxiError $
                  "Cannot parse response from Ganeti: " ++ show x
-- | Custom queryJobs call.
--
-- Queries only the @"status"@ field of the given jobs and returns one
-- status per job row of the reply. A failure of the call itself is
-- passed through unchanged; a reply that cannot be read as a list of
-- rows, or that contains an empty row, yields a 'LuxiError'.
queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
queryJobsStatus s jids = do
  rval <- callMethod (QueryJobs jids ["status"]) s
  return $ case rval of
             Bad x -> Bad x
             Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
                       J.Ok vals -> mapM firstStatus vals
                       J.Error x -> Bad $ LuxiError x
  where
    -- Only one field was requested, so the status is the first entry
    -- of each row; matching on the list avoids a partial 'head'.
    firstStatus (st:_) = Ok st
    firstStatus [] = Bad $ LuxiError "Missing job status field"