1 {-# LANGUAGE TemplateHaskell #-}
3 {-| Implementation of the Ganeti LUXI interface.
9 Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
55 import Control.Exception (catch)
57 import qualified Data.ByteString as B
58 import qualified Data.ByteString.Lazy as BL
59 import qualified Data.ByteString.UTF8 as UTF8
60 import qualified Data.ByteString.Lazy.UTF8 as UTF8L
61 import Data.Word (Word8)
63 import Text.JSON (encodeStrict, decodeStrict)
64 import qualified Text.JSON as J
65 import Text.JSON.Pretty (pp_value)
66 import Text.JSON.Types
67 import System.Directory (removeFile)
68 import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
69 import System.IO.Error (isEOFError)
71 import qualified Network.Socket as S
73 import Ganeti.BasicTypes
74 import Ganeti.Constants
77 import Ganeti.OpParams (pTagsObject)
79 import qualified Ganeti.Query.Language as Qlang
80 import Ganeti.Runtime (GanetiDaemon(..), MiscGroup(..), GanetiGroup(..))
85 -- * Utility functions
87 -- | Wrapper over System.Timeout.timeout that fails in the IO monad.
88 withTimeout :: Int -> String -> IO a -> IO a
89 withTimeout secs descr action = do
90 result <- timeout (secs * 1000000) action
92 Nothing -> fail $ "Timeout in " ++ descr
95 -- * Generic protocol functionality
97 -- | Result of receiving a message from the socket.
98 data RecvResult = RecvConnClosed -- ^ Connection closed
99 | RecvError String -- ^ Any other error
100 | RecvOk String -- ^ Successfull receive
103 -- | Currently supported Luxi operations and JSON serialization.
106 [ simpleField "what" [t| Qlang.ItemType |]
107 , simpleField "fields" [t| [String] |]
108 , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
110 , (luxiReqQueryFields,
111 [ simpleField "what" [t| Qlang.ItemType |]
112 , simpleField "fields" [t| [String] |]
114 , (luxiReqQueryNodes,
115 [ simpleField "names" [t| [String] |]
116 , simpleField "fields" [t| [String] |]
117 , simpleField "lock" [t| Bool |]
119 , (luxiReqQueryGroups,
120 [ simpleField "names" [t| [String] |]
121 , simpleField "fields" [t| [String] |]
122 , simpleField "lock" [t| Bool |]
124 , (luxiReqQueryNetworks,
125 [ simpleField "names" [t| [String] |]
126 , simpleField "fields" [t| [String] |]
127 , simpleField "lock" [t| Bool |]
129 , (luxiReqQueryInstances,
130 [ simpleField "names" [t| [String] |]
131 , simpleField "fields" [t| [String] |]
132 , simpleField "lock" [t| Bool |]
135 [ simpleField "ids" [t| [JobId] |]
136 , simpleField "fields" [t| [String] |]
138 , (luxiReqQueryExports,
139 [ simpleField "nodes" [t| [String] |]
140 , simpleField "lock" [t| Bool |]
142 , (luxiReqQueryConfigValues,
143 [ simpleField "fields" [t| [String] |] ]
145 , (luxiReqQueryClusterInfo, [])
148 , simpleField "name" [t| String |]
151 [ simpleField "job" [t| [MetaOpCode] |] ]
153 , (luxiReqSubmitJobToDrainedQueue,
154 [ simpleField "job" [t| [MetaOpCode] |] ]
156 , (luxiReqSubmitManyJobs,
157 [ simpleField "ops" [t| [[MetaOpCode]] |] ]
159 , (luxiReqWaitForJobChange,
160 [ simpleField "job" [t| JobId |]
161 , simpleField "fields" [t| [String]|]
162 , simpleField "prev_job" [t| JSValue |]
163 , simpleField "prev_log" [t| JSValue |]
164 , simpleField "tmout" [t| Int |]
166 , (luxiReqArchiveJob,
167 [ simpleField "job" [t| JobId |] ]
169 , (luxiReqAutoArchiveJobs,
170 [ simpleField "age" [t| Int |]
171 , simpleField "tmout" [t| Int |]
174 [ simpleField "job" [t| JobId |] ]
176 , (luxiReqChangeJobPriority,
177 [ simpleField "job" [t| JobId |]
178 , simpleField "priority" [t| Int |] ]
180 , (luxiReqSetDrainFlag,
181 [ simpleField "flag" [t| Bool |] ]
183 , (luxiReqSetWatcherPause,
184 [ simpleField "duration" [t| Double |] ]
188 $(makeJSONInstance ''LuxiReq)
190 -- | List of all defined Luxi calls.
191 $(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
193 -- | The serialisation of LuxiOps into strings in messages.
194 $(genStrOfOp ''LuxiOp "strOfOp")
196 -- | Type holding the initial (unparsed) Luxi call.
197 data LuxiCall = LuxiCall LuxiReq JSValue
199 -- | The end-of-message separator.
203 -- | The end-of-message encoded as a ByteString.
205 bEOM = B.singleton eOM
207 -- | Valid keys in the requests and responses.
208 data MsgKeys = Method
213 -- | The serialisation of MsgKeys into strings in messages.
214 $(genStrOfKey ''MsgKeys "strOfKey")
216 -- | Luxi client encapsulation.
217 data Client = Client { socket :: Handle -- ^ The socket of the client
218 , rbuf :: IORef B.ByteString -- ^ Already received buffer
221 -- | Connects to the master daemon and returns a luxi Client.
222 getClient :: String -> IO Client
224 s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
225 withTimeout luxiDefCtmo "creating luxi connection" $
226 S.connect s (S.SockAddrUnix path)
227 rf <- newIORef B.empty
228 h <- S.socketToHandle s ReadWriteMode
229 return Client { socket=h, rbuf=rf }
231 -- | Creates and returns a server endpoint.
232 getServer :: Bool -> FilePath -> IO S.Socket
233 getServer setOwner path = do
234 s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
235 S.bindSocket s (S.SockAddrUnix path)
236 when setOwner . setOwnerAndGroupFromNames path GanetiLuxid $
237 ExtraGroup DaemonsGroup
238 S.listen s 5 -- 5 is the max backlog
241 -- | Closes a server endpoint.
242 -- FIXME: this should be encapsulated into a nicer type.
243 closeServer :: FilePath -> S.Socket -> IO ()
244 closeServer path sock = do
248 -- | Accepts a client
249 acceptClient :: S.Socket -> IO Client
251 -- second return is the address of the client, which we ignore here
252 (client_socket, _) <- S.accept s
253 new_buffer <- newIORef B.empty
254 handle <- S.socketToHandle client_socket ReadWriteMode
255 return Client { socket=handle, rbuf=new_buffer }
257 -- | Closes the client socket.
258 closeClient :: Client -> IO ()
259 closeClient = hClose . socket
261 -- | Sends a message over a luxi transport.
262 sendMsg :: Client -> String -> IO ()
263 sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
264 let encoded = UTF8L.fromString buf
266 BL.hPut handle encoded
270 -- | Given a current buffer and the handle, it will read from the
271 -- network until we get a full message, and it will return that
272 -- message and the leftover buffer contents.
273 recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
274 recvUpdate handle obuf = do
275 nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
276 _ <- hWaitForInput handle (-1)
277 B.hGetNonBlocking handle 4096
278 let (msg, remaining) = B.break (eOM ==) nbuf
279 newbuf = B.append obuf msg
281 then recvUpdate handle newbuf
282 else return (newbuf, B.tail remaining)
284 -- | Waits for a message over a luxi transport.
285 recvMsg :: Client -> IO String
287 cbuf <- readIORef $ rbuf s
288 let (imsg, ibuf) = B.break (eOM ==) cbuf
290 if B.null ibuf -- if old buffer didn't contain a full message
291 then recvUpdate (socket s) cbuf -- then we read from network
292 else return (imsg, B.tail ibuf) -- else we return data from our buffer
293 writeIORef (rbuf s) nbuf
294 return $ UTF8.toString msg
296 -- | Extended wrapper over recvMsg.
297 recvMsgExt :: Client -> IO RecvResult
299 Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
300 return $ if isEOFError e
302 else RecvError (show e)
304 -- | Serialize a request to String.
305 buildCall :: LuxiOp -- ^ The method
306 -> String -- ^ The serialized form
308 let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
309 , (strOfKey Args, opToArgs lo)
314 -- | Serialize the response to String.
315 buildResponse :: Bool -- ^ Success
316 -> JSValue -- ^ The arguments
317 -> String -- ^ The serialized form
318 buildResponse success args =
319 let ja = [ (strOfKey Success, JSBool success)
320 , (strOfKey Result, args)]
324 -- | Check that luxi request contains the required keys and parse it.
325 validateCall :: String -> Result LuxiCall
327 arr <- fromJResult "parsing top-level luxi message" $
328 decodeStrict s::Result (JSObject JSValue)
329 let aobj = fromJSObject arr
330 call <- fromObj aobj (strOfKey Method)::Result LuxiReq
331 args <- fromObj aobj (strOfKey Args)
332 return (LuxiCall call args)
334 -- | Converts Luxi call arguments into a 'LuxiOp' data structure.
336 -- This is currently hand-coded until we make it more uniform so that
337 -- it can be generated using TH.
338 decodeCall :: LuxiCall -> Result LuxiOp
339 decodeCall (LuxiCall call args) =
342 (jids, jargs) <- fromJVal args
343 jids' <- case jids of
346 return $ QueryJobs jids' jargs
347 ReqQueryInstances -> do
348 (names, fields, locking) <- fromJVal args
349 return $ QueryInstances names fields locking
351 (names, fields, locking) <- fromJVal args
352 return $ QueryNodes names fields locking
354 (names, fields, locking) <- fromJVal args
355 return $ QueryGroups names fields locking
356 ReqQueryClusterInfo ->
357 return QueryClusterInfo
358 ReqQueryNetworks -> do
359 (names, fields, locking) <- fromJVal args
360 return $ QueryNetworks names fields locking
362 (what, fields, qfilter) <- fromJVal args
363 return $ Query what fields qfilter
365 (what, fields) <- fromJVal args
366 fields' <- case fields of
369 return $ QueryFields what fields'
371 [ops1] <- fromJVal args
372 ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
373 return $ SubmitJob ops2
374 ReqSubmitJobToDrainedQueue -> do
375 [ops1] <- fromJVal args
376 ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
377 return $ SubmitJobToDrainedQueue ops2
378 ReqSubmitManyJobs -> do
379 [ops1] <- fromJVal args
380 ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
381 return $ SubmitManyJobs ops2
382 ReqWaitForJobChange -> do
383 (jid, fields, pinfo, pidx, wtmout) <-
384 -- No instance for 5-tuple, code copied from the
385 -- json sources and adapted
386 fromJResult "Parsing WaitForJobChange message" $
388 JSArray [a, b, c, d, e] ->
395 _ -> J.Error "Not enough values"
396 return $ WaitForJobChange jid fields pinfo pidx wtmout
398 [jid] <- fromJVal args
399 return $ ArchiveJob jid
400 ReqAutoArchiveJobs -> do
401 (age, tmout) <- fromJVal args
402 return $ AutoArchiveJobs age tmout
403 ReqQueryExports -> do
404 (nodes, lock) <- fromJVal args
405 return $ QueryExports nodes lock
406 ReqQueryConfigValues -> do
407 [fields] <- fromJVal args
408 return $ QueryConfigValues fields
410 (kind, name) <- fromJVal args
411 return $ QueryTags kind name
413 [jid] <- fromJVal args
414 return $ CancelJob jid
415 ReqChangeJobPriority -> do
416 (jid, priority) <- fromJVal args
417 return $ ChangeJobPriority jid priority
418 ReqSetDrainFlag -> do
419 [flag] <- fromJVal args
420 return $ SetDrainFlag flag
421 ReqSetWatcherPause -> do
422 [duration] <- fromJVal args
423 return $ SetWatcherPause duration
425 -- | Check that luxi responses contain the required keys and that the
426 -- call was successful.
427 validateResult :: String -> ErrorResult JSValue
428 validateResult s = do
429 when (UTF8.replacement_char `elem` s) $
430 fail "Failed to decode UTF-8, detected replacement char after decoding"
431 oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
432 let arr = J.fromJSObject oarr
433 status <- fromObj arr (strOfKey Success)
434 result <- fromObj arr (strOfKey Result)
437 else decodeError result
439 -- | Try to decode an error from the server response. This function
440 -- will always fail, since it's called only on the error path (when
442 decodeError :: JSValue -> ErrorResult JSValue
446 Bad msg -> Bad $ GenericError msg
448 -- | Generic luxi method call.
449 callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
450 callMethod method s = do
451 sendMsg s $ buildCall method
453 let rval = validateResult result
456 -- | Parse job submission result.
457 parseSubmitJobResult :: JSValue -> ErrorResult JobId
458 parseSubmitJobResult (JSArray [JSBool True, v]) =
460 J.Error msg -> Bad $ LuxiError msg
462 parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
463 Bad . LuxiError $ fromJSString x
464 parseSubmitJobResult v =
465 Bad . LuxiError $ "Unknown result from the master daemon: " ++
468 -- | Specialized submitManyJobs call.
469 submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId])
470 submitManyJobs s jobs = do
471 rval <- callMethod (SubmitManyJobs jobs) s
472 -- map each result (status, payload) pair into a nice Result ADT
473 return $ case rval of
475 Ok (JSArray r) -> mapM parseSubmitJobResult r
476 x -> Bad . LuxiError $
477 "Cannot parse response from Ganeti: " ++ show x
479 -- | Custom queryJobs call.
480 queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
481 queryJobsStatus s jids = do
482 rval <- callMethod (QueryJobs jids ["status"]) s
483 return $ case rval of
485 Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
486 J.Ok vals -> if any null vals
488 LuxiError "Missing job status field"
489 else Ok (map head vals)
490 J.Error x -> Bad $ LuxiError x