1 {-# LANGUAGE TemplateHaskell #-}
3 {-| Implementation of the Ganeti LUXI interface.
9 Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
55 import Control.Exception (catch)
57 import qualified Data.ByteString as B
58 import qualified Data.ByteString.Lazy as BL
59 import qualified Data.ByteString.UTF8 as UTF8
60 import qualified Data.ByteString.Lazy.UTF8 as UTF8L
61 import Data.Word (Word8)
63 import Text.JSON (encodeStrict, decodeStrict)
64 import qualified Text.JSON as J
65 import Text.JSON.Pretty (pp_value)
66 import Text.JSON.Types
67 import System.Directory (removeFile)
68 import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
69 import System.IO.Error (isEOFError)
71 import qualified Network.Socket as S
73 import Ganeti.BasicTypes
74 import Ganeti.Constants
77 import Ganeti.OpParams (pTagsObject)
80 import qualified Ganeti.Query.Language as Qlang
85 -- * Utility functions
87 -- | Wrapper over System.Timeout.timeout that fails in the IO monad.
88 withTimeout :: Int -> String -> IO a -> IO a
89 withTimeout secs descr action = do
90 result <- timeout (secs * 1000000) action
92 Nothing -> fail $ "Timeout in " ++ descr
95 -- * Generic protocol functionality
97 -- | Result of receiving a message from the socket.
98 data RecvResult = RecvConnClosed -- ^ Connection closed
99 | RecvError String -- ^ Any other error
100 | RecvOk String -- ^ Successfull receive
103 -- | Currently supported Luxi operations and JSON serialization.
106 [ simpleField "what" [t| Qlang.ItemType |]
107 , simpleField "fields" [t| [String] |]
108 , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
110 , (luxiReqQueryFields,
111 [ simpleField "what" [t| Qlang.ItemType |]
112 , simpleField "fields" [t| [String] |]
114 , (luxiReqQueryNodes,
115 [ simpleField "names" [t| [String] |]
116 , simpleField "fields" [t| [String] |]
117 , simpleField "lock" [t| Bool |]
119 , (luxiReqQueryGroups,
120 [ simpleField "names" [t| [String] |]
121 , simpleField "fields" [t| [String] |]
122 , simpleField "lock" [t| Bool |]
124 , (luxiReqQueryNetworks,
125 [ simpleField "names" [t| [String] |]
126 , simpleField "fields" [t| [String] |]
127 , simpleField "lock" [t| Bool |]
129 , (luxiReqQueryInstances,
130 [ simpleField "names" [t| [String] |]
131 , simpleField "fields" [t| [String] |]
132 , simpleField "lock" [t| Bool |]
135 [ simpleField "ids" [t| [JobId] |]
136 , simpleField "fields" [t| [String] |]
138 , (luxiReqQueryExports,
139 [ simpleField "nodes" [t| [String] |]
140 , simpleField "lock" [t| Bool |]
142 , (luxiReqQueryConfigValues,
143 [ simpleField "fields" [t| [String] |] ]
145 , (luxiReqQueryClusterInfo, [])
149 [ simpleField "job" [t| [MetaOpCode] |] ]
151 , (luxiReqSubmitManyJobs,
152 [ simpleField "ops" [t| [[MetaOpCode]] |] ]
154 , (luxiReqWaitForJobChange,
155 [ simpleField "job" [t| JobId |]
156 , simpleField "fields" [t| [String]|]
157 , simpleField "prev_job" [t| JSValue |]
158 , simpleField "prev_log" [t| JSValue |]
159 , simpleField "tmout" [t| Int |]
161 , (luxiReqArchiveJob,
162 [ simpleField "job" [t| JobId |] ]
164 , (luxiReqAutoArchiveJobs,
165 [ simpleField "age" [t| Int |]
166 , simpleField "tmout" [t| Int |]
169 [ simpleField "job" [t| JobId |] ]
171 , (luxiReqChangeJobPriority,
172 [ simpleField "job" [t| JobId |]
173 , simpleField "priority" [t| Int |] ]
175 , (luxiReqSetDrainFlag,
176 [ simpleField "flag" [t| Bool |] ]
178 , (luxiReqSetWatcherPause,
179 [ simpleField "duration" [t| Double |] ]
183 $(makeJSONInstance ''LuxiReq)
185 -- | List of all defined Luxi calls.
186 $(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
188 -- | The serialisation of LuxiOps into strings in messages.
189 $(genStrOfOp ''LuxiOp "strOfOp")
191 -- | Type holding the initial (unparsed) Luxi call.
192 data LuxiCall = LuxiCall LuxiReq JSValue
194 -- | The end-of-message separator.
198 -- | The end-of-message encoded as a ByteString.
200 bEOM = B.singleton eOM
202 -- | Valid keys in the requests and responses.
203 data MsgKeys = Method
208 -- | The serialisation of MsgKeys into strings in messages.
209 $(genStrOfKey ''MsgKeys "strOfKey")
211 -- | Luxi client encapsulation.
212 data Client = Client { socket :: Handle -- ^ The socket of the client
213 , rbuf :: IORef B.ByteString -- ^ Already received buffer
216 -- | Connects to the master daemon and returns a luxi Client.
217 getClient :: String -> IO Client
219 s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
220 withTimeout luxiDefCtmo "creating luxi connection" $
221 S.connect s (S.SockAddrUnix path)
222 rf <- newIORef B.empty
223 h <- S.socketToHandle s ReadWriteMode
224 return Client { socket=h, rbuf=rf }
226 -- | Creates and returns a server endpoint.
227 getServer :: Bool -> FilePath -> IO S.Socket
228 getServer setOwner path = do
229 s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
230 S.bindSocket s (S.SockAddrUnix path)
231 when setOwner . setOwnerAndGroupFromNames path GanetiLuxid $
232 ExtraGroup DaemonsGroup
233 S.listen s 5 -- 5 is the max backlog
236 -- | Closes a server endpoint.
237 -- FIXME: this should be encapsulated into a nicer type.
238 closeServer :: FilePath -> S.Socket -> IO ()
239 closeServer path sock = do
243 -- | Accepts a client
244 acceptClient :: S.Socket -> IO Client
246 -- second return is the address of the client, which we ignore here
247 (client_socket, _) <- S.accept s
248 new_buffer <- newIORef B.empty
249 handle <- S.socketToHandle client_socket ReadWriteMode
250 return Client { socket=handle, rbuf=new_buffer }
252 -- | Closes the client socket.
253 closeClient :: Client -> IO ()
254 closeClient = hClose . socket
256 -- | Sends a message over a luxi transport.
257 sendMsg :: Client -> String -> IO ()
258 sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
259 let encoded = UTF8L.fromString buf
261 BL.hPut handle encoded
265 -- | Given a current buffer and the handle, it will read from the
266 -- network until we get a full message, and it will return that
267 -- message and the leftover buffer contents.
268 recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
269 recvUpdate handle obuf = do
270 nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
271 _ <- hWaitForInput handle (-1)
272 B.hGetNonBlocking handle 4096
273 let (msg, remaining) = B.break (eOM ==) nbuf
274 newbuf = B.append obuf msg
276 then recvUpdate handle newbuf
277 else return (newbuf, B.tail remaining)
279 -- | Waits for a message over a luxi transport.
280 recvMsg :: Client -> IO String
282 cbuf <- readIORef $ rbuf s
283 let (imsg, ibuf) = B.break (eOM ==) cbuf
285 if B.null ibuf -- if old buffer didn't contain a full message
286 then recvUpdate (socket s) cbuf -- then we read from network
287 else return (imsg, B.tail ibuf) -- else we return data from our buffer
288 writeIORef (rbuf s) nbuf
289 return $ UTF8.toString msg
291 -- | Extended wrapper over recvMsg.
292 recvMsgExt :: Client -> IO RecvResult
294 Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
295 return $ if isEOFError e
297 else RecvError (show e)
299 -- | Serialize a request to String.
300 buildCall :: LuxiOp -- ^ The method
301 -> String -- ^ The serialized form
303 let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
304 , (strOfKey Args, opToArgs lo)
309 -- | Serialize the response to String.
310 buildResponse :: Bool -- ^ Success
311 -> JSValue -- ^ The arguments
312 -> String -- ^ The serialized form
313 buildResponse success args =
314 let ja = [ (strOfKey Success, JSBool success)
315 , (strOfKey Result, args)]
319 -- | Check that luxi request contains the required keys and parse it.
320 validateCall :: String -> Result LuxiCall
322 arr <- fromJResult "parsing top-level luxi message" $
323 decodeStrict s::Result (JSObject JSValue)
324 let aobj = fromJSObject arr
325 call <- fromObj aobj (strOfKey Method)::Result LuxiReq
326 args <- fromObj aobj (strOfKey Args)
327 return (LuxiCall call args)
329 -- | Converts Luxi call arguments into a 'LuxiOp' data structure.
331 -- This is currently hand-coded until we make it more uniform so that
332 -- it can be generated using TH.
333 decodeCall :: LuxiCall -> Result LuxiOp
334 decodeCall (LuxiCall call args) =
337 (jids, jargs) <- fromJVal args
338 jids' <- case jids of
341 return $ QueryJobs jids' jargs
342 ReqQueryInstances -> do
343 (names, fields, locking) <- fromJVal args
344 return $ QueryInstances names fields locking
346 (names, fields, locking) <- fromJVal args
347 return $ QueryNodes names fields locking
349 (names, fields, locking) <- fromJVal args
350 return $ QueryGroups names fields locking
351 ReqQueryClusterInfo ->
352 return QueryClusterInfo
353 ReqQueryNetworks -> do
354 (names, fields, locking) <- fromJVal args
355 return $ QueryNetworks names fields locking
357 (what, fields, qfilter) <- fromJVal args
358 return $ Query what fields qfilter
360 (what, fields) <- fromJVal args
361 fields' <- case fields of
364 return $ QueryFields what fields'
366 [ops1] <- fromJVal args
367 ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
368 return $ SubmitJob ops2
369 ReqSubmitManyJobs -> do
370 [ops1] <- fromJVal args
371 ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
372 return $ SubmitManyJobs ops2
373 ReqWaitForJobChange -> do
374 (jid, fields, pinfo, pidx, wtmout) <-
375 -- No instance for 5-tuple, code copied from the
376 -- json sources and adapted
377 fromJResult "Parsing WaitForJobChange message" $
379 JSArray [a, b, c, d, e] ->
386 _ -> J.Error "Not enough values"
387 return $ WaitForJobChange jid fields pinfo pidx wtmout
389 [jid] <- fromJVal args
390 return $ ArchiveJob jid
391 ReqAutoArchiveJobs -> do
392 (age, tmout) <- fromJVal args
393 return $ AutoArchiveJobs age tmout
394 ReqQueryExports -> do
395 (nodes, lock) <- fromJVal args
396 return $ QueryExports nodes lock
397 ReqQueryConfigValues -> do
398 [fields] <- fromJVal args
399 return $ QueryConfigValues fields
401 (kind, name) <- fromJVal args
402 item <- tagObjectFrom kind name
403 return $ QueryTags item
405 [jid] <- fromJVal args
406 return $ CancelJob jid
407 ReqChangeJobPriority -> do
408 (jid, priority) <- fromJVal args
409 return $ ChangeJobPriority jid priority
410 ReqSetDrainFlag -> do
411 [flag] <- fromJVal args
412 return $ SetDrainFlag flag
413 ReqSetWatcherPause -> do
414 [duration] <- fromJVal args
415 return $ SetWatcherPause duration
417 -- | Check that luxi responses contain the required keys and that the
418 -- call was successful.
419 validateResult :: String -> ErrorResult JSValue
420 validateResult s = do
421 when (UTF8.replacement_char `elem` s) $
422 fail "Failed to decode UTF-8, detected replacement char after decoding"
423 oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
424 let arr = J.fromJSObject oarr
425 status <- fromObj arr (strOfKey Success)
426 result <- fromObj arr (strOfKey Result)
429 else decodeError result
431 -- | Try to decode an error from the server response. This function
432 -- will always fail, since it's called only on the error path (when
434 decodeError :: JSValue -> ErrorResult JSValue
438 Bad msg -> Bad $ GenericError msg
440 -- | Generic luxi method call.
441 callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
442 callMethod method s = do
443 sendMsg s $ buildCall method
445 let rval = validateResult result
448 -- | Parse job submission result.
449 parseSubmitJobResult :: JSValue -> ErrorResult JobId
450 parseSubmitJobResult (JSArray [JSBool True, v]) =
452 J.Error msg -> Bad $ LuxiError msg
454 parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
455 Bad . LuxiError $ fromJSString x
456 parseSubmitJobResult v =
457 Bad . LuxiError $ "Unknown result from the master daemon: " ++
460 -- | Specialized submitManyJobs call.
461 submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId])
462 submitManyJobs s jobs = do
463 rval <- callMethod (SubmitManyJobs jobs) s
464 -- map each result (status, payload) pair into a nice Result ADT
465 return $ case rval of
467 Ok (JSArray r) -> mapM parseSubmitJobResult r
468 x -> Bad . LuxiError $
469 "Cannot parse response from Ganeti: " ++ show x
471 -- | Custom queryJobs call.
472 queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
473 queryJobsStatus s jids = do
474 rval <- callMethod (QueryJobs jids ["status"]) s
475 return $ case rval of
477 Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
478 J.Ok vals -> if any null vals
480 LuxiError "Missing job status field"
481 else Ok (map head vals)
482 J.Error x -> Bad $ LuxiError x