root / htools / Ganeti / Luxi.hs @ e821050d
History | View | Annotate | Download (13.6 kB)
1 |
{-# LANGUAGE TemplateHaskell #-} |
---|---|
2 |
|
3 |
{-| Implementation of the Ganeti LUXI interface. |
4 |
|
5 |
-} |
6 |
|
7 |
{- |
8 |
|
9 |
Copyright (C) 2009, 2010, 2011, 2012 Google Inc. |
10 |
|
11 |
This program is free software; you can redistribute it and/or modify |
12 |
it under the terms of the GNU General Public License as published by |
13 |
the Free Software Foundation; either version 2 of the License, or |
14 |
(at your option) any later version. |
15 |
|
16 |
This program is distributed in the hope that it will be useful, but |
17 |
WITHOUT ANY WARRANTY; without even the implied warranty of |
18 |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
19 |
General Public License for more details. |
20 |
|
21 |
You should have received a copy of the GNU General Public License |
22 |
along with this program; if not, write to the Free Software |
23 |
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
24 |
02110-1301, USA. |
25 |
|
26 |
-} |
27 |
|
28 |
module Ganeti.Luxi |
29 |
( LuxiOp(..) |
30 |
, QrViaLuxi(..) |
31 |
, ResultStatus(..) |
32 |
, LuxiReq(..) |
33 |
, Client |
34 |
, JobId |
35 |
, checkRS |
36 |
, getClient |
37 |
, closeClient |
38 |
, callMethod |
39 |
, submitManyJobs |
40 |
, queryJobsStatus |
41 |
, buildCall |
42 |
, validateCall |
43 |
, decodeCall |
44 |
) where |
45 |
|
46 |
import Data.IORef |
47 |
import Data.Ratio (numerator, denominator) |
48 |
import qualified Data.ByteString as B |
49 |
import qualified Data.ByteString.UTF8 as UTF8 |
50 |
import Data.Word (Word8) |
51 |
import Control.Monad |
52 |
import Text.JSON (encodeStrict, decodeStrict) |
53 |
import qualified Text.JSON as J |
54 |
import Text.JSON.Types |
55 |
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..)) |
56 |
import System.Timeout |
57 |
import qualified Network.Socket as S |
58 |
|
59 |
import Ganeti.HTools.JSON |
60 |
import Ganeti.HTools.Types |
61 |
import Ganeti.HTools.Utils |
62 |
|
63 |
import Ganeti.Constants |
64 |
import Ganeti.Jobs (JobStatus) |
65 |
import Ganeti.OpCodes (OpCode) |
66 |
import Ganeti.THH |
67 |
|
68 |
-- * Utility functions |
69 |
|
70 |
-- | Wrapper over System.Timeout.timeout that fails in the IO monad. |
71 |
withTimeout :: Int -> String -> IO a -> IO a |
72 |
withTimeout secs descr action = do |
73 |
result <- timeout (secs * 1000000) action |
74 |
case result of |
75 |
Nothing -> fail $ "Timeout in " ++ descr |
76 |
Just v -> return v |
77 |
|
78 |
-- * Generic protocol functionality |
79 |
|
80 |
-- | The Ganeti job type. |
81 |
type JobId = Int |
82 |
|
83 |
$(declareSADT "QrViaLuxi" |
84 |
[ ("QRLock", 'qrLock) |
85 |
, ("QRInstance", 'qrInstance) |
86 |
, ("QRNode", 'qrNode) |
87 |
, ("QRGroup", 'qrGroup) |
88 |
, ("QROs", 'qrOs) |
89 |
]) |
90 |
$(makeJSONInstance ''QrViaLuxi) |
91 |
|
92 |
-- | Currently supported Luxi operations and JSON serialization. |
93 |
$(genLuxiOp "LuxiOp" |
94 |
[(luxiReqQuery, |
95 |
[ ("what", [t| QrViaLuxi |], [| id |]) |
96 |
, ("fields", [t| [String] |], [| id |]) |
97 |
, ("qfilter", [t| () |], [| const JSNull |]) |
98 |
]) |
99 |
, (luxiReqQueryNodes, |
100 |
[ ("names", [t| [String] |], [| id |]) |
101 |
, ("fields", [t| [String] |], [| id |]) |
102 |
, ("lock", [t| Bool |], [| id |]) |
103 |
]) |
104 |
, (luxiReqQueryGroups, |
105 |
[ ("names", [t| [String] |], [| id |]) |
106 |
, ("fields", [t| [String] |], [| id |]) |
107 |
, ("lock", [t| Bool |], [| id |]) |
108 |
]) |
109 |
, (luxiReqQueryInstances, |
110 |
[ ("names", [t| [String] |], [| id |]) |
111 |
, ("fields", [t| [String] |], [| id |]) |
112 |
, ("lock", [t| Bool |], [| id |]) |
113 |
]) |
114 |
, (luxiReqQueryJobs, |
115 |
[ ("ids", [t| [Int] |], [| id |]) |
116 |
, ("fields", [t| [String] |], [| id |]) |
117 |
]) |
118 |
, (luxiReqQueryExports, |
119 |
[ ("nodes", [t| [String] |], [| id |]) |
120 |
, ("lock", [t| Bool |], [| id |]) |
121 |
]) |
122 |
, (luxiReqQueryConfigValues, |
123 |
[ ("fields", [t| [String] |], [| id |]) ] |
124 |
) |
125 |
, (luxiReqQueryClusterInfo, []) |
126 |
, (luxiReqQueryTags, |
127 |
[ ("kind", [t| String |], [| id |]) |
128 |
, ("name", [t| String |], [| id |]) |
129 |
]) |
130 |
, (luxiReqSubmitJob, |
131 |
[ ("job", [t| [OpCode] |], [| id |]) ] |
132 |
) |
133 |
, (luxiReqSubmitManyJobs, |
134 |
[ ("ops", [t| [[OpCode]] |], [| id |]) ] |
135 |
) |
136 |
, (luxiReqWaitForJobChange, |
137 |
[ ("job", [t| Int |], [| id |]) |
138 |
, ("fields", [t| [String]|], [| id |]) |
139 |
, ("prev_job", [t| JSValue |], [| id |]) |
140 |
, ("prev_log", [t| JSValue |], [| id |]) |
141 |
, ("tmout", [t| Int |], [| id |]) |
142 |
]) |
143 |
, (luxiReqArchiveJob, |
144 |
[ ("job", [t| Int |], [| id |]) ] |
145 |
) |
146 |
, (luxiReqAutoArchiveJobs, |
147 |
[ ("age", [t| Int |], [| id |]) |
148 |
, ("tmout", [t| Int |], [| id |]) |
149 |
]) |
150 |
, (luxiReqCancelJob, |
151 |
[ ("job", [t| Int |], [| id |]) ] |
152 |
) |
153 |
, (luxiReqSetDrainFlag, |
154 |
[ ("flag", [t| Bool |], [| id |]) ] |
155 |
) |
156 |
, (luxiReqSetWatcherPause, |
157 |
[ ("duration", [t| Double |], [| id |]) ] |
158 |
) |
159 |
]) |
160 |
|
161 |
$(makeJSONInstance ''LuxiReq) |
162 |
|
163 |
-- | The serialisation of LuxiOps into strings in messages. |
164 |
$(genStrOfOp ''LuxiOp "strOfOp") |
165 |
|
166 |
$(declareIADT "ResultStatus" |
167 |
[ ("RSNormal", 'rsNormal) |
168 |
, ("RSUnknown", 'rsUnknown) |
169 |
, ("RSNoData", 'rsNodata) |
170 |
, ("RSUnavailable", 'rsUnavail) |
171 |
, ("RSOffline", 'rsOffline) |
172 |
]) |
173 |
|
174 |
$(makeJSONInstance ''ResultStatus) |
175 |
|
176 |
-- | Type holding the initial (unparsed) Luxi call. |
177 |
data LuxiCall = LuxiCall LuxiReq JSValue |
178 |
|
179 |
-- | Check that ResultStatus is success or fail with descriptive message. |
180 |
checkRS :: (Monad m) => ResultStatus -> a -> m a |
181 |
checkRS RSNormal val = return val |
182 |
checkRS RSUnknown _ = fail "Unknown field" |
183 |
checkRS RSNoData _ = fail "No data for a field" |
184 |
checkRS RSUnavailable _ = fail "Ganeti reports unavailable data" |
185 |
checkRS RSOffline _ = fail "Ganeti reports resource as offline" |
186 |
|
187 |
-- | The end-of-message separator. |
188 |
eOM :: Word8 |
189 |
eOM = 3 |
190 |
|
191 |
-- | The end-of-message encoded as a ByteString. |
192 |
bEOM :: B.ByteString |
193 |
bEOM = B.singleton eOM |
194 |
|
195 |
-- | Valid keys in the requests and responses. |
196 |
data MsgKeys = Method |
197 |
| Args |
198 |
| Success |
199 |
| Result |
200 |
|
201 |
-- | The serialisation of MsgKeys into strings in messages. |
202 |
$(genStrOfKey ''MsgKeys "strOfKey") |
203 |
|
204 |
-- | Luxi client encapsulation. |
205 |
data Client = Client { socket :: Handle -- ^ The socket of the client |
206 |
, rbuf :: IORef B.ByteString -- ^ Already received buffer |
207 |
} |
208 |
|
209 |
-- | Connects to the master daemon and returns a luxi Client. |
210 |
getClient :: String -> IO Client |
211 |
getClient path = do |
212 |
s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol |
213 |
withTimeout connTimeout "creating luxi connection" $ |
214 |
S.connect s (S.SockAddrUnix path) |
215 |
rf <- newIORef B.empty |
216 |
h <- S.socketToHandle s ReadWriteMode |
217 |
return Client { socket=h, rbuf=rf } |
218 |
|
219 |
-- | Closes the client socket. |
220 |
closeClient :: Client -> IO () |
221 |
closeClient = hClose . socket |
222 |
|
223 |
-- | Sends a message over a luxi transport. |
224 |
sendMsg :: Client -> String -> IO () |
225 |
sendMsg s buf = withTimeout queryTimeout "sending luxi message" $ do |
226 |
let encoded = UTF8.fromString buf |
227 |
handle = socket s |
228 |
B.hPut handle encoded |
229 |
B.hPut handle bEOM |
230 |
hFlush handle |
231 |
|
232 |
-- | Given a current buffer and the handle, it will read from the |
233 |
-- network until we get a full message, and it will return that |
234 |
-- message and the leftover buffer contents. |
235 |
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString) |
236 |
recvUpdate handle obuf = do |
237 |
nbuf <- withTimeout queryTimeout "reading luxi response" $ do |
238 |
_ <- hWaitForInput handle (-1) |
239 |
B.hGetNonBlocking handle 4096 |
240 |
let (msg, remaining) = B.break (eOM ==) nbuf |
241 |
newbuf = B.append obuf msg |
242 |
if B.null remaining |
243 |
then recvUpdate handle newbuf |
244 |
else return (newbuf, B.tail remaining) |
245 |
|
246 |
-- | Waits for a message over a luxi transport. |
247 |
recvMsg :: Client -> IO String |
248 |
recvMsg s = do |
249 |
cbuf <- readIORef $ rbuf s |
250 |
let (imsg, ibuf) = B.break (eOM ==) cbuf |
251 |
(msg, nbuf) <- |
252 |
if B.null ibuf -- if old buffer didn't contain a full message |
253 |
then recvUpdate (socket s) cbuf -- then we read from network |
254 |
else return (imsg, B.tail ibuf) -- else we return data from our buffer |
255 |
writeIORef (rbuf s) nbuf |
256 |
return $ UTF8.toString msg |
257 |
|
258 |
-- | Serialize a request to String. |
259 |
buildCall :: LuxiOp -- ^ The method |
260 |
-> String -- ^ The serialized form |
261 |
buildCall lo = |
262 |
let ja = [ (strOfKey Method, JSString $ toJSString $ strOfOp lo::JSValue) |
263 |
, (strOfKey Args, opToArgs lo::JSValue) |
264 |
] |
265 |
jo = toJSObject ja |
266 |
in encodeStrict jo |
267 |
|
268 |
-- | Check that luxi request contains the required keys and parse it. |
269 |
validateCall :: String -> Result LuxiCall |
270 |
validateCall s = do |
271 |
arr <- fromJResult "luxi call" $ decodeStrict s::Result (JSObject JSValue) |
272 |
let aobj = fromJSObject arr |
273 |
call <- fromObj aobj (strOfKey Method)::Result LuxiReq |
274 |
args <- fromObj aobj (strOfKey Args) |
275 |
return (LuxiCall call args) |
276 |
|
277 |
-- | Converts Luxi call arguments into a 'LuxiOp' data structure. |
278 |
-- |
279 |
-- This is currently hand-coded until we make it more uniform so that |
280 |
-- it can be generated using TH. |
281 |
decodeCall :: LuxiCall -> Result LuxiOp |
282 |
decodeCall (LuxiCall call args) = |
283 |
case call of |
284 |
ReqQueryJobs -> do |
285 |
(jid, jargs) <- fromJVal args |
286 |
rid <- mapM parseJobId jid |
287 |
let rargs = map fromJSString jargs |
288 |
return $ QueryJobs rid rargs |
289 |
ReqQueryInstances -> do |
290 |
(names, fields, locking) <- fromJVal args |
291 |
return $ QueryInstances names fields locking |
292 |
ReqQueryNodes -> do |
293 |
(names, fields, locking) <- fromJVal args |
294 |
return $ QueryNodes names fields locking |
295 |
ReqQueryGroups -> do |
296 |
(names, fields, locking) <- fromJVal args |
297 |
return $ QueryGroups names fields locking |
298 |
ReqQueryClusterInfo -> do |
299 |
return QueryClusterInfo |
300 |
ReqQuery -> do |
301 |
(what, fields, _) <- |
302 |
fromJVal args::Result (QrViaLuxi, [String], JSValue) |
303 |
return $ Query what fields () |
304 |
ReqSubmitJob -> do |
305 |
[ops1] <- fromJVal args |
306 |
ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1 |
307 |
return $ SubmitJob ops2 |
308 |
ReqSubmitManyJobs -> do |
309 |
[ops1] <- fromJVal args |
310 |
ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1 |
311 |
return $ SubmitManyJobs ops2 |
312 |
ReqWaitForJobChange -> do |
313 |
(jid, fields, pinfo, pidx, wtmout) <- |
314 |
-- No instance for 5-tuple, code copied from the |
315 |
-- json sources and adapted |
316 |
fromJResult "Parsing WaitForJobChange message" $ |
317 |
case args of |
318 |
JSArray [a, b, c, d, e] -> |
319 |
(,,,,) `fmap` |
320 |
J.readJSON a `ap` |
321 |
J.readJSON b `ap` |
322 |
J.readJSON c `ap` |
323 |
J.readJSON d `ap` |
324 |
J.readJSON e |
325 |
_ -> J.Error "Not enough values" |
326 |
rid <- parseJobId jid |
327 |
return $ WaitForJobChange rid fields pinfo pidx wtmout |
328 |
ReqArchiveJob -> do |
329 |
[jid] <- fromJVal args |
330 |
rid <- parseJobId jid |
331 |
return $ ArchiveJob rid |
332 |
ReqAutoArchiveJobs -> do |
333 |
(age, tmout) <- fromJVal args |
334 |
return $ AutoArchiveJobs age tmout |
335 |
ReqQueryExports -> do |
336 |
(nodes, lock) <- fromJVal args |
337 |
return $ QueryExports nodes lock |
338 |
ReqQueryConfigValues -> do |
339 |
[fields] <- fromJVal args |
340 |
return $ QueryConfigValues fields |
341 |
ReqQueryTags -> do |
342 |
(kind, name) <- fromJVal args |
343 |
return $ QueryTags kind name |
344 |
ReqCancelJob -> do |
345 |
[job] <- fromJVal args |
346 |
rid <- parseJobId job |
347 |
return $ CancelJob rid |
348 |
ReqSetDrainFlag -> do |
349 |
[flag] <- fromJVal args |
350 |
return $ SetDrainFlag flag |
351 |
ReqSetWatcherPause -> do |
352 |
[duration] <- fromJVal args |
353 |
return $ SetWatcherPause duration |
354 |
|
355 |
-- | Check that luxi responses contain the required keys and that the |
356 |
-- call was successful. |
357 |
validateResult :: String -> Result JSValue |
358 |
validateResult s = do |
359 |
when (UTF8.replacement_char `elem` s) $ |
360 |
fail "Failed to decode UTF-8, detected replacement char after decoding" |
361 |
oarr <- fromJResult "Parsing LUXI response" |
362 |
(decodeStrict s)::Result (JSObject JSValue) |
363 |
let arr = J.fromJSObject oarr |
364 |
status <- fromObj arr (strOfKey Success)::Result Bool |
365 |
let rkey = strOfKey Result |
366 |
if status |
367 |
then fromObj arr rkey |
368 |
else fromObj arr rkey >>= fail |
369 |
|
370 |
-- | Generic luxi method call. |
371 |
callMethod :: LuxiOp -> Client -> IO (Result JSValue) |
372 |
callMethod method s = do |
373 |
sendMsg s $ buildCall method |
374 |
result <- recvMsg s |
375 |
let rval = validateResult result |
376 |
return rval |
377 |
|
378 |
-- | Parses a job ID. |
379 |
parseJobId :: JSValue -> Result JobId |
380 |
parseJobId (JSString x) = tryRead "parsing job id" . fromJSString $ x |
381 |
parseJobId (JSRational _ x) = |
382 |
if denominator x /= 1 |
383 |
then Bad $ "Got fractional job ID from master daemon?! Value:" ++ show x |
384 |
-- FIXME: potential integer overflow here on 32-bit platforms |
385 |
else Ok . fromIntegral . numerator $ x |
386 |
parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x |
387 |
|
388 |
-- | Parse job submission result. |
389 |
parseSubmitJobResult :: JSValue -> Result JobId |
390 |
parseSubmitJobResult (JSArray [JSBool True, v]) = parseJobId v |
391 |
parseSubmitJobResult (JSArray [JSBool False, JSString x]) = |
392 |
Bad (fromJSString x) |
393 |
parseSubmitJobResult v = Bad $ "Unknown result from the master daemon" ++ |
394 |
show v |
395 |
|
396 |
-- | Specialized submitManyJobs call. |
397 |
submitManyJobs :: Client -> [[OpCode]] -> IO (Result [JobId]) |
398 |
submitManyJobs s jobs = do |
399 |
rval <- callMethod (SubmitManyJobs jobs) s |
400 |
-- map each result (status, payload) pair into a nice Result ADT |
401 |
return $ case rval of |
402 |
Bad x -> Bad x |
403 |
Ok (JSArray r) -> mapM parseSubmitJobResult r |
404 |
x -> Bad ("Cannot parse response from Ganeti: " ++ show x) |
405 |
|
406 |
-- | Custom queryJobs call. |
407 |
queryJobsStatus :: Client -> [JobId] -> IO (Result [JobStatus]) |
408 |
queryJobsStatus s jids = do |
409 |
rval <- callMethod (QueryJobs jids ["status"]) s |
410 |
return $ case rval of |
411 |
Bad x -> Bad x |
412 |
Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of |
413 |
J.Ok vals -> if any null vals |
414 |
then Bad "Missing job status field" |
415 |
else Ok (map head vals) |
416 |
J.Error x -> Bad x |