1 {-# LANGUAGE TemplateHaskell #-}
3 {-| Implementation of the Ganeti LUXI interface.
9 Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
53 import Control.Exception (catch)
55 import Data.Ratio (numerator, denominator)
56 import qualified Data.ByteString as B
57 import qualified Data.ByteString.UTF8 as UTF8
58 import Data.Word (Word8)
60 import Prelude hiding (catch)
61 import Text.JSON (encodeStrict, decodeStrict)
62 import qualified Text.JSON as J
63 import Text.JSON.Types
64 import System.Directory (removeFile)
65 import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
66 import System.IO.Error (isEOFError)
68 import qualified Network.Socket as S
70 import Ganeti.HTools.JSON
71 import Ganeti.HTools.Types
72 import Ganeti.HTools.Utils
74 import Ganeti.Constants
75 import Ganeti.Jobs (JobStatus)
76 import Ganeti.OpCodes (OpCode)
77 import qualified Ganeti.Qlang as Qlang
80 -- * Utility functions
82 -- | Wrapper over System.Timeout.timeout that fails in the IO monad.
--
-- The duration is given in seconds and is multiplied by 1000000 before
-- being handed to @timeout@. The description names the operation and is
-- appended to the @"Timeout in "@ failure message.
83 withTimeout :: Int -> String -> IO a -> IO a
84 withTimeout secs descr action = do
85 result <- timeout (secs * 1000000) action
-- No result from @timeout@: fail in IO, naming the operation.
87 Nothing -> fail $ "Timeout in " ++ descr
90 -- * Generic protocol functionality
-- | Outcome of an attempt to receive one message from the socket.
data RecvResult
  = RecvConnClosed    -- ^ The connection was closed
  | RecvError String  -- ^ Any other error, with its description
  | RecvOk String     -- ^ Successful receive, with the message text
  deriving (Show, Read, Eq)
98 -- | The Ganeti job type.
101 -- | Data type representing which items the tag operations apply to.
--
-- Each entry pairs a name string (presumably the generated constructor)
-- with the Template Haskell-quoted name of a value. Note that @TagGroup@
-- is paired with @tagNodegroup@, whereas the other three entries use
-- matching names.
102 $(declareSADT "TagObject"
103 [ ("TagInstance", 'tagInstance)
104 , ("TagNode", 'tagNode)
105 , ("TagGroup", 'tagNodegroup)
106 , ("TagCluster", 'tagCluster)
-- Compile-time splice: applies @makeJSONInstance@ to the 'TagObject' type
-- name (presumably generating its JSON instance).
makeJSONInstance ''TagObject
110 -- | Currently supported Luxi operations and JSON serialization.
--
-- Each entry pairs a request-name constant (@luxiReq...@) with the list
-- of that request's fields; a field is a @(name, type)@ pair whose type
-- is written as a Template Haskell type quotation. A request without
-- fields uses the empty list.
113 [ ("what", [t| Qlang.ItemType |])
114 , ("fields", [t| [String] |])
115 , ("qfilter", [t| Qlang.Filter |])
117 , (luxiReqQueryFields,
118 [ ("what", [t| Qlang.ItemType |])
119 , ("fields", [t| [String] |])
121 , (luxiReqQueryNodes,
122 [ ("names", [t| [String] |])
123 , ("fields", [t| [String] |])
124 , ("lock", [t| Bool |])
126 , (luxiReqQueryGroups,
127 [ ("names", [t| [String] |])
128 , ("fields", [t| [String] |])
129 , ("lock", [t| Bool |])
131 , (luxiReqQueryInstances,
132 [ ("names", [t| [String] |])
133 , ("fields", [t| [String] |])
134 , ("lock", [t| Bool |])
137 [ ("ids", [t| [Int] |])
138 , ("fields", [t| [String] |])
140 , (luxiReqQueryExports,
141 [ ("nodes", [t| [String] |])
142 , ("lock", [t| Bool |])
144 , (luxiReqQueryConfigValues,
145 [ ("fields", [t| [String] |]) ]
147 , (luxiReqQueryClusterInfo, [])
149 [ ("kind", [t| TagObject |])
150 , ("name", [t| String |])
153 [ ("job", [t| [OpCode] |]) ]
155 , (luxiReqSubmitManyJobs,
156 [ ("ops", [t| [[OpCode]] |]) ]
158 , (luxiReqWaitForJobChange,
159 [ ("job", [t| Int |])
160 , ("fields", [t| [String]|])
161 , ("prev_job", [t| JSValue |])
162 , ("prev_log", [t| JSValue |])
163 , ("tmout", [t| Int |])
165 , (luxiReqArchiveJob,
166 [ ("job", [t| Int |]) ]
168 , (luxiReqAutoArchiveJobs,
169 [ ("age", [t| Int |])
170 , ("tmout", [t| Int |])
173 [ ("job", [t| Int |]) ]
175 , (luxiReqSetDrainFlag,
176 [ ("flag", [t| Bool |]) ]
178 , (luxiReqSetWatcherPause,
179 [ ("duration", [t| Double |]) ]
-- Compile-time splice: applies @makeJSONInstance@ to the 'LuxiReq' type
-- name (presumably generating its JSON instance).
makeJSONInstance ''LuxiReq
-- | Generates @strOfOp@, which yields the string that stands for each
-- 'LuxiOp' in messages.
genStrOfOp ''LuxiOp "strOfOp"
-- | A Luxi call in its initial, not yet decoded form: the request kind
-- together with its raw JSON arguments.
data LuxiCall
  = LuxiCall LuxiReq JSValue
191 -- | The end-of-message separator.
195 -- | The end-of-message marker as a one-byte ByteString, built from
-- 'eOM' with @B.singleton@.
197 bEOM = B.singleton eOM
199 -- | Valid keys in the requests and responses.
--
-- Keys are turned into strings with 'strOfKey'. This file uses @Method@
-- and @Args@ when building and validating calls, and @Success@ and
-- @Result@ when building and validating responses.
200 data MsgKeys = Method
-- | Generates @strOfKey@, which yields the string that stands for each
-- 'MsgKeys' value in messages.
genStrOfKey ''MsgKeys "strOfKey"
208 -- | Luxi client encapsulation: the connection handle plus a mutable
-- buffer of bytes that were read but not yet returned as a message.
209 data Client = Client { socket :: Handle -- ^ The socket of the client
210 , rbuf :: IORef B.ByteString -- ^ Already received buffer
213 -- | Connects to the master daemon and returns a luxi Client.
--
-- The connection is a Unix-domain stream socket. Only the connect call
-- runs under 'withTimeout' (using @connTimeout@). The returned client
-- starts with an empty receive buffer.
214 getClient :: String -> IO Client
216 s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
217 withTimeout connTimeout "creating luxi connection" $
218 S.connect s (S.SockAddrUnix path)
219 rf <- newIORef B.empty
-- Wrap the connected socket in a read/write Handle for the Client record.
220 h <- S.socketToHandle s ReadWriteMode
221 return Client { socket=h, rbuf=rf }
223 -- | Creates and returns a server endpoint.
--
-- The endpoint is a Unix-domain stream socket that is bound to the given
-- path and then put into listening state.
224 getServer :: FilePath -> IO S.Socket
226 s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
227 S.bindSocket s (S.SockAddrUnix path)
228 S.listen s 5 -- 5 is the max backlog
231 -- | Closes a server endpoint.
--
-- Takes the filesystem path of the endpoint together with its socket.
232 -- FIXME: this should be encapsulated into a nicer type.
233 closeServer :: FilePath -> S.Socket -> IO ()
234 closeServer path sock = do
238 -- | Accepts a client connection on a server socket and wraps it in a
-- 'Client' that starts with an empty receive buffer.
239 acceptClient :: S.Socket -> IO Client
241 -- second return is the address of the client, which we ignore here
242 (client_socket, _) <- S.accept s
243 new_buffer <- newIORef B.empty
-- The accepted socket is converted to a read/write Handle.
244 handle <- S.socketToHandle client_socket ReadWriteMode
245 return Client { socket=handle, rbuf=new_buffer }
-- | Shuts down a client connection by closing the handle stored in its
-- 'socket' field.
closeClient :: Client -> IO ()
closeClient client = hClose (socket client)
251 -- | Sends a message over a luxi transport.
--
-- The message text is UTF-8 encoded before being written, and the whole
-- send runs under 'withTimeout' using @queryTimeout@.
252 sendMsg :: Client -> String -> IO ()
253 sendMsg s buf = withTimeout queryTimeout "sending luxi message" $ do
254 let encoded = UTF8.fromString buf
256 B.hPut handle encoded
260 -- | Given a current buffer and the handle, it will read from the
261 -- network until we get a full message, and it will return that
262 -- message and the leftover buffer contents.
--
-- Each read runs under 'withTimeout' using @queryTimeout@. Incoming data
-- is split at the first 'eOM' byte; the part before it is appended to the
-- buffer passed in.
263 recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
264 recvUpdate handle obuf = do
265 nbuf <- withTimeout queryTimeout "reading luxi response" $ do
-- Wait for input, then take at most 4096 bytes without blocking.
266 _ <- hWaitForInput handle (-1)
267 B.hGetNonBlocking handle 4096
-- Split the fresh chunk at the first end-of-message byte, if present.
268 let (msg, remaining) = B.break (eOM ==) nbuf
269 newbuf = B.append obuf msg
-- Recurse with everything accumulated so far.
271 then recvUpdate handle newbuf
-- Drop the leading end-of-message byte from the leftover data.
272 else return (newbuf, B.tail remaining)
274 -- | Waits for a message over a luxi transport.
--
-- Data left over from an earlier read is consulted first; the network is
-- read only when that buffer holds no end-of-message byte. Whatever
-- follows the returned message is stored back in the client's buffer.
275 recvMsg :: Client -> IO String
277 cbuf <- readIORef $ rbuf s
278 let (imsg, ibuf) = B.break (eOM ==) cbuf
280 if B.null ibuf -- if old buffer didn't contain a full message
281 then recvUpdate (socket s) cbuf -- then we read from network
282 else return (imsg, B.tail ibuf) -- else we return data from our buffer
283 writeIORef (rbuf s) nbuf
-- The message bytes are decoded from UTF-8 into a String.
284 return $ UTF8.toString msg
286 -- | Extended wrapper over recvMsg.
--
-- A successful receive is wrapped in 'RecvOk'. Exceptions caught while
-- receiving are reported as 'RecvConnClosed' or as 'RecvError' carrying
-- the shown exception, instead of propagating.
287 recvMsgExt :: Client -> IO RecvResult
289 catch (liftM RecvOk (recvMsg s)) $ \e ->
291 then return RecvConnClosed
292 else return $ RecvError (show e)
294 -- | Serialize a request to String.
--
-- The request is described by two key/value pairs: the method name
-- (from 'strOfOp') and the arguments (from @opToArgs@).
295 buildCall :: LuxiOp -- ^ The method
296 -> String -- ^ The serialized form
298 let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
299 , (strOfKey Args, opToArgs lo)
304 -- | Serialize the response to String.
--
-- The response is described by two key/value pairs: the success flag as
-- a JSON boolean and the result payload passed in unchanged.
305 buildResponse :: Bool -- ^ Success
306 -> JSValue -- ^ The arguments
307 -> String -- ^ The serialized form
308 buildResponse success args =
309 let ja = [ (strOfKey Success, JSBool success)
310 , (strOfKey Result, args)]
314 -- | Check that luxi request contains the required keys and parse it.
--
-- The input must decode to a JSON object. Its method entry is read as a
-- 'LuxiReq', and its arguments entry is stored in the resulting
-- 'LuxiCall' for later decoding.
315 validateCall :: String -> Result LuxiCall
317 arr <- fromJResult "parsing top-level luxi message" $
318 decodeStrict s::Result (JSObject JSValue)
319 let aobj = fromJSObject arr
320 call <- fromObj aobj (strOfKey Method)::Result LuxiReq
321 args <- fromObj aobj (strOfKey Args)
322 return (LuxiCall call args)
324 -- | Converts Luxi call arguments into a 'LuxiOp' data structure.
--
-- Each branch unpacks the raw arguments with @fromJVal@ into the shape the
-- request expects and builds the matching 'LuxiOp' constructor. Job IDs
-- are always converted with 'parseJobId'.
--
326 -- This is currently hand-coded until we make it more uniform so that
327 -- it can be generated using TH.
328 decodeCall :: LuxiCall -> Result LuxiOp
329 decodeCall (LuxiCall call args) =
332 (jid, jargs) <- fromJVal args
-- Every element of the ID list is parsed individually.
333 rid <- mapM parseJobId jid
334 let rargs = map fromJSString jargs
335 return $ QueryJobs rid rargs
336 ReqQueryInstances -> do
337 (names, fields, locking) <- fromJVal args
338 return $ QueryInstances names fields locking
340 (names, fields, locking) <- fromJVal args
341 return $ QueryNodes names fields locking
343 (names, fields, locking) <- fromJVal args
344 return $ QueryGroups names fields locking
345 ReqQueryClusterInfo -> do
346 return QueryClusterInfo
348 (what, fields, qfilter) <- fromJVal args
349 return $ Query what fields qfilter
351 (what, fields) <- fromJVal args
352 fields' <- case fields of
355 return $ QueryFields what fields'
357 [ops1] <- fromJVal args
-- Parse errors for opcodes are labelled with the raw request name.
358 ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
359 return $ SubmitJob ops2
360 ReqSubmitManyJobs -> do
361 [ops1] <- fromJVal args
362 ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
363 return $ SubmitManyJobs ops2
364 ReqWaitForJobChange -> do
365 (jid, fields, pinfo, pidx, wtmout) <-
366 -- No instance for 5-tuple, code copied from the
367 -- json sources and adapted
368 fromJResult "Parsing WaitForJobChange message" $
-- Only an array of exactly five elements is accepted here.
370 JSArray [a, b, c, d, e] ->
-- NOTE(review): this fallback also catches arrays with more than five
-- elements and non-array values, so the message is imprecise.
377 _ -> J.Error "Not enough values"
378 rid <- parseJobId jid
379 return $ WaitForJobChange rid fields pinfo pidx wtmout
381 [jid] <- fromJVal args
382 rid <- parseJobId jid
383 return $ ArchiveJob rid
384 ReqAutoArchiveJobs -> do
385 (age, tmout) <- fromJVal args
386 return $ AutoArchiveJobs age tmout
387 ReqQueryExports -> do
388 (nodes, lock) <- fromJVal args
389 return $ QueryExports nodes lock
390 ReqQueryConfigValues -> do
391 [fields] <- fromJVal args
392 return $ QueryConfigValues fields
394 (kind, name) <- fromJVal args
395 return $ QueryTags kind name
397 [job] <- fromJVal args
398 rid <- parseJobId job
399 return $ CancelJob rid
400 ReqSetDrainFlag -> do
401 [flag] <- fromJVal args
402 return $ SetDrainFlag flag
403 ReqSetWatcherPause -> do
404 [duration] <- fromJVal args
405 return $ SetWatcherPause duration
407 -- | Check that luxi responses contain the required keys and that the
408 -- call was successful.
--
-- The response must decode to a JSON object holding a boolean success
-- entry and a result entry.
409 validateResult :: String -> Result JSValue
410 validateResult s = do
-- A replacement character in the text is treated as a decoding failure.
411 when (UTF8.replacement_char `elem` s) $
412 fail "Failed to decode UTF-8, detected replacement char after decoding"
413 oarr <- fromJResult "Parsing LUXI response"
414 (decodeStrict s)::Result (JSObject JSValue)
415 let arr = J.fromJSObject oarr
416 status <- fromObj arr (strOfKey Success)::Result Bool
417 let rkey = strOfKey Result
-- One branch returns the result entry itself ...
419 then fromObj arr rkey
-- ... the other uses the result entry as the failure message.
420 else fromObj arr rkey >>= fail
422 -- | Generic luxi method call.
--
-- Serialises the operation with 'buildCall', sends it with 'sendMsg' and
-- passes the response text to 'validateResult'.
423 callMethod :: LuxiOp -> Client -> IO (Result JSValue)
424 callMethod method s = do
425 sendMsg s $ buildCall method
427 let rval = validateResult result
-- | Parses a job ID.
--
-- Accepts either a JSON string, which is handed to 'tryRead', or a JSON
-- number. A number must be integral and must convert to 'JobId' without
-- losing information. Any other value is reported as 'Bad'.
parseJobId :: JSValue -> Result JobId
parseJobId (JSString x) = tryRead "parsing job id" . fromJSString $ x
parseJobId (JSRational _ x)
  | denominator x /= 1 =
      Bad $ "Got fractional job ID from master daemon?! Value:" ++ show x
  -- Converting back must reproduce the original number; otherwise it did
  -- not fit (e.g. on 32-bit platforms) and would have silently wrapped.
  | toInteger jid /= n =
      Bad $ "Job ID does not fit the job ID type: " ++ show n
  | otherwise = Ok jid
  where n = numerator x
        jid = fromInteger n :: JobId
parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x
440 -- | Parse job submission result.
--
-- A two-element array starting with @true@ carries the job ID in its
-- second element, which is parsed with 'parseJobId'. An array starting
-- with @false@ followed by a string, and any other value, are handled by
-- the remaining equations.
441 parseSubmitJobResult :: JSValue -> Result JobId
442 parseSubmitJobResult (JSArray [JSBool True, v]) = parseJobId v
443 parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
445 parseSubmitJobResult v = Bad $ "Unknown result from the master daemon" ++
448 -- | Specialized submitManyJobs call.
--
-- Issues a @SubmitManyJobs@ request through 'callMethod'. When the reply
-- is a JSON array, each element is parsed with 'parseSubmitJobResult'.
449 submitManyJobs :: Client -> [[OpCode]] -> IO (Result [JobId])
450 submitManyJobs s jobs = do
451 rval <- callMethod (SubmitManyJobs jobs) s
452 -- map each result (status, payload) pair into a nice Result ADT
453 return $ case rval of
455 Ok (JSArray r) -> mapM parseSubmitJobResult r
-- Anything else is reported as an unparsable response.
456 x -> Bad ("Cannot parse response from Ganeti: " ++ show x)
458 -- | Custom queryJobs call.
459 queryJobsStatus :: Client -> [JobId] -> IO (Result [JobStatus])
460 queryJobsStatus s jids = do
461 rval <- callMethod (QueryJobs jids ["status"]) s
462 return $ case rval of
464 Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
465 J.Ok vals -> if any null vals
466 then Bad "Missing job status field"
467 else Ok (map head vals)