1 {-# LANGUAGE TemplateHaskell #-}
3 {-| Implementation of the Ganeti LUXI interface.
9 Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
55 import Control.Exception (catch)
57 import Data.Ratio (numerator, denominator)
58 import qualified Data.ByteString as B
59 import qualified Data.ByteString.UTF8 as UTF8
60 import Data.Word (Word8)
62 import Prelude hiding (catch)
63 import Text.JSON (encodeStrict, decodeStrict)
64 import qualified Text.JSON as J
65 import Text.JSON.Types
66 import System.Directory (removeFile)
67 import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
68 import System.IO.Error (isEOFError)
70 import qualified Network.Socket as S
72 import Ganeti.HTools.JSON
73 import Ganeti.HTools.Types
74 import Ganeti.HTools.Utils
76 import Ganeti.Constants
77 import Ganeti.Jobs (JobStatus)
78 import Ganeti.OpCodes (OpCode)
79 import qualified Ganeti.Qlang as Qlang
82 -- * Utility functions
84 -- | Wrapper over System.Timeout.timeout that fails in the IO monad.
85 withTimeout :: Int -> String -> IO a -> IO a
86 withTimeout secs descr action = do
87 result <- timeout (secs * 1000000) action
89 Nothing -> fail $ "Timeout in " ++ descr
92 -- * Generic protocol functionality
-- | Outcome of trying to receive a message from the socket.
data RecvResult
  = RecvConnClosed    -- ^ The connection was closed
  | RecvError String  -- ^ Any other error, with its description
  | RecvOk String     -- ^ Successful receive, with the message
  deriving (Show, Read, Eq)
100 -- | The Ganeti job type.
103 $(declareSADT "QrViaLuxi"
104 [ ("QRLock", 'qrLock)
105 , ("QRInstance", 'qrInstance)
106 , ("QRNode", 'qrNode)
107 , ("QRGroup", 'qrGroup)
111 $(makeJSONInstance ''QrViaLuxi)
113 -- | Currently supported Luxi operations and JSON serialization.
116 [ ("what", [t| QrViaLuxi |], [| id |])
117 , ("fields", [t| [String] |], [| id |])
118 , ("qfilter", [t| Qlang.Filter |], [| id |])
120 , (luxiReqQueryNodes,
121 [ ("names", [t| [String] |], [| id |])
122 , ("fields", [t| [String] |], [| id |])
123 , ("lock", [t| Bool |], [| id |])
125 , (luxiReqQueryGroups,
126 [ ("names", [t| [String] |], [| id |])
127 , ("fields", [t| [String] |], [| id |])
128 , ("lock", [t| Bool |], [| id |])
130 , (luxiReqQueryInstances,
131 [ ("names", [t| [String] |], [| id |])
132 , ("fields", [t| [String] |], [| id |])
133 , ("lock", [t| Bool |], [| id |])
136 [ ("ids", [t| [Int] |], [| id |])
137 , ("fields", [t| [String] |], [| id |])
139 , (luxiReqQueryExports,
140 [ ("nodes", [t| [String] |], [| id |])
141 , ("lock", [t| Bool |], [| id |])
143 , (luxiReqQueryConfigValues,
144 [ ("fields", [t| [String] |], [| id |]) ]
146 , (luxiReqQueryClusterInfo, [])
148 [ ("kind", [t| String |], [| id |])
149 , ("name", [t| String |], [| id |])
152 [ ("job", [t| [OpCode] |], [| id |]) ]
154 , (luxiReqSubmitManyJobs,
155 [ ("ops", [t| [[OpCode]] |], [| id |]) ]
157 , (luxiReqWaitForJobChange,
158 [ ("job", [t| Int |], [| id |])
159 , ("fields", [t| [String]|], [| id |])
160 , ("prev_job", [t| JSValue |], [| id |])
161 , ("prev_log", [t| JSValue |], [| id |])
162 , ("tmout", [t| Int |], [| id |])
164 , (luxiReqArchiveJob,
165 [ ("job", [t| Int |], [| id |]) ]
167 , (luxiReqAutoArchiveJobs,
168 [ ("age", [t| Int |], [| id |])
169 , ("tmout", [t| Int |], [| id |])
172 [ ("job", [t| Int |], [| id |]) ]
174 , (luxiReqSetDrainFlag,
175 [ ("flag", [t| Bool |], [| id |]) ]
177 , (luxiReqSetWatcherPause,
178 [ ("duration", [t| Double |], [| id |]) ]
182 $(makeJSONInstance ''LuxiReq)
184 -- | The serialisation of LuxiOps into strings in messages.
185 $(genStrOfOp ''LuxiOp "strOfOp")
187 $(declareIADT "ResultStatus"
188 [ ("RSNormal", 'rsNormal)
189 , ("RSUnknown", 'rsUnknown)
190 , ("RSNoData", 'rsNodata)
191 , ("RSUnavailable", 'rsUnavail)
192 , ("RSOffline", 'rsOffline)
195 $(makeJSONInstance ''ResultStatus)
-- | A Luxi call as first received: the request kind together with its
-- arguments, which are still a raw JSON value.
data LuxiCall = LuxiCall
  LuxiReq  -- ^ The requested method
  JSValue  -- ^ The arguments, not yet decoded
-- | Turn a 'ResultStatus' into a monadic result: the supplied value is
-- returned for 'RSNormal', while every other status fails with a
-- message describing it.
checkRS :: (Monad m) => ResultStatus -> a -> m a
checkRS status val =
  case status of
    RSNormal      -> return val
    RSUnknown     -> fail "Unknown field"
    RSNoData      -> fail "No data for a field"
    RSUnavailable -> fail "Ganeti reports unavailable data"
    RSOffline     -> fail "Ganeti reports resource as offline"
208 -- | The end-of-message separator.
212 -- | The end-of-message encoded as a ByteString.
214 bEOM = B.singleton eOM
216 -- | Valid keys in the requests and responses.
217 data MsgKeys = Method
222 -- | The serialisation of MsgKeys into strings in messages.
223 $(genStrOfKey ''MsgKeys "strOfKey")
225 -- | Luxi client encapsulation.
226 data Client = Client { socket :: Handle -- ^ The socket of the client
227 , rbuf :: IORef B.ByteString -- ^ Already received buffer
230 -- | Connects to the master daemon and returns a luxi Client.
231 getClient :: String -> IO Client
233 s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
234 withTimeout connTimeout "creating luxi connection" $
235 S.connect s (S.SockAddrUnix path)
236 rf <- newIORef B.empty
237 h <- S.socketToHandle s ReadWriteMode
238 return Client { socket=h, rbuf=rf }
240 -- | Creates and returns a server endpoint.
241 getServer :: FilePath -> IO S.Socket
243 s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
244 S.bindSocket s (S.SockAddrUnix path)
245 S.listen s 5 -- 5 is the max backlog
248 -- | Closes a server endpoint.
249 -- FIXME: this should be encapsulated into a nicer type.
250 closeServer :: FilePath -> S.Socket -> IO ()
251 closeServer path sock = do
255 -- | Accepts a client
256 acceptClient :: S.Socket -> IO Client
258 -- second return is the address of the client, which we ignore here
259 (client_socket, _) <- S.accept s
260 new_buffer <- newIORef B.empty
261 handle <- S.socketToHandle client_socket ReadWriteMode
262 return Client { socket=handle, rbuf=new_buffer }
-- | Closes the handle stored in the given client.
closeClient :: Client -> IO ()
closeClient client = hClose (socket client)
268 -- | Sends a message over a luxi transport.
269 sendMsg :: Client -> String -> IO ()
270 sendMsg s buf = withTimeout queryTimeout "sending luxi message" $ do
271 let encoded = UTF8.fromString buf
273 B.hPut handle encoded
277 -- | Given a current buffer and the handle, it will read from the
278 -- network until we get a full message, and it will return that
279 -- message and the leftover buffer contents.
280 recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
281 recvUpdate handle obuf = do
282 nbuf <- withTimeout queryTimeout "reading luxi response" $ do
283 _ <- hWaitForInput handle (-1)
284 B.hGetNonBlocking handle 4096
285 let (msg, remaining) = B.break (eOM ==) nbuf
286 newbuf = B.append obuf msg
288 then recvUpdate handle newbuf
289 else return (newbuf, B.tail remaining)
291 -- | Waits for a message over a luxi transport.
292 recvMsg :: Client -> IO String
294 cbuf <- readIORef $ rbuf s
295 let (imsg, ibuf) = B.break (eOM ==) cbuf
297 if B.null ibuf -- if old buffer didn't contain a full message
298 then recvUpdate (socket s) cbuf -- then we read from network
299 else return (imsg, B.tail ibuf) -- else we return data from our buffer
300 writeIORef (rbuf s) nbuf
301 return $ UTF8.toString msg
303 -- | Extended wrapper over recvMsg.
304 recvMsgExt :: Client -> IO RecvResult
306 catch (liftM RecvOk (recvMsg s)) $ \e ->
308 then return RecvConnClosed
309 else return $ RecvError (show e)
311 -- | Serialize a request to String.
312 buildCall :: LuxiOp -- ^ The method
313 -> String -- ^ The serialized form
315 let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
316 , (strOfKey Args, opToArgs lo)
321 -- | Serialize the response to String.
322 buildResponse :: Bool -- ^ Success
323 -> JSValue -- ^ The arguments
324 -> String -- ^ The serialized form
325 buildResponse success args =
326 let ja = [ (strOfKey Success, JSBool success)
327 , (strOfKey Result, args)]
331 -- | Check that luxi request contains the required keys and parse it.
332 validateCall :: String -> Result LuxiCall
334 arr <- fromJResult "parsing top-level luxi message" $
335 decodeStrict s::Result (JSObject JSValue)
336 let aobj = fromJSObject arr
337 call <- fromObj aobj (strOfKey Method)::Result LuxiReq
338 args <- fromObj aobj (strOfKey Args)
339 return (LuxiCall call args)
341 -- | Converts Luxi call arguments into a 'LuxiOp' data structure.
343 -- This is currently hand-coded until we make it more uniform so that
344 -- it can be generated using TH.
345 decodeCall :: LuxiCall -> Result LuxiOp
346 decodeCall (LuxiCall call args) =
349 (jid, jargs) <- fromJVal args
350 rid <- mapM parseJobId jid
351 let rargs = map fromJSString jargs
352 return $ QueryJobs rid rargs
353 ReqQueryInstances -> do
354 (names, fields, locking) <- fromJVal args
355 return $ QueryInstances names fields locking
357 (names, fields, locking) <- fromJVal args
358 return $ QueryNodes names fields locking
360 (names, fields, locking) <- fromJVal args
361 return $ QueryGroups names fields locking
362 ReqQueryClusterInfo -> do
363 return QueryClusterInfo
365 (what, fields, qfilter) <- fromJVal args
366 return $ Query what fields qfilter
368 [ops1] <- fromJVal args
369 ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
370 return $ SubmitJob ops2
371 ReqSubmitManyJobs -> do
372 [ops1] <- fromJVal args
373 ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
374 return $ SubmitManyJobs ops2
375 ReqWaitForJobChange -> do
376 (jid, fields, pinfo, pidx, wtmout) <-
377 -- No instance for 5-tuple, code copied from the
378 -- json sources and adapted
379 fromJResult "Parsing WaitForJobChange message" $
381 JSArray [a, b, c, d, e] ->
388 _ -> J.Error "Not enough values"
389 rid <- parseJobId jid
390 return $ WaitForJobChange rid fields pinfo pidx wtmout
392 [jid] <- fromJVal args
393 rid <- parseJobId jid
394 return $ ArchiveJob rid
395 ReqAutoArchiveJobs -> do
396 (age, tmout) <- fromJVal args
397 return $ AutoArchiveJobs age tmout
398 ReqQueryExports -> do
399 (nodes, lock) <- fromJVal args
400 return $ QueryExports nodes lock
401 ReqQueryConfigValues -> do
402 [fields] <- fromJVal args
403 return $ QueryConfigValues fields
405 (kind, name) <- fromJVal args
406 return $ QueryTags kind name
408 [job] <- fromJVal args
409 rid <- parseJobId job
410 return $ CancelJob rid
411 ReqSetDrainFlag -> do
412 [flag] <- fromJVal args
413 return $ SetDrainFlag flag
414 ReqSetWatcherPause -> do
415 [duration] <- fromJVal args
416 return $ SetWatcherPause duration
418 -- | Check that luxi responses contain the required keys and that the
419 -- call was successful.
420 validateResult :: String -> Result JSValue
421 validateResult s = do
422 when (UTF8.replacement_char `elem` s) $
423 fail "Failed to decode UTF-8, detected replacement char after decoding"
424 oarr <- fromJResult "Parsing LUXI response"
425 (decodeStrict s)::Result (JSObject JSValue)
426 let arr = J.fromJSObject oarr
427 status <- fromObj arr (strOfKey Success)::Result Bool
428 let rkey = strOfKey Result
430 then fromObj arr rkey
431 else fromObj arr rkey >>= fail
433 -- | Generic luxi method call.
434 callMethod :: LuxiOp -> Client -> IO (Result JSValue)
435 callMethod method s = do
436 sendMsg s $ buildCall method
438 let rval = validateResult result
-- | Parses a job ID out of a JSON value.
--
-- A JSON string is read via 'tryRead'; a JSON number is accepted only
-- when it is integral (denominator of 1). Any other value is rejected
-- with a 'Bad' result.
parseJobId :: JSValue -> Result JobId
parseJobId (JSString str) = tryRead "parsing job id" (fromJSString str)
parseJobId (JSRational _ ratio)
  -- FIXME: potential integer overflow here on 32-bit platforms
  | denominator ratio == 1 = Ok (fromIntegral (numerator ratio))
  | otherwise =
      Bad ("Got fractional job ID from master daemon?! Value:" ++ show ratio)
parseJobId other = Bad ("Wrong type/value for job id: " ++ show other)
451 -- | Parse job submission result.
452 parseSubmitJobResult :: JSValue -> Result JobId
453 parseSubmitJobResult (JSArray [JSBool True, v]) = parseJobId v
454 parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
456 parseSubmitJobResult v = Bad $ "Unknown result from the master daemon" ++
459 -- | Specialized submitManyJobs call.
460 submitManyJobs :: Client -> [[OpCode]] -> IO (Result [JobId])
461 submitManyJobs s jobs = do
462 rval <- callMethod (SubmitManyJobs jobs) s
463 -- map each result (status, payload) pair into a nice Result ADT
464 return $ case rval of
466 Ok (JSArray r) -> mapM parseSubmitJobResult r
467 x -> Bad ("Cannot parse response from Ganeti: " ++ show x)
469 -- | Custom queryJobs call.
470 queryJobsStatus :: Client -> [JobId] -> IO (Result [JobStatus])
471 queryJobsStatus s jids = do
472 rval <- callMethod (QueryJobs jids ["status"]) s
473 return $ case rval of
475 Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
476 J.Ok vals -> if any null vals
477 then Bad "Missing job status field"
478 else Ok (map head vals)