1 {-# LANGUAGE TemplateHaskell #-}
3 {-| Implementation of the Ganeti LUXI interface.
9 Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
53 import Control.Exception (catch)
55 import Data.Ratio (numerator, denominator)
56 import qualified Data.ByteString as B
57 import qualified Data.ByteString.UTF8 as UTF8
58 import Data.Word (Word8)
60 import Text.JSON (encodeStrict, decodeStrict)
61 import qualified Text.JSON as J
62 import Text.JSON.Pretty (pp_value)
63 import Text.JSON.Types
64 import System.Directory (removeFile)
65 import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
66 import System.IO.Error (isEOFError)
68 import qualified Network.Socket as S
70 import Ganeti.BasicTypes
71 import Ganeti.Constants
74 import Ganeti.Jobs (JobStatus)
77 import qualified Ganeti.Query.Language as Qlang
80 -- * Utility functions
82 -- | Wrapper over System.Timeout.timeout that fails in the IO monad.
83 withTimeout :: Int -> String -> IO a -> IO a
84 withTimeout secs descr action = do
85 result <- timeout (secs * 1000000) action
87 Nothing -> fail $ "Timeout in " ++ descr
90 -- * Generic protocol functionality
92 -- | Result of receiving a message from the socket.
93 data RecvResult = RecvConnClosed -- ^ Connection closed
94 | RecvError String -- ^ Any other error, with its description
95 | RecvOk String -- ^ Successful receive, with the received message
98 -- | The Ganeti job type.
101 -- | Currently supported Luxi operations and JSON serialization.
104 [ simpleField "what" [t| Qlang.ItemType |]
105 , simpleField "fields" [t| [String] |]
106 , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
108 , (luxiReqQueryFields,
109 [ simpleField "what" [t| Qlang.ItemType |]
110 , simpleField "fields" [t| [String] |]
112 , (luxiReqQueryNodes,
113 [ simpleField "names" [t| [String] |]
114 , simpleField "fields" [t| [String] |]
115 , simpleField "lock" [t| Bool |]
117 , (luxiReqQueryGroups,
118 [ simpleField "names" [t| [String] |]
119 , simpleField "fields" [t| [String] |]
120 , simpleField "lock" [t| Bool |]
122 , (luxiReqQueryInstances,
123 [ simpleField "names" [t| [String] |]
124 , simpleField "fields" [t| [String] |]
125 , simpleField "lock" [t| Bool |]
128 [ simpleField "ids" [t| [Int] |]
129 , simpleField "fields" [t| [String] |]
131 , (luxiReqQueryExports,
132 [ simpleField "nodes" [t| [String] |]
133 , simpleField "lock" [t| Bool |]
135 , (luxiReqQueryConfigValues,
136 [ simpleField "fields" [t| [String] |] ]
138 , (luxiReqQueryClusterInfo, [])
140 [ customField 'decodeTagObject 'encodeTagObject $
141 simpleField "kind" [t| TagObject |]
144 [ simpleField "job" [t| [OpCode] |] ]
146 , (luxiReqSubmitManyJobs,
147 [ simpleField "ops" [t| [[OpCode]] |] ]
149 , (luxiReqWaitForJobChange,
150 [ simpleField "job" [t| Int |]
151 , simpleField "fields" [t| [String]|]
152 , simpleField "prev_job" [t| JSValue |]
153 , simpleField "prev_log" [t| JSValue |]
154 , simpleField "tmout" [t| Int |]
156 , (luxiReqArchiveJob,
157 [ simpleField "job" [t| Int |] ]
159 , (luxiReqAutoArchiveJobs,
160 [ simpleField "age" [t| Int |]
161 , simpleField "tmout" [t| Int |]
164 [ simpleField "job" [t| Int |] ]
166 , (luxiReqChangeJobPriority,
167 [ simpleField "job" [t| Int |]
168 , simpleField "priority" [t| Int |] ]
170 , (luxiReqSetDrainFlag,
171 [ simpleField "flag" [t| Bool |] ]
173 , (luxiReqSetWatcherPause,
174 [ simpleField "duration" [t| Double |] ]
-- Template Haskell splice; presumably derives the JSON (de)serialisation
-- instance for 'LuxiReq' (the helper is defined elsewhere -- TODO confirm).
-- 'validateCall' depends on being able to read a 'LuxiReq' from JSON.
178 $(makeJSONInstance ''LuxiReq)
180 -- | List of all defined Luxi calls.
-- NOTE(review): the @drop 3@ looks like it strips the three-character
-- @Req@ prefix of the 'LuxiReq' constructor names -- confirm against
-- the definition of genAllConstr.
181 $(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
183 -- | The serialisation of LuxiOps into strings in messages.
184 $(genStrOfOp ''LuxiOp "strOfOp")
186 -- | Type holding the initial (unparsed) Luxi call.
--
-- Pairs the request kind with its arguments as a raw 'JSValue'; it is
-- what 'validateCall' returns and what 'decodeCall' takes as input.
187 data LuxiCall = LuxiCall LuxiReq JSValue
189 -- | The end-of-message separator.
193 -- | The end-of-message encoded as a ByteString.
195 bEOM = B.singleton eOM
197 -- | Valid keys in the requests and responses.
198 data MsgKeys = Method
203 -- | The serialisation of MsgKeys into strings in messages.
204 $(genStrOfKey ''MsgKeys "strOfKey")
206 -- | Luxi client encapsulation.
207 data Client = Client { socket :: Handle -- ^ The socket of the client
208 , rbuf :: IORef B.ByteString -- ^ Already received buffer
211 -- | Connects to the master daemon and returns a luxi Client.
--
-- Opens a Unix-domain stream socket, connects it to the given socket
-- path (presumably the function's argument) and wraps it in a 'Client'.
212 getClient :: String -> IO Client
214 s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
-- Only the connect step is bounded by the 'luxiDefCtmo' timeout.
-- NOTE(review): nothing visible here closes @s@ when the connect fails
-- or times out -- confirm whether the socket can leak on that path.
215 withTimeout luxiDefCtmo "creating luxi connection" $
216 S.connect s (S.SockAddrUnix path)
-- Every new client starts with an empty receive buffer.
217 rf <- newIORef B.empty
-- Wrap the socket in a 'Handle' opened for both reading and writing.
218 h <- S.socketToHandle s ReadWriteMode
219 return Client { socket=h, rbuf=rf }
221 -- | Creates and returns a server endpoint.
222 getServer :: FilePath -> IO S.Socket
224 s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
225 S.bindSocket s (S.SockAddrUnix path)
226 S.listen s 5 -- 5 is the max backlog
229 -- | Closes a server endpoint.
230 -- FIXME: this should be encapsulated into a nicer type.
231 closeServer :: FilePath -> S.Socket -> IO ()
232 closeServer path sock = do
236 -- | Accepts a client
--
-- Accepts one connection on the given socket and wraps it in a 'Client'
-- that starts with an empty receive buffer.
237 acceptClient :: S.Socket -> IO Client
239 -- second return is the address of the client, which we ignore here
240 (client_socket, _) <- S.accept s
241 new_buffer <- newIORef B.empty
-- The handle is opened read/write, the same mode 'getClient' uses.
242 handle <- S.socketToHandle client_socket ReadWriteMode
243 return Client { socket=handle, rbuf=new_buffer }
-- | Shuts down a luxi client by closing the handle that wraps its socket.
closeClient :: Client -> IO ()
closeClient client = hClose (socket client)
249 -- | Sends a message over a luxi transport.
250 sendMsg :: Client -> String -> IO ()
251 sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
252 let encoded = UTF8.fromString buf
254 B.hPut handle encoded
258 -- | Given a current buffer and the handle, it will read from the
259 -- network until we get a full message, and it will return that
260 -- message and the leftover buffer contents.
--
-- The first component of the result is the accumulated message (the old
-- buffer plus everything read up to the separator); the second is what
-- followed the separator in the last chunk read.
261 recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
262 recvUpdate handle obuf = do
-- Each individual read attempt is bounded by the 'luxiDefRwto' timeout.
263 nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
-- A negative timeout makes hWaitForInput wait indefinitely; the
-- enclosing 'withTimeout' is what actually limits the wait.
264 _ <- hWaitForInput handle (-1)
-- Take whatever is available right now, at most 4096 bytes.
265 B.hGetNonBlocking handle 4096
-- Split the chunk at the first end-of-message byte; when one is present,
-- @remaining@ starts with that byte.
266 let (msg, remaining) = B.break (eOM ==) nbuf
267 newbuf = B.append obuf msg
-- Recursing carries the accumulated bytes along as the new "old" buffer.
269 then recvUpdate handle newbuf
-- B.tail drops the leading separator byte from the leftover data.
270 else return (newbuf, B.tail remaining)
272 -- | Waits for a message over a luxi transport.
--
-- Bytes already buffered in the client are examined first; the socket
-- is only read when they do not contain the end-of-message separator.
-- The message is returned as a 'String' decoded from UTF-8.
273 recvMsg :: Client -> IO String
275 cbuf <- readIORef $ rbuf s
-- Look for the separator in what was buffered by earlier reads.
276 let (imsg, ibuf) = B.break (eOM ==) cbuf
278 if B.null ibuf -- if old buffer didn't contain a full message
279 then recvUpdate (socket s) cbuf -- then we read from network
280 else return (imsg, B.tail ibuf) -- else we return data from our buffer
-- Store the new buffer contents (presumably the bytes left over after
-- the message) back into the client for the next call.
281 writeIORef (rbuf s) nbuf
282 return $ UTF8.toString msg
284 -- | Extended wrapper over recvMsg.
285 recvMsgExt :: Client -> IO RecvResult
287 Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
288 return $ if isEOFError e
290 else RecvError (show e)
292 -- | Serialize a request to String.
293 buildCall :: LuxiOp -- ^ The method
294 -> String -- ^ The serialized form
296 let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
297 , (strOfKey Args, opToArgs lo)
302 -- | Serialize the response to String.
303 buildResponse :: Bool -- ^ Success
304 -> JSValue -- ^ The arguments
305 -> String -- ^ The serialized form
306 buildResponse success args =
307 let ja = [ (strOfKey Success, JSBool success)
308 , (strOfKey Result, args)]
312 -- | Check that luxi request contains the required keys and parse it.
--
-- The message must decode to a JSON object. Its 'Method' entry is read
-- as a 'LuxiReq', while its 'Args' entry is kept as an undecoded value
-- inside the returned 'LuxiCall'. Failures are reported through 'Result'.
313 validateCall :: String -> Result LuxiCall
-- Decode the top-level message, converting the JSON library's result
-- type with 'fromJResult' and the given context string.
315 arr <- fromJResult "parsing top-level luxi message" $
316 decodeStrict s::Result (JSObject JSValue)
-- Turn the JSON object into a key/value association list for 'fromObj'.
317 let aobj = fromJSObject arr
318 call <- fromObj aobj (strOfKey Method)::Result LuxiReq
319 args <- fromObj aobj (strOfKey Args)
320 return (LuxiCall call args)
322 -- | Converts Luxi call arguments into a 'LuxiOp' data structure.
324 -- This is currently hand-coded until we make it more uniform so that
325 -- it can be generated using TH.
326 decodeCall :: LuxiCall -> Result LuxiOp
327 decodeCall (LuxiCall call args) =
330 (jid, jargs) <- fromJVal args
331 rid <- mapM parseJobId jid
332 let rargs = map fromJSString jargs
333 return $ QueryJobs rid rargs
334 ReqQueryInstances -> do
335 (names, fields, locking) <- fromJVal args
336 return $ QueryInstances names fields locking
338 (names, fields, locking) <- fromJVal args
339 return $ QueryNodes names fields locking
341 (names, fields, locking) <- fromJVal args
342 return $ QueryGroups names fields locking
343 ReqQueryClusterInfo ->
344 return QueryClusterInfo
346 (what, fields, qfilter) <- fromJVal args
347 return $ Query what fields qfilter
349 (what, fields) <- fromJVal args
350 fields' <- case fields of
353 return $ QueryFields what fields'
355 [ops1] <- fromJVal args
356 ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
357 return $ SubmitJob ops2
358 ReqSubmitManyJobs -> do
359 [ops1] <- fromJVal args
360 ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
361 return $ SubmitManyJobs ops2
362 ReqWaitForJobChange -> do
363 (jid, fields, pinfo, pidx, wtmout) <-
364 -- No instance for 5-tuple, code copied from the
365 -- json sources and adapted
366 fromJResult "Parsing WaitForJobChange message" $
368 JSArray [a, b, c, d, e] ->
375 _ -> J.Error "Not enough values"
376 rid <- parseJobId jid
377 return $ WaitForJobChange rid fields pinfo pidx wtmout
379 [jid] <- fromJVal args
380 rid <- parseJobId jid
381 return $ ArchiveJob rid
382 ReqAutoArchiveJobs -> do
383 (age, tmout) <- fromJVal args
384 return $ AutoArchiveJobs age tmout
385 ReqQueryExports -> do
386 (nodes, lock) <- fromJVal args
387 return $ QueryExports nodes lock
388 ReqQueryConfigValues -> do
389 [fields] <- fromJVal args
390 return $ QueryConfigValues fields
392 (kind, name) <- fromJVal args
393 item <- tagObjectFrom kind name
394 return $ QueryTags item
396 [job] <- fromJVal args
397 rid <- parseJobId job
398 return $ CancelJob rid
399 ReqChangeJobPriority -> do
400 (job, priority) <- fromJVal args
401 rid <- parseJobId job
402 return $ ChangeJobPriority rid priority
403 ReqSetDrainFlag -> do
404 [flag] <- fromJVal args
405 return $ SetDrainFlag flag
406 ReqSetWatcherPause -> do
407 [duration] <- fromJVal args
408 return $ SetWatcherPause duration
410 -- | Check that luxi responses contain the required keys and that the
411 -- call was successful.
412 validateResult :: String -> ErrorResult JSValue
413 validateResult s = do
414 when (UTF8.replacement_char `elem` s) $
415 fail "Failed to decode UTF-8, detected replacement char after decoding"
416 oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
417 let arr = J.fromJSObject oarr
418 status <- fromObj arr (strOfKey Success)
419 result <- fromObj arr (strOfKey Result)
422 else decodeError result
424 -- | Try to decode an error from the server response. This function
425 -- will always fail, since it's called only on the error path (when
-- the call was not successful).
427 decodeError :: JSValue -> ErrorResult JSValue
431 Bad msg -> Bad $ GenericError msg
433 -- | Generic luxi method call.
434 callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
435 callMethod method s = do
436 sendMsg s $ buildCall method
438 let rval = validateResult result
-- | Parses a job ID.
--
-- Accepts either a JSON string, which is read via 'tryRead', or a JSON
-- number, which must be a whole number that fits into 'JobId'. Any
-- other JSON value, a fractional number, or a number that is out of
-- range yields a 'Bad' result describing the problem.
parseJobId :: JSValue -> Result JobId
parseJobId (JSString x) = tryRead "parsing job id" . fromJSString $ x
parseJobId (JSRational _ x)
  | denominator x /= 1 =
      Bad $ "Got fractional job ID from master daemon?! Value:" ++ show x
  -- Converting back to Integer must reproduce the original numerator;
  -- if it does not, 'fromIntegral' wrapped around (e.g. on 32-bit
  -- platforms) and the resulting ID would silently be wrong.
  | toInteger jid /= num =
      Bad $ "Job ID out of range for this platform: " ++ show num
  | otherwise = Ok jid
  where num = numerator x
        jid = fromIntegral num :: JobId
parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x
451 -- | Parse job submission result.
452 parseSubmitJobResult :: JSValue -> ErrorResult JobId
453 parseSubmitJobResult (JSArray [JSBool True, v]) =
455 Bad msg -> Bad $ LuxiError msg
457 parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
458 Bad . LuxiError $ fromJSString x
459 parseSubmitJobResult v =
460 Bad . LuxiError $ "Unknown result from the master daemon: " ++
463 -- | Specialized submitManyJobs call.
464 submitManyJobs :: Client -> [[OpCode]] -> IO (ErrorResult [JobId])
465 submitManyJobs s jobs = do
466 rval <- callMethod (SubmitManyJobs jobs) s
467 -- map each result (status, payload) pair into a nice Result ADT
468 return $ case rval of
470 Ok (JSArray r) -> mapM parseSubmitJobResult r
471 x -> Bad . LuxiError $
472 "Cannot parse response from Ganeti: " ++ show x
474 -- | Custom queryJobs call.
475 queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
476 queryJobsStatus s jids = do
477 rval <- callMethod (QueryJobs jids ["status"]) s
478 return $ case rval of
480 Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
481 J.Ok vals -> if any null vals
483 LuxiError "Missing job status field"
484 else Ok (map head vals)
485 J.Error x -> Bad $ LuxiError x