root / src / Ganeti / Luxi.hs @ 229da00f
History | View | Annotate | Download (16.2 kB)
1 |
{-# LANGUAGE TemplateHaskell #-} |
---|---|
2 |
|
3 |
{-| Implementation of the Ganeti LUXI interface. |
4 |
|
5 |
-} |
6 |
|
7 |
{- |
8 |
|
9 |
Copyright (C) 2009, 2010, 2011, 2012 Google Inc. |
10 |
|
11 |
This program is free software; you can redistribute it and/or modify |
12 |
it under the terms of the GNU General Public License as published by |
13 |
the Free Software Foundation; either version 2 of the License, or |
14 |
(at your option) any later version. |
15 |
|
16 |
This program is distributed in the hope that it will be useful, but |
17 |
WITHOUT ANY WARRANTY; without even the implied warranty of |
18 |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
19 |
General Public License for more details. |
20 |
|
21 |
You should have received a copy of the GNU General Public License |
22 |
along with this program; if not, write to the Free Software |
23 |
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
24 |
02110-1301, USA. |
25 |
|
26 |
-} |
27 |
|
28 |
module Ganeti.Luxi |
29 |
( LuxiOp(..) |
30 |
, LuxiReq(..) |
31 |
, Client |
32 |
, JobId |
33 |
, fromJobId |
34 |
, makeJobId |
35 |
, RecvResult(..) |
36 |
, strOfOp |
37 |
, getClient |
38 |
, getServer |
39 |
, acceptClient |
40 |
, closeClient |
41 |
, closeServer |
42 |
, callMethod |
43 |
, submitManyJobs |
44 |
, queryJobsStatus |
45 |
, buildCall |
46 |
, buildResponse |
47 |
, validateCall |
48 |
, decodeCall |
49 |
, recvMsg |
50 |
, recvMsgExt |
51 |
, sendMsg |
52 |
, allLuxiCalls |
53 |
) where |
54 |
|
55 |
import Control.Exception (catch) |
56 |
import Data.IORef |
57 |
import qualified Data.ByteString as B |
58 |
import qualified Data.ByteString.Lazy as BL |
59 |
import qualified Data.ByteString.UTF8 as UTF8 |
60 |
import qualified Data.ByteString.Lazy.UTF8 as UTF8L |
61 |
import Data.Word (Word8) |
62 |
import Control.Monad |
63 |
import Text.JSON (encodeStrict, decodeStrict) |
64 |
import qualified Text.JSON as J |
65 |
import Text.JSON.Pretty (pp_value) |
66 |
import Text.JSON.Types |
67 |
import System.Directory (removeFile) |
68 |
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..)) |
69 |
import System.IO.Error (isEOFError) |
70 |
import System.Timeout |
71 |
import qualified Network.Socket as S |
72 |
|
73 |
import Ganeti.BasicTypes |
74 |
import Ganeti.Constants |
75 |
import Ganeti.Errors |
76 |
import Ganeti.JSON |
77 |
import Ganeti.OpParams (pTagsObject) |
78 |
import Ganeti.OpCodes |
79 |
import qualified Ganeti.Query.Language as Qlang |
80 |
import Ganeti.Runtime (GanetiDaemon(..), MiscGroup(..), GanetiGroup(..)) |
81 |
import Ganeti.THH |
82 |
import Ganeti.Types |
83 |
import Ganeti.Utils |
84 |
|
85 |
-- * Utility functions |
86 |
|
87 |
-- | Wrapper over System.Timeout.timeout that fails in the IO monad. |
88 |
withTimeout :: Int -> String -> IO a -> IO a |
89 |
withTimeout secs descr action = do |
90 |
result <- timeout (secs * 1000000) action |
91 |
case result of |
92 |
Nothing -> fail $ "Timeout in " ++ descr |
93 |
Just v -> return v |
94 |
|
95 |
-- * Generic protocol functionality |
96 |
|
97 |
-- | Result of receiving a message from the socket. |
98 |
data RecvResult = RecvConnClosed -- ^ Connection closed |
99 |
| RecvError String -- ^ Any other error |
100 |
| RecvOk String -- ^ Successfull receive |
101 |
deriving (Show, Eq) |
102 |
|
103 |
-- | Currently supported Luxi operations and JSON serialization. |
104 |
$(genLuxiOp "LuxiOp" |
105 |
[ (luxiReqQuery, |
106 |
[ simpleField "what" [t| Qlang.ItemType |] |
107 |
, simpleField "fields" [t| [String] |] |
108 |
, simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |] |
109 |
]) |
110 |
, (luxiReqQueryFields, |
111 |
[ simpleField "what" [t| Qlang.ItemType |] |
112 |
, simpleField "fields" [t| [String] |] |
113 |
]) |
114 |
, (luxiReqQueryNodes, |
115 |
[ simpleField "names" [t| [String] |] |
116 |
, simpleField "fields" [t| [String] |] |
117 |
, simpleField "lock" [t| Bool |] |
118 |
]) |
119 |
, (luxiReqQueryGroups, |
120 |
[ simpleField "names" [t| [String] |] |
121 |
, simpleField "fields" [t| [String] |] |
122 |
, simpleField "lock" [t| Bool |] |
123 |
]) |
124 |
, (luxiReqQueryNetworks, |
125 |
[ simpleField "names" [t| [String] |] |
126 |
, simpleField "fields" [t| [String] |] |
127 |
, simpleField "lock" [t| Bool |] |
128 |
]) |
129 |
, (luxiReqQueryInstances, |
130 |
[ simpleField "names" [t| [String] |] |
131 |
, simpleField "fields" [t| [String] |] |
132 |
, simpleField "lock" [t| Bool |] |
133 |
]) |
134 |
, (luxiReqQueryJobs, |
135 |
[ simpleField "ids" [t| [JobId] |] |
136 |
, simpleField "fields" [t| [String] |] |
137 |
]) |
138 |
, (luxiReqQueryExports, |
139 |
[ simpleField "nodes" [t| [String] |] |
140 |
, simpleField "lock" [t| Bool |] |
141 |
]) |
142 |
, (luxiReqQueryConfigValues, |
143 |
[ simpleField "fields" [t| [String] |] ] |
144 |
) |
145 |
, (luxiReqQueryClusterInfo, []) |
146 |
, (luxiReqQueryTags, |
147 |
[ pTagsObject |
148 |
, simpleField "name" [t| String |] |
149 |
]) |
150 |
, (luxiReqSubmitJob, |
151 |
[ simpleField "job" [t| [MetaOpCode] |] ] |
152 |
) |
153 |
, (luxiReqSubmitJobToDrainedQueue, |
154 |
[ simpleField "job" [t| [MetaOpCode] |] ] |
155 |
) |
156 |
, (luxiReqSubmitManyJobs, |
157 |
[ simpleField "ops" [t| [[MetaOpCode]] |] ] |
158 |
) |
159 |
, (luxiReqWaitForJobChange, |
160 |
[ simpleField "job" [t| JobId |] |
161 |
, simpleField "fields" [t| [String]|] |
162 |
, simpleField "prev_job" [t| JSValue |] |
163 |
, simpleField "prev_log" [t| JSValue |] |
164 |
, simpleField "tmout" [t| Int |] |
165 |
]) |
166 |
, (luxiReqPickupJob, |
167 |
[ simpleField "job" [t| JobId |] ] |
168 |
) |
169 |
, (luxiReqArchiveJob, |
170 |
[ simpleField "job" [t| JobId |] ] |
171 |
) |
172 |
, (luxiReqAutoArchiveJobs, |
173 |
[ simpleField "age" [t| Int |] |
174 |
, simpleField "tmout" [t| Int |] |
175 |
]) |
176 |
, (luxiReqCancelJob, |
177 |
[ simpleField "job" [t| JobId |] ] |
178 |
) |
179 |
, (luxiReqChangeJobPriority, |
180 |
[ simpleField "job" [t| JobId |] |
181 |
, simpleField "priority" [t| Int |] ] |
182 |
) |
183 |
, (luxiReqSetDrainFlag, |
184 |
[ simpleField "flag" [t| Bool |] ] |
185 |
) |
186 |
, (luxiReqSetWatcherPause, |
187 |
[ simpleField "duration" [t| Double |] ] |
188 |
) |
189 |
]) |
190 |
|
191 |
$(makeJSONInstance ''LuxiReq) |
192 |
|
193 |
-- | List of all defined Luxi calls. |
194 |
$(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls") |
195 |
|
196 |
-- | The serialisation of LuxiOps into strings in messages. |
197 |
$(genStrOfOp ''LuxiOp "strOfOp") |
198 |
|
199 |
-- | Type holding the initial (unparsed) Luxi call. |
200 |
data LuxiCall = LuxiCall LuxiReq JSValue |
201 |
|
202 |
-- | The end-of-message separator. |
203 |
eOM :: Word8 |
204 |
eOM = 3 |
205 |
|
206 |
-- | The end-of-message encoded as a ByteString. |
207 |
bEOM :: B.ByteString |
208 |
bEOM = B.singleton eOM |
209 |
|
210 |
-- | Valid keys in the requests and responses. |
211 |
data MsgKeys = Method |
212 |
| Args |
213 |
| Success |
214 |
| Result |
215 |
|
216 |
-- | The serialisation of MsgKeys into strings in messages. |
217 |
$(genStrOfKey ''MsgKeys "strOfKey") |
218 |
|
219 |
-- | Luxi client encapsulation. |
220 |
data Client = Client { socket :: Handle -- ^ The socket of the client |
221 |
, rbuf :: IORef B.ByteString -- ^ Already received buffer |
222 |
} |
223 |
|
224 |
-- | Connects to the master daemon and returns a luxi Client. |
225 |
getClient :: String -> IO Client |
226 |
getClient path = do |
227 |
s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol |
228 |
withTimeout luxiDefCtmo "creating luxi connection" $ |
229 |
S.connect s (S.SockAddrUnix path) |
230 |
rf <- newIORef B.empty |
231 |
h <- S.socketToHandle s ReadWriteMode |
232 |
return Client { socket=h, rbuf=rf } |
233 |
|
234 |
-- | Creates and returns a server endpoint. |
235 |
getServer :: Bool -> FilePath -> IO S.Socket |
236 |
getServer setOwner path = do |
237 |
s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol |
238 |
S.bindSocket s (S.SockAddrUnix path) |
239 |
when setOwner . setOwnerAndGroupFromNames path GanetiLuxid $ |
240 |
ExtraGroup DaemonsGroup |
241 |
S.listen s 5 -- 5 is the max backlog |
242 |
return s |
243 |
|
244 |
-- | Closes a server endpoint. |
245 |
-- FIXME: this should be encapsulated into a nicer type. |
246 |
closeServer :: FilePath -> S.Socket -> IO () |
247 |
closeServer path sock = do |
248 |
S.sClose sock |
249 |
removeFile path |
250 |
|
251 |
-- | Accepts a client |
252 |
acceptClient :: S.Socket -> IO Client |
253 |
acceptClient s = do |
254 |
-- second return is the address of the client, which we ignore here |
255 |
(client_socket, _) <- S.accept s |
256 |
new_buffer <- newIORef B.empty |
257 |
handle <- S.socketToHandle client_socket ReadWriteMode |
258 |
return Client { socket=handle, rbuf=new_buffer } |
259 |
|
260 |
-- | Closes the client socket. |
261 |
closeClient :: Client -> IO () |
262 |
closeClient = hClose . socket |
263 |
|
264 |
-- | Sends a message over a luxi transport. |
265 |
sendMsg :: Client -> String -> IO () |
266 |
sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do |
267 |
let encoded = UTF8L.fromString buf |
268 |
handle = socket s |
269 |
BL.hPut handle encoded |
270 |
B.hPut handle bEOM |
271 |
hFlush handle |
272 |
|
273 |
-- | Given a current buffer and the handle, it will read from the |
274 |
-- network until we get a full message, and it will return that |
275 |
-- message and the leftover buffer contents. |
276 |
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString) |
277 |
recvUpdate handle obuf = do |
278 |
nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do |
279 |
_ <- hWaitForInput handle (-1) |
280 |
B.hGetNonBlocking handle 4096 |
281 |
let (msg, remaining) = B.break (eOM ==) nbuf |
282 |
newbuf = B.append obuf msg |
283 |
if B.null remaining |
284 |
then recvUpdate handle newbuf |
285 |
else return (newbuf, B.tail remaining) |
286 |
|
287 |
-- | Waits for a message over a luxi transport. |
288 |
recvMsg :: Client -> IO String |
289 |
recvMsg s = do |
290 |
cbuf <- readIORef $ rbuf s |
291 |
let (imsg, ibuf) = B.break (eOM ==) cbuf |
292 |
(msg, nbuf) <- |
293 |
if B.null ibuf -- if old buffer didn't contain a full message |
294 |
then recvUpdate (socket s) cbuf -- then we read from network |
295 |
else return (imsg, B.tail ibuf) -- else we return data from our buffer |
296 |
writeIORef (rbuf s) nbuf |
297 |
return $ UTF8.toString msg |
298 |
|
299 |
-- | Extended wrapper over recvMsg. |
300 |
recvMsgExt :: Client -> IO RecvResult |
301 |
recvMsgExt s = |
302 |
Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e -> |
303 |
return $ if isEOFError e |
304 |
then RecvConnClosed |
305 |
else RecvError (show e) |
306 |
|
307 |
-- | Serialize a request to String. |
308 |
buildCall :: LuxiOp -- ^ The method |
309 |
-> String -- ^ The serialized form |
310 |
buildCall lo = |
311 |
let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo) |
312 |
, (strOfKey Args, opToArgs lo) |
313 |
] |
314 |
jo = toJSObject ja |
315 |
in encodeStrict jo |
316 |
|
317 |
-- | Serialize the response to String. |
318 |
buildResponse :: Bool -- ^ Success |
319 |
-> JSValue -- ^ The arguments |
320 |
-> String -- ^ The serialized form |
321 |
buildResponse success args = |
322 |
let ja = [ (strOfKey Success, JSBool success) |
323 |
, (strOfKey Result, args)] |
324 |
jo = toJSObject ja |
325 |
in encodeStrict jo |
326 |
|
327 |
-- | Check that luxi request contains the required keys and parse it. |
328 |
validateCall :: String -> Result LuxiCall |
329 |
validateCall s = do |
330 |
arr <- fromJResult "parsing top-level luxi message" $ |
331 |
decodeStrict s::Result (JSObject JSValue) |
332 |
let aobj = fromJSObject arr |
333 |
call <- fromObj aobj (strOfKey Method)::Result LuxiReq |
334 |
args <- fromObj aobj (strOfKey Args) |
335 |
return (LuxiCall call args) |
336 |
|
337 |
-- | Converts Luxi call arguments into a 'LuxiOp' data structure. |
338 |
-- |
339 |
-- This is currently hand-coded until we make it more uniform so that |
340 |
-- it can be generated using TH. |
341 |
decodeCall :: LuxiCall -> Result LuxiOp |
342 |
decodeCall (LuxiCall call args) = |
343 |
case call of |
344 |
ReqQueryJobs -> do |
345 |
(jids, jargs) <- fromJVal args |
346 |
jids' <- case jids of |
347 |
JSNull -> return [] |
348 |
_ -> fromJVal jids |
349 |
return $ QueryJobs jids' jargs |
350 |
ReqQueryInstances -> do |
351 |
(names, fields, locking) <- fromJVal args |
352 |
return $ QueryInstances names fields locking |
353 |
ReqQueryNodes -> do |
354 |
(names, fields, locking) <- fromJVal args |
355 |
return $ QueryNodes names fields locking |
356 |
ReqQueryGroups -> do |
357 |
(names, fields, locking) <- fromJVal args |
358 |
return $ QueryGroups names fields locking |
359 |
ReqQueryClusterInfo -> |
360 |
return QueryClusterInfo |
361 |
ReqQueryNetworks -> do |
362 |
(names, fields, locking) <- fromJVal args |
363 |
return $ QueryNetworks names fields locking |
364 |
ReqQuery -> do |
365 |
(what, fields, qfilter) <- fromJVal args |
366 |
return $ Query what fields qfilter |
367 |
ReqQueryFields -> do |
368 |
(what, fields) <- fromJVal args |
369 |
fields' <- case fields of |
370 |
JSNull -> return [] |
371 |
_ -> fromJVal fields |
372 |
return $ QueryFields what fields' |
373 |
ReqSubmitJob -> do |
374 |
[ops1] <- fromJVal args |
375 |
ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1 |
376 |
return $ SubmitJob ops2 |
377 |
ReqSubmitJobToDrainedQueue -> do |
378 |
[ops1] <- fromJVal args |
379 |
ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1 |
380 |
return $ SubmitJobToDrainedQueue ops2 |
381 |
ReqSubmitManyJobs -> do |
382 |
[ops1] <- fromJVal args |
383 |
ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1 |
384 |
return $ SubmitManyJobs ops2 |
385 |
ReqWaitForJobChange -> do |
386 |
(jid, fields, pinfo, pidx, wtmout) <- |
387 |
-- No instance for 5-tuple, code copied from the |
388 |
-- json sources and adapted |
389 |
fromJResult "Parsing WaitForJobChange message" $ |
390 |
case args of |
391 |
JSArray [a, b, c, d, e] -> |
392 |
(,,,,) `fmap` |
393 |
J.readJSON a `ap` |
394 |
J.readJSON b `ap` |
395 |
J.readJSON c `ap` |
396 |
J.readJSON d `ap` |
397 |
J.readJSON e |
398 |
_ -> J.Error "Not enough values" |
399 |
return $ WaitForJobChange jid fields pinfo pidx wtmout |
400 |
ReqPickupJob -> do |
401 |
[jid] <- fromJVal args |
402 |
return $ PickupJob jid |
403 |
ReqArchiveJob -> do |
404 |
[jid] <- fromJVal args |
405 |
return $ ArchiveJob jid |
406 |
ReqAutoArchiveJobs -> do |
407 |
(age, tmout) <- fromJVal args |
408 |
return $ AutoArchiveJobs age tmout |
409 |
ReqQueryExports -> do |
410 |
(nodes, lock) <- fromJVal args |
411 |
return $ QueryExports nodes lock |
412 |
ReqQueryConfigValues -> do |
413 |
[fields] <- fromJVal args |
414 |
return $ QueryConfigValues fields |
415 |
ReqQueryTags -> do |
416 |
(kind, name) <- fromJVal args |
417 |
return $ QueryTags kind name |
418 |
ReqCancelJob -> do |
419 |
[jid] <- fromJVal args |
420 |
return $ CancelJob jid |
421 |
ReqChangeJobPriority -> do |
422 |
(jid, priority) <- fromJVal args |
423 |
return $ ChangeJobPriority jid priority |
424 |
ReqSetDrainFlag -> do |
425 |
[flag] <- fromJVal args |
426 |
return $ SetDrainFlag flag |
427 |
ReqSetWatcherPause -> do |
428 |
[duration] <- fromJVal args |
429 |
return $ SetWatcherPause duration |
430 |
|
431 |
-- | Check that luxi responses contain the required keys and that the |
432 |
-- call was successful. |
433 |
validateResult :: String -> ErrorResult JSValue |
434 |
validateResult s = do |
435 |
when (UTF8.replacement_char `elem` s) $ |
436 |
fail "Failed to decode UTF-8, detected replacement char after decoding" |
437 |
oarr <- fromJResult "Parsing LUXI response" (decodeStrict s) |
438 |
let arr = J.fromJSObject oarr |
439 |
status <- fromObj arr (strOfKey Success) |
440 |
result <- fromObj arr (strOfKey Result) |
441 |
if status |
442 |
then return result |
443 |
else decodeError result |
444 |
|
445 |
-- | Try to decode an error from the server response. This function |
446 |
-- will always fail, since it's called only on the error path (when |
447 |
-- status is False). |
448 |
decodeError :: JSValue -> ErrorResult JSValue |
449 |
decodeError val = |
450 |
case fromJVal val of |
451 |
Ok e -> Bad e |
452 |
Bad msg -> Bad $ GenericError msg |
453 |
|
454 |
-- | Generic luxi method call. |
455 |
callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue) |
456 |
callMethod method s = do |
457 |
sendMsg s $ buildCall method |
458 |
result <- recvMsg s |
459 |
let rval = validateResult result |
460 |
return rval |
461 |
|
462 |
-- | Parse job submission result. |
463 |
parseSubmitJobResult :: JSValue -> ErrorResult JobId |
464 |
parseSubmitJobResult (JSArray [JSBool True, v]) = |
465 |
case J.readJSON v of |
466 |
J.Error msg -> Bad $ LuxiError msg |
467 |
J.Ok v' -> Ok v' |
468 |
parseSubmitJobResult (JSArray [JSBool False, JSString x]) = |
469 |
Bad . LuxiError $ fromJSString x |
470 |
parseSubmitJobResult v = |
471 |
Bad . LuxiError $ "Unknown result from the master daemon: " ++ |
472 |
show (pp_value v) |
473 |
|
474 |
-- | Specialized submitManyJobs call. |
475 |
submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId]) |
476 |
submitManyJobs s jobs = do |
477 |
rval <- callMethod (SubmitManyJobs jobs) s |
478 |
-- map each result (status, payload) pair into a nice Result ADT |
479 |
return $ case rval of |
480 |
Bad x -> Bad x |
481 |
Ok (JSArray r) -> mapM parseSubmitJobResult r |
482 |
x -> Bad . LuxiError $ |
483 |
"Cannot parse response from Ganeti: " ++ show x |
484 |
|
485 |
-- | Custom queryJobs call. |
486 |
queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus]) |
487 |
queryJobsStatus s jids = do |
488 |
rval <- callMethod (QueryJobs jids ["status"]) s |
489 |
return $ case rval of |
490 |
Bad x -> Bad x |
491 |
Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of |
492 |
J.Ok vals -> if any null vals |
493 |
then Bad $ |
494 |
LuxiError "Missing job status field" |
495 |
else Ok (map head vals) |
496 |
J.Error x -> Bad $ LuxiError x |