1 {-# LANGUAGE TemplateHaskell #-}
3 {-| Implementation of the Ganeti LUXI interface.
9 Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
55 import Control.Exception (catch)
57 import qualified Data.ByteString as B
58 import qualified Data.ByteString.UTF8 as UTF8
59 import Data.Word (Word8)
61 import Text.JSON (encodeStrict, decodeStrict)
62 import qualified Text.JSON as J
63 import Text.JSON.Pretty (pp_value)
64 import Text.JSON.Types
65 import System.Directory (removeFile)
66 import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
67 import System.IO.Error (isEOFError)
69 import qualified Network.Socket as S
71 import Ganeti.BasicTypes
72 import Ganeti.Constants
75 import Ganeti.OpParams (pTagsObject)
77 import qualified Ganeti.Query.Language as Qlang
81 -- * Utility functions
83 -- | Wrapper over System.Timeout.timeout that fails in the IO monad.
-- The limit @secs@ is given in seconds and is multiplied up to the
-- microseconds that @timeout@ expects; @descr@ names the operation in
-- the failure message.
84 withTimeout :: Int -> String -> IO a -> IO a
85 withTimeout secs descr action = do
86 result <- timeout (secs * 1000000) action
-- the action did not finish in time: fail, naming the operation
88 Nothing -> fail $ "Timeout in " ++ descr
91 -- * Generic protocol functionality
93 -- | Result of receiving a message from the socket.
94 data RecvResult = RecvConnClosed -- ^ Connection closed
95 | RecvError String -- ^ Any other error, with its description
96 | RecvOk String -- ^ Successful receive, with the message
99 -- | Currently supported Luxi operations and JSON serialization.
102 [ simpleField "what" [t| Qlang.ItemType |]
103 , simpleField "fields" [t| [String] |]
104 , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
106 , (luxiReqQueryFields,
107 [ simpleField "what" [t| Qlang.ItemType |]
108 , simpleField "fields" [t| [String] |]
110 , (luxiReqQueryNodes,
111 [ simpleField "names" [t| [String] |]
112 , simpleField "fields" [t| [String] |]
113 , simpleField "lock" [t| Bool |]
115 , (luxiReqQueryGroups,
116 [ simpleField "names" [t| [String] |]
117 , simpleField "fields" [t| [String] |]
118 , simpleField "lock" [t| Bool |]
120 , (luxiReqQueryInstances,
121 [ simpleField "names" [t| [String] |]
122 , simpleField "fields" [t| [String] |]
123 , simpleField "lock" [t| Bool |]
126 [ simpleField "ids" [t| [JobId] |]
127 , simpleField "fields" [t| [String] |]
129 , (luxiReqQueryExports,
130 [ simpleField "nodes" [t| [String] |]
131 , simpleField "lock" [t| Bool |]
133 , (luxiReqQueryConfigValues,
134 [ simpleField "fields" [t| [String] |] ]
136 , (luxiReqQueryClusterInfo, [])
140 [ simpleField "job" [t| [MetaOpCode] |] ]
142 , (luxiReqSubmitManyJobs,
143 [ simpleField "ops" [t| [[MetaOpCode]] |] ]
145 , (luxiReqWaitForJobChange,
146 [ simpleField "job" [t| JobId |]
147 , simpleField "fields" [t| [String]|]
148 , simpleField "prev_job" [t| JSValue |]
149 , simpleField "prev_log" [t| JSValue |]
150 , simpleField "tmout" [t| Int |]
152 , (luxiReqArchiveJob,
153 [ simpleField "job" [t| JobId |] ]
155 , (luxiReqAutoArchiveJobs,
156 [ simpleField "age" [t| Int |]
157 , simpleField "tmout" [t| Int |]
160 [ simpleField "job" [t| JobId |] ]
162 , (luxiReqChangeJobPriority,
163 [ simpleField "job" [t| JobId |]
164 , simpleField "priority" [t| Int |] ]
166 , (luxiReqSetDrainFlag,
167 [ simpleField "flag" [t| Bool |] ]
169 , (luxiReqSetWatcherPause,
170 [ simpleField "duration" [t| Double |] ]
-- NOTE(review): presumably generates the JSON instance for 'LuxiReq';
-- 'validateCall' relies on being able to parse a 'LuxiReq' from JSON.
174 $(makeJSONInstance ''LuxiReq)
176 -- | List of all defined Luxi calls.
-- NOTE(review): @drop 3@ is the function handed to 'genAllConstr',
-- presumably to strip a three-character prefix from the constructor
-- names; confirm against 'genAllConstr'.
177 $(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
179 -- | The serialisation of LuxiOps into strings in messages.
-- The generated 'strOfOp' provides the method name that 'buildCall'
-- places in a request.
180 $(genStrOfOp ''LuxiOp "strOfOp")
182 -- | Type holding the initial (unparsed) Luxi call.
-- Pairs the request kind with its argument payload, which stays a raw
-- 'JSValue' until 'decodeCall' turns the pair into a 'LuxiOp'.
183 data LuxiCall = LuxiCall LuxiReq JSValue
185 -- | The end-of-message separator.
189 -- | The end-of-message encoded as a ByteString.
-- This is a one-byte string holding just 'eOM'.
191 bEOM = B.singleton eOM
193 -- | Valid keys in the requests and responses.
-- Besides @Method@, other code in this module refers to the keys
-- @Args@, @Success@ and @Result@.
194 data MsgKeys = Method
199 -- | The serialisation of MsgKeys into strings in messages.
-- The generated 'strOfKey' supplies the object keys used when messages
-- are built ('buildCall', 'buildResponse') and when they are checked
-- ('validateCall', 'validateResult').
200 $(genStrOfKey ''MsgKeys "strOfKey")
202 -- | Luxi client encapsulation.
203 data Client = Client { socket :: Handle -- ^ The socket of the client
204 , rbuf :: IORef B.ByteString -- ^ Bytes already received but not yet returned as a message
207 -- | Connects to the master daemon and returns a luxi Client.
-- The connection is made over a Unix-domain stream socket and the
-- connect call is bounded by 'luxiDefCtmo'. The returned client starts
-- with an empty receive buffer.
208 getClient :: String -> IO Client
210 s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
211 withTimeout luxiDefCtmo "creating luxi connection" $
212 S.connect s (S.SockAddrUnix path)
213 rf <- newIORef B.empty
214 h <- S.socketToHandle s ReadWriteMode
215 return Client { socket=h, rbuf=rf }
217 -- | Creates and returns a server endpoint.
-- The endpoint is a Unix-domain stream socket bound to @path@ and put
-- into listening state.
218 getServer :: FilePath -> IO S.Socket
220 s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
221 S.bindSocket s (S.SockAddrUnix path)
222 S.listen s 5 -- 5 is the max backlog
225 -- | Closes a server endpoint.
226 -- FIXME: this should be encapsulated into a nicer type.
227 closeServer :: FilePath -> S.Socket -> IO ()
228 closeServer path sock = do
232 -- | Accepts a client connection on the given server socket and wraps
-- it in a 'Client' that starts with an empty receive buffer.
233 acceptClient :: S.Socket -> IO Client
235 -- second return is the address of the client, which we ignore here
236 (client_socket, _) <- S.accept s
237 new_buffer <- newIORef B.empty
238 handle <- S.socketToHandle client_socket ReadWriteMode
239 return Client { socket=handle, rbuf=new_buffer }
-- | Shuts down a client by closing the handle held in its 'socket' field.
closeClient :: Client -> IO ()
closeClient client = hClose (socket client)
245 -- | Sends a message over a luxi transport.
-- The message is UTF-8 encoded before it is written, and the whole send
-- is bounded by 'luxiDefRwto'.
246 sendMsg :: Client -> String -> IO ()
247 sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
248 let encoded = UTF8.fromString buf
250 B.hPut handle encoded
254 -- | Given a current buffer and the handle, it will read from the
255 -- network until we get a full message, and it will return that
256 -- message and the leftover buffer contents.
-- Each individual read is bounded by 'luxiDefRwto'.
257 recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
258 recvUpdate handle obuf = do
259 nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
-- wait until input is available, then take at most 4096 bytes
260 _ <- hWaitForInput handle (-1)
261 B.hGetNonBlocking handle 4096
-- split the new chunk at the first end-of-message byte, if any
262 let (msg, remaining) = B.break (eOM ==) nbuf
263 newbuf = B.append obuf msg
265 then recvUpdate handle newbuf
-- B.tail drops the separator byte from the leftover data
266 else return (newbuf, B.tail remaining)
268 -- | Waits for a message over a luxi transport.
-- Data left in the client's buffer by an earlier read is checked first.
-- Whatever follows the returned message is written back to that buffer,
-- and the message bytes are decoded as UTF-8.
269 recvMsg :: Client -> IO String
271 cbuf <- readIORef $ rbuf s
272 let (imsg, ibuf) = B.break (eOM ==) cbuf
274 if B.null ibuf -- if old buffer didn't contain a full message
275 then recvUpdate (socket s) cbuf -- then we read from network
276 else return (imsg, B.tail ibuf) -- else we return data from our buffer
277 writeIORef (rbuf s) nbuf
278 return $ UTF8.toString msg
280 -- | Extended wrapper over recvMsg.
-- I/O errors raised by 'recvMsg' are caught and reported as a
-- 'RecvResult' instead of propagating: end-of-file is tested with
-- 'isEOFError', and any other error becomes 'RecvError' carrying the
-- shown exception.
281 recvMsgExt :: Client -> IO RecvResult
283 Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
284 return $ if isEOFError e
286 else RecvError (show e)
288 -- | Serialize a request to String.
-- The request carries the method name (from 'strOfOp') under the
-- @Method@ key and the operation's arguments (from 'opToArgs') under the
-- @Args@ key.
289 buildCall :: LuxiOp -- ^ The method
290 -> String -- ^ The serialized form
292 let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
293 , (strOfKey Args, opToArgs lo)
298 -- | Serialize the response to String.
-- The response carries the success flag under the @Success@ key and the
-- given value, unchanged, under the @Result@ key.
299 buildResponse :: Bool -- ^ Success
300 -> JSValue -- ^ The arguments
301 -> String -- ^ The serialized form
302 buildResponse success args =
303 let ja = [ (strOfKey Success, JSBool success)
304 , (strOfKey Result, args)]
308 -- | Check that luxi request contains the required keys and parse it.
-- The top-level message must decode as a JSON object. Its @Method@ entry
-- is parsed into a 'LuxiReq', while the @Args@ entry is kept as a raw
-- JSON value for 'decodeCall' to interpret.
309 validateCall :: String -> Result LuxiCall
311 arr <- fromJResult "parsing top-level luxi message" $
312 decodeStrict s::Result (JSObject JSValue)
313 let aobj = fromJSObject arr
314 call <- fromObj aobj (strOfKey Method)::Result LuxiReq
315 args <- fromObj aobj (strOfKey Args)
316 return (LuxiCall call args)
318 -- | Converts Luxi call arguments into a 'LuxiOp' data structure.
320 -- This is currently hand-coded until we make it more uniform so that
321 -- it can be generated using TH.
-- Arguments are unpacked positionally with 'fromJVal', as tuples or as
-- single-element lists, and handed to the matching 'LuxiOp' constructor.
322 decodeCall :: LuxiCall -> Result LuxiOp
323 decodeCall (LuxiCall call args) =
326 (jids, jargs) <- fromJVal args
327 jids' <- case jids of
330 return $ QueryJobs jids' jargs
331 ReqQueryInstances -> do
332 (names, fields, locking) <- fromJVal args
333 return $ QueryInstances names fields locking
335 (names, fields, locking) <- fromJVal args
336 return $ QueryNodes names fields locking
338 (names, fields, locking) <- fromJVal args
339 return $ QueryGroups names fields locking
340 ReqQueryClusterInfo ->
341 return QueryClusterInfo
343 (what, fields, qfilter) <- fromJVal args
344 return $ Query what fields qfilter
346 (what, fields) <- fromJVal args
347 fields' <- case fields of
350 return $ QueryFields what fields'
352 [ops1] <- fromJVal args
353 ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
354 return $ SubmitJob ops2
355 ReqSubmitManyJobs -> do
356 [ops1] <- fromJVal args
357 ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
358 return $ SubmitManyJobs ops2
359 ReqWaitForJobChange -> do
360 (jid, fields, pinfo, pidx, wtmout) <-
361 -- No instance for 5-tuple, code copied from the
362 -- json sources and adapted
363 fromJResult "Parsing WaitForJobChange message" $
365 JSArray [a, b, c, d, e] ->
-- NOTE(review): this catch-all matches every value that is not a
-- five-element array, so the message below is also produced when
-- too many values (or a non-array) are supplied.
372 _ -> J.Error "Not enough values"
373 return $ WaitForJobChange jid fields pinfo pidx wtmout
375 [jid] <- fromJVal args
376 return $ ArchiveJob jid
377 ReqAutoArchiveJobs -> do
378 (age, tmout) <- fromJVal args
379 return $ AutoArchiveJobs age tmout
380 ReqQueryExports -> do
381 (nodes, lock) <- fromJVal args
382 return $ QueryExports nodes lock
383 ReqQueryConfigValues -> do
384 [fields] <- fromJVal args
385 return $ QueryConfigValues fields
387 (kind, name) <- fromJVal args
388 item <- tagObjectFrom kind name
389 return $ QueryTags item
391 [jid] <- fromJVal args
392 return $ CancelJob jid
393 ReqChangeJobPriority -> do
394 (jid, priority) <- fromJVal args
395 return $ ChangeJobPriority jid priority
396 ReqSetDrainFlag -> do
397 [flag] <- fromJVal args
398 return $ SetDrainFlag flag
399 ReqSetWatcherPause -> do
400 [duration] <- fromJVal args
401 return $ SetWatcherPause duration
403 -- | Check that luxi responses contain the required keys and that the
404 -- call was successful.
-- The input is rejected up front if it contains the UTF-8 replacement
-- character. It is then decoded as a JSON object from which the
-- @Success@ and @Result@ entries are extracted.
405 validateResult :: String -> ErrorResult JSValue
406 validateResult s = do
407 when (UTF8.replacement_char `elem` s) $
408 fail "Failed to decode UTF-8, detected replacement char after decoding"
409 oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
410 let arr = J.fromJSObject oarr
411 status <- fromObj arr (strOfKey Success)
412 result <- fromObj arr (strOfKey Result)
-- on this branch the result payload is handed to 'decodeError'
415 else decodeError result
417 -- | Try to decode an error from the server response. This function
418 -- will always fail, since it's called only on the error path (when
-- 'validateResult' takes its @else@ branch).
420 decodeError :: JSValue -> ErrorResult JSValue
-- a payload that cannot be decoded is reported as a 'GenericError'
424 Bad msg -> Bad $ GenericError msg
426 -- | Generic luxi method call.
-- Serializes the operation with 'buildCall', sends it over the client
-- and runs 'validateResult' on the result.
427 callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
428 callMethod method s = do
429 sendMsg s $ buildCall method
431 let rval = validateResult result
434 -- | Parse job submission result.
-- The expected shape is a two-element array holding a status flag and a
-- payload. A false flag with a string payload becomes a 'LuxiError'
-- carrying that string, and any other unrecognised shape is reported as
-- an unknown result.
435 parseSubmitJobResult :: JSValue -> ErrorResult JobId
436 parseSubmitJobResult (JSArray [JSBool True, v]) =
-- a payload that fails to parse is reported as a 'LuxiError'
438 J.Error msg -> Bad $ LuxiError msg
440 parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
441 Bad . LuxiError $ fromJSString x
442 parseSubmitJobResult v =
443 Bad . LuxiError $ "Unknown result from the master daemon: " ++
446 -- | Specialized submitManyJobs call.
-- Issues a 'SubmitManyJobs' request. When the reply is an array, every
-- element is parsed with 'parseSubmitJobResult'; any other successful
-- reply is reported as a 'LuxiError'.
447 submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId])
448 submitManyJobs s jobs = do
449 rval <- callMethod (SubmitManyJobs jobs) s
450 -- map each result (status, payload) pair into a nice Result ADT
451 return $ case rval of
453 Ok (JSArray r) -> mapM parseSubmitJobResult r
454 x -> Bad . LuxiError $
455 "Cannot parse response from Ganeti: " ++ show x
457 -- | Custom queryJobs call.
-- Requests only the @status@ field for the given jobs and returns the
-- first field of every row in the reply. A row without any field, or a
-- reply that does not parse as rows of 'JobStatus', yields a 'LuxiError'.
458 queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
459 queryJobsStatus s jids = do
460 rval <- callMethod (QueryJobs jids ["status"]) s
461 return $ case rval of
463 Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
-- the emptiness check keeps the 'head' below from hitting an empty row
464 J.Ok vals -> if any null vals
466 LuxiError "Missing job status field"
467 else Ok (map head vals)
468 J.Error x -> Bad $ LuxiError x