1 {-# LANGUAGE TemplateHaskell #-}
3 {-| Implementation of the Ganeti LUXI interface.
9 Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
55 import Control.Exception (catch)
57 import qualified Data.ByteString as B
58 import qualified Data.ByteString.Lazy as BL
59 import qualified Data.ByteString.UTF8 as UTF8
60 import qualified Data.ByteString.Lazy.UTF8 as UTF8L
61 import Data.Word (Word8)
63 import Text.JSON (encodeStrict, decodeStrict)
64 import qualified Text.JSON as J
65 import Text.JSON.Pretty (pp_value)
66 import Text.JSON.Types
67 import System.Directory (removeFile)
68 import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
69 import System.IO.Error (isEOFError)
71 import qualified Network.Socket as S
73 import Ganeti.BasicTypes
74 import Ganeti.Constants
77 import Ganeti.OpParams (pTagsObject)
80 import qualified Ganeti.Query.Language as Qlang
85 -- * Utility functions
87 -- | Wrapper over System.Timeout.timeout that fails in the IO monad.
88 withTimeout :: Int -> String -> IO a -> IO a
89 withTimeout secs descr action = do
90 result <- timeout (secs * 1000000) action
92 Nothing -> fail $ "Timeout in " ++ descr
95 -- * Generic protocol functionality
97 -- | Result of receiving a message from the socket.
98 data RecvResult = RecvConnClosed -- ^ Connection closed
99 | RecvError String -- ^ Any other error
100 | RecvOk String -- ^ Successfull receive
103 -- | Currently supported Luxi operations and JSON serialization.
106 [ simpleField "what" [t| Qlang.ItemType |]
107 , simpleField "fields" [t| [String] |]
108 , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
110 , (luxiReqQueryFields,
111 [ simpleField "what" [t| Qlang.ItemType |]
112 , simpleField "fields" [t| [String] |]
114 , (luxiReqQueryNodes,
115 [ simpleField "names" [t| [String] |]
116 , simpleField "fields" [t| [String] |]
117 , simpleField "lock" [t| Bool |]
119 , (luxiReqQueryGroups,
120 [ simpleField "names" [t| [String] |]
121 , simpleField "fields" [t| [String] |]
122 , simpleField "lock" [t| Bool |]
124 , (luxiReqQueryNetworks,
125 [ simpleField "names" [t| [String] |]
126 , simpleField "fields" [t| [String] |]
127 , simpleField "lock" [t| Bool |]
129 , (luxiReqQueryInstances,
130 [ simpleField "names" [t| [String] |]
131 , simpleField "fields" [t| [String] |]
132 , simpleField "lock" [t| Bool |]
135 [ simpleField "ids" [t| [JobId] |]
136 , simpleField "fields" [t| [String] |]
138 , (luxiReqQueryExports,
139 [ simpleField "nodes" [t| [String] |]
140 , simpleField "lock" [t| Bool |]
142 , (luxiReqQueryConfigValues,
143 [ simpleField "fields" [t| [String] |] ]
145 , (luxiReqQueryClusterInfo, [])
148 , simpleField "name" [t| String |]
151 [ simpleField "job" [t| [MetaOpCode] |] ]
153 , (luxiReqSubmitManyJobs,
154 [ simpleField "ops" [t| [[MetaOpCode]] |] ]
156 , (luxiReqWaitForJobChange,
157 [ simpleField "job" [t| JobId |]
158 , simpleField "fields" [t| [String]|]
159 , simpleField "prev_job" [t| JSValue |]
160 , simpleField "prev_log" [t| JSValue |]
161 , simpleField "tmout" [t| Int |]
163 , (luxiReqArchiveJob,
164 [ simpleField "job" [t| JobId |] ]
166 , (luxiReqAutoArchiveJobs,
167 [ simpleField "age" [t| Int |]
168 , simpleField "tmout" [t| Int |]
171 [ simpleField "job" [t| JobId |] ]
173 , (luxiReqChangeJobPriority,
174 [ simpleField "job" [t| JobId |]
175 , simpleField "priority" [t| Int |] ]
177 , (luxiReqSetDrainFlag,
178 [ simpleField "flag" [t| Bool |] ]
180 , (luxiReqSetWatcherPause,
181 [ simpleField "duration" [t| Double |] ]
185 $(makeJSONInstance ''LuxiReq)
187 -- | List of all defined Luxi calls.
188 $(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
190 -- | The serialisation of LuxiOps into strings in messages.
191 $(genStrOfOp ''LuxiOp "strOfOp")
193 -- | Type holding the initial (unparsed) Luxi call.
194 data LuxiCall = LuxiCall LuxiReq JSValue
196 -- | The end-of-message separator.
200 -- | The end-of-message encoded as a ByteString.
202 bEOM = B.singleton eOM
204 -- | Valid keys in the requests and responses.
205 data MsgKeys = Method
210 -- | The serialisation of MsgKeys into strings in messages.
211 $(genStrOfKey ''MsgKeys "strOfKey")
213 -- | Luxi client encapsulation.
214 data Client = Client { socket :: Handle -- ^ The socket of the client
215 , rbuf :: IORef B.ByteString -- ^ Already received buffer
218 -- | Connects to the master daemon and returns a luxi Client.
219 getClient :: String -> IO Client
221 s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
222 withTimeout luxiDefCtmo "creating luxi connection" $
223 S.connect s (S.SockAddrUnix path)
224 rf <- newIORef B.empty
225 h <- S.socketToHandle s ReadWriteMode
226 return Client { socket=h, rbuf=rf }
228 -- | Creates and returns a server endpoint.
229 getServer :: Bool -> FilePath -> IO S.Socket
230 getServer setOwner path = do
231 s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
232 S.bindSocket s (S.SockAddrUnix path)
233 when setOwner . setOwnerAndGroupFromNames path GanetiLuxid $
234 ExtraGroup DaemonsGroup
235 S.listen s 5 -- 5 is the max backlog
238 -- | Closes a server endpoint.
239 -- FIXME: this should be encapsulated into a nicer type.
240 closeServer :: FilePath -> S.Socket -> IO ()
241 closeServer path sock = do
245 -- | Accepts a client
246 acceptClient :: S.Socket -> IO Client
248 -- second return is the address of the client, which we ignore here
249 (client_socket, _) <- S.accept s
250 new_buffer <- newIORef B.empty
251 handle <- S.socketToHandle client_socket ReadWriteMode
252 return Client { socket=handle, rbuf=new_buffer }
254 -- | Closes the client socket.
255 closeClient :: Client -> IO ()
256 closeClient = hClose . socket
258 -- | Sends a message over a luxi transport.
259 sendMsg :: Client -> String -> IO ()
260 sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
261 let encoded = UTF8L.fromString buf
263 BL.hPut handle encoded
267 -- | Given a current buffer and the handle, it will read from the
268 -- network until we get a full message, and it will return that
269 -- message and the leftover buffer contents.
270 recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
271 recvUpdate handle obuf = do
272 nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
273 _ <- hWaitForInput handle (-1)
274 B.hGetNonBlocking handle 4096
275 let (msg, remaining) = B.break (eOM ==) nbuf
276 newbuf = B.append obuf msg
278 then recvUpdate handle newbuf
279 else return (newbuf, B.tail remaining)
281 -- | Waits for a message over a luxi transport.
282 recvMsg :: Client -> IO String
284 cbuf <- readIORef $ rbuf s
285 let (imsg, ibuf) = B.break (eOM ==) cbuf
287 if B.null ibuf -- if old buffer didn't contain a full message
288 then recvUpdate (socket s) cbuf -- then we read from network
289 else return (imsg, B.tail ibuf) -- else we return data from our buffer
290 writeIORef (rbuf s) nbuf
291 return $ UTF8.toString msg
293 -- | Extended wrapper over recvMsg.
294 recvMsgExt :: Client -> IO RecvResult
296 Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
297 return $ if isEOFError e
299 else RecvError (show e)
301 -- | Serialize a request to String.
302 buildCall :: LuxiOp -- ^ The method
303 -> String -- ^ The serialized form
305 let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
306 , (strOfKey Args, opToArgs lo)
311 -- | Serialize the response to String.
312 buildResponse :: Bool -- ^ Success
313 -> JSValue -- ^ The arguments
314 -> String -- ^ The serialized form
315 buildResponse success args =
316 let ja = [ (strOfKey Success, JSBool success)
317 , (strOfKey Result, args)]
321 -- | Check that luxi request contains the required keys and parse it.
322 validateCall :: String -> Result LuxiCall
324 arr <- fromJResult "parsing top-level luxi message" $
325 decodeStrict s::Result (JSObject JSValue)
326 let aobj = fromJSObject arr
327 call <- fromObj aobj (strOfKey Method)::Result LuxiReq
328 args <- fromObj aobj (strOfKey Args)
329 return (LuxiCall call args)
331 -- | Converts Luxi call arguments into a 'LuxiOp' data structure.
333 -- This is currently hand-coded until we make it more uniform so that
334 -- it can be generated using TH.
335 decodeCall :: LuxiCall -> Result LuxiOp
336 decodeCall (LuxiCall call args) =
339 (jids, jargs) <- fromJVal args
340 jids' <- case jids of
343 return $ QueryJobs jids' jargs
344 ReqQueryInstances -> do
345 (names, fields, locking) <- fromJVal args
346 return $ QueryInstances names fields locking
348 (names, fields, locking) <- fromJVal args
349 return $ QueryNodes names fields locking
351 (names, fields, locking) <- fromJVal args
352 return $ QueryGroups names fields locking
353 ReqQueryClusterInfo ->
354 return QueryClusterInfo
355 ReqQueryNetworks -> do
356 (names, fields, locking) <- fromJVal args
357 return $ QueryNetworks names fields locking
359 (what, fields, qfilter) <- fromJVal args
360 return $ Query what fields qfilter
362 (what, fields) <- fromJVal args
363 fields' <- case fields of
366 return $ QueryFields what fields'
368 [ops1] <- fromJVal args
369 ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
370 return $ SubmitJob ops2
371 ReqSubmitManyJobs -> do
372 [ops1] <- fromJVal args
373 ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
374 return $ SubmitManyJobs ops2
375 ReqWaitForJobChange -> do
376 (jid, fields, pinfo, pidx, wtmout) <-
377 -- No instance for 5-tuple, code copied from the
378 -- json sources and adapted
379 fromJResult "Parsing WaitForJobChange message" $
381 JSArray [a, b, c, d, e] ->
388 _ -> J.Error "Not enough values"
389 return $ WaitForJobChange jid fields pinfo pidx wtmout
391 [jid] <- fromJVal args
392 return $ ArchiveJob jid
393 ReqAutoArchiveJobs -> do
394 (age, tmout) <- fromJVal args
395 return $ AutoArchiveJobs age tmout
396 ReqQueryExports -> do
397 (nodes, lock) <- fromJVal args
398 return $ QueryExports nodes lock
399 ReqQueryConfigValues -> do
400 [fields] <- fromJVal args
401 return $ QueryConfigValues fields
403 (kind, name) <- fromJVal args
404 return $ QueryTags kind name
406 [jid] <- fromJVal args
407 return $ CancelJob jid
408 ReqChangeJobPriority -> do
409 (jid, priority) <- fromJVal args
410 return $ ChangeJobPriority jid priority
411 ReqSetDrainFlag -> do
412 [flag] <- fromJVal args
413 return $ SetDrainFlag flag
414 ReqSetWatcherPause -> do
415 [duration] <- fromJVal args
416 return $ SetWatcherPause duration
418 -- | Check that luxi responses contain the required keys and that the
419 -- call was successful.
420 validateResult :: String -> ErrorResult JSValue
421 validateResult s = do
422 when (UTF8.replacement_char `elem` s) $
423 fail "Failed to decode UTF-8, detected replacement char after decoding"
424 oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
425 let arr = J.fromJSObject oarr
426 status <- fromObj arr (strOfKey Success)
427 result <- fromObj arr (strOfKey Result)
430 else decodeError result
432 -- | Try to decode an error from the server response. This function
433 -- will always fail, since it's called only on the error path (when
435 decodeError :: JSValue -> ErrorResult JSValue
439 Bad msg -> Bad $ GenericError msg
441 -- | Generic luxi method call.
442 callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
443 callMethod method s = do
444 sendMsg s $ buildCall method
446 let rval = validateResult result
449 -- | Parse job submission result.
450 parseSubmitJobResult :: JSValue -> ErrorResult JobId
451 parseSubmitJobResult (JSArray [JSBool True, v]) =
453 J.Error msg -> Bad $ LuxiError msg
455 parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
456 Bad . LuxiError $ fromJSString x
457 parseSubmitJobResult v =
458 Bad . LuxiError $ "Unknown result from the master daemon: " ++
461 -- | Specialized submitManyJobs call.
462 submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId])
463 submitManyJobs s jobs = do
464 rval <- callMethod (SubmitManyJobs jobs) s
465 -- map each result (status, payload) pair into a nice Result ADT
466 return $ case rval of
468 Ok (JSArray r) -> mapM parseSubmitJobResult r
469 x -> Bad . LuxiError $
470 "Cannot parse response from Ganeti: " ++ show x
472 -- | Custom queryJobs call.
473 queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
474 queryJobsStatus s jids = do
475 rval <- callMethod (QueryJobs jids ["status"]) s
476 return $ case rval of
478 Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
479 J.Ok vals -> if any null vals
481 LuxiError "Missing job status field"
482 else Ok (map head vals)
483 J.Error x -> Bad $ LuxiError x