root / Ganeti / Luxi.hs @ 306cccd5
History | View | Annotate | Download (8.4 kB)
1 |
{-| Implementation of the Ganeti LUXI interface. |
---|---|
2 |
|
3 |
-} |
4 |
|
5 |
{- |
6 |
|
7 |
Copyright (C) 2009 Google Inc. |
8 |
|
9 |
This program is free software; you can redistribute it and/or modify |
10 |
it under the terms of the GNU General Public License as published by |
11 |
the Free Software Foundation; either version 2 of the License, or |
12 |
(at your option) any later version. |
13 |
|
14 |
This program is distributed in the hope that it will be useful, but |
15 |
WITHOUT ANY WARRANTY; without even the implied warranty of |
16 |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
17 |
General Public License for more details. |
18 |
|
19 |
You should have received a copy of the GNU General Public License |
20 |
along with this program; if not, write to the Free Software |
21 |
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
22 |
02110-1301, USA. |
23 |
|
24 |
-} |
25 |
|
26 |
module Ganeti.Luxi |
27 |
( LuxiOp(..) |
28 |
, Client |
29 |
, getClient |
30 |
, closeClient |
31 |
, callMethod |
32 |
, submitManyJobs |
33 |
, queryJobsStatus |
34 |
) where |
35 |
|
36 |
import Data.IORef |
37 |
import Control.Monad |
38 |
import Text.JSON (encodeStrict, decodeStrict) |
39 |
import qualified Text.JSON as J |
40 |
import Text.JSON.Types |
41 |
import System.Timeout |
42 |
import qualified Network.Socket as S |
43 |
|
44 |
import Ganeti.HTools.Utils |
45 |
import Ganeti.HTools.Types |
46 |
|
47 |
import Ganeti.Jobs (JobStatus) |
48 |
import Ganeti.OpCodes (OpCode) |
49 |
|
50 |
-- * Utility functions |
51 |
|
52 |
-- | Wrapper over System.Timeout.timeout that fails in the IO monad. |
53 |
withTimeout :: Int -> String -> IO a -> IO a |
54 |
withTimeout secs descr action = do |
55 |
result <- timeout (secs * 1000000) action |
56 |
(case result of |
57 |
Nothing -> fail $ "Timeout in " ++ descr |
58 |
Just v -> return v) |
59 |
|
60 |
-- * Generic protocol functionality |
61 |
|
62 |
-- | Currently supported Luxi operations. |
63 |
data LuxiOp = QueryInstances [String] [String] Bool |
64 |
| QueryNodes [String] [String] Bool |
65 |
| QueryJobs [Int] [String] |
66 |
| QueryExports [String] Bool |
67 |
| QueryConfigValues [String] |
68 |
| QueryClusterInfo |
69 |
| QueryTags String String |
70 |
| SubmitJob [OpCode] |
71 |
| SubmitManyJobs [[OpCode]] |
72 |
| WaitForJobChange Int [String] JSValue JSValue Int |
73 |
| ArchiveJob Int |
74 |
| AutoArchiveJobs Int Int |
75 |
| CancelJob Int |
76 |
| SetDrainFlag Bool |
77 |
| SetWatcherPause Double |
78 |
deriving (Show) |
79 |
|
80 |
-- | The serialisation of LuxiOps into strings in messages. |
81 |
strOfOp :: LuxiOp -> String |
82 |
strOfOp QueryNodes {} = "QueryNodes" |
83 |
strOfOp QueryInstances {} = "QueryInstances" |
84 |
strOfOp QueryJobs {} = "QueryJobs" |
85 |
strOfOp QueryExports {} = "QueryExports" |
86 |
strOfOp QueryConfigValues {} = "QueryConfigValues" |
87 |
strOfOp QueryClusterInfo {} = "QueryClusterInfo" |
88 |
strOfOp QueryTags {} = "QueryTags" |
89 |
strOfOp SubmitManyJobs {} = "SubmitManyJobs" |
90 |
strOfOp WaitForJobChange {} = "WaitForJobChange" |
91 |
strOfOp SubmitJob {} = "SubmitJob" |
92 |
strOfOp ArchiveJob {} = "ArchiveJob" |
93 |
strOfOp AutoArchiveJobs {} = "AutoArchiveJobs" |
94 |
strOfOp CancelJob {} = "CancelJob" |
95 |
strOfOp SetDrainFlag {} = "SetDrainFlag" |
96 |
strOfOp SetWatcherPause {} = "SetWatcherPause" |
97 |
|
98 |
-- | The end-of-message separator. |
99 |
eOM :: Char |
100 |
eOM = '\3' |
101 |
|
102 |
-- | Valid keys in the requests and responses. |
103 |
data MsgKeys = Method |
104 |
| Args |
105 |
| Success |
106 |
| Result |
107 |
|
108 |
-- | The serialisation of MsgKeys into strings in messages. |
109 |
strOfKey :: MsgKeys -> String |
110 |
strOfKey Method = "method" |
111 |
strOfKey Args = "args" |
112 |
strOfKey Success = "success" |
113 |
strOfKey Result = "result" |
114 |
|
115 |
-- | Luxi client encapsulation. |
116 |
data Client = Client { socket :: S.Socket -- ^ The socket of the client |
117 |
, rbuf :: IORef String -- ^ Already received buffer |
118 |
} |
119 |
|
120 |
-- | Connects to the master daemon and returns a luxi Client. |
121 |
getClient :: String -> IO Client |
122 |
getClient path = do |
123 |
s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol |
124 |
withTimeout connTimeout "creating luxi connection" $ |
125 |
S.connect s (S.SockAddrUnix path) |
126 |
rf <- newIORef "" |
127 |
return Client { socket=s, rbuf=rf} |
128 |
|
129 |
-- | Closes the client socket. |
130 |
closeClient :: Client -> IO () |
131 |
closeClient = S.sClose . socket |
132 |
|
133 |
-- | Sends a message over a luxi transport. |
134 |
sendMsg :: Client -> String -> IO () |
135 |
sendMsg s buf = |
136 |
let _send obuf = do |
137 |
sbytes <- withTimeout queryTimeout |
138 |
"sending luxi message" $ |
139 |
S.send (socket s) obuf |
140 |
unless (sbytes == length obuf) $ _send (drop sbytes obuf) |
141 |
in _send (buf ++ [eOM]) |
142 |
|
143 |
-- | Waits for a message over a luxi transport. |
144 |
recvMsg :: Client -> IO String |
145 |
recvMsg s = do |
146 |
let _recv obuf = do |
147 |
nbuf <- withTimeout queryTimeout "reading luxi response" $ |
148 |
S.recv (socket s) 4096 |
149 |
let (msg, remaining) = break (eOM ==) nbuf |
150 |
(if null remaining |
151 |
then _recv (obuf ++ msg) |
152 |
else return (obuf ++ msg, tail remaining)) |
153 |
cbuf <- readIORef $ rbuf s |
154 |
let (imsg, ibuf) = break (eOM ==) cbuf |
155 |
(msg, nbuf) <- |
156 |
(if null ibuf -- if old buffer didn't contain a full message |
157 |
then _recv cbuf -- then we read from network |
158 |
else return (imsg, tail ibuf)) -- else we return data from our buffer |
159 |
writeIORef (rbuf s) nbuf |
160 |
return msg |
161 |
|
162 |
-- | Compute the serialized form of a Luxi operation |
163 |
opToArgs :: LuxiOp -> JSValue |
164 |
opToArgs (QueryNodes names fields lock) = J.showJSON (names, fields, lock) |
165 |
opToArgs (QueryInstances names fields lock) = J.showJSON (names, fields, lock) |
166 |
opToArgs (QueryJobs ids fields) = J.showJSON (map show ids, fields) |
167 |
opToArgs (QueryExports nodes lock) = J.showJSON (nodes, lock) |
168 |
opToArgs (QueryConfigValues fields) = J.showJSON fields |
169 |
opToArgs (QueryClusterInfo) = J.showJSON () |
170 |
opToArgs (QueryTags kind name) = J.showJSON (kind, name) |
171 |
opToArgs (SubmitJob j) = J.showJSON j |
172 |
opToArgs (SubmitManyJobs ops) = J.showJSON ops |
173 |
-- This is special, since the JSON library doesn't export an instance |
174 |
-- of a 5-tuple |
175 |
opToArgs (WaitForJobChange a b c d e) = |
176 |
JSArray [ J.showJSON a, J.showJSON b, J.showJSON c |
177 |
, J.showJSON d, J.showJSON e] |
178 |
opToArgs (ArchiveJob a) = J.showJSON (show a) |
179 |
opToArgs (AutoArchiveJobs a b) = J.showJSON (a, b) |
180 |
opToArgs (CancelJob a) = J.showJSON (show a) |
181 |
opToArgs (SetDrainFlag flag) = J.showJSON flag |
182 |
opToArgs (SetWatcherPause duration) = J.showJSON [duration] |
183 |
|
184 |
-- | Serialize a request to String. |
185 |
buildCall :: LuxiOp -- ^ The method |
186 |
-> String -- ^ The serialized form |
187 |
buildCall lo = |
188 |
let ja = [ (strOfKey Method, JSString $ toJSString $ strOfOp lo::JSValue) |
189 |
, (strOfKey Args, opToArgs lo::JSValue) |
190 |
] |
191 |
jo = toJSObject ja |
192 |
in encodeStrict jo |
193 |
|
194 |
-- | Check that luxi responses contain the required keys and that the |
195 |
-- call was successful. |
196 |
validateResult :: String -> Result JSValue |
197 |
validateResult s = do |
198 |
oarr <- fromJResult "Parsing LUXI response" |
199 |
(decodeStrict s)::Result (JSObject JSValue) |
200 |
let arr = J.fromJSObject oarr |
201 |
status <- fromObj (strOfKey Success) arr::Result Bool |
202 |
let rkey = strOfKey Result |
203 |
(if status |
204 |
then fromObj rkey arr |
205 |
else fromObj rkey arr >>= fail) |
206 |
|
207 |
-- | Generic luxi method call. |
208 |
callMethod :: LuxiOp -> Client -> IO (Result JSValue) |
209 |
callMethod method s = do |
210 |
sendMsg s $ buildCall method |
211 |
result <- recvMsg s |
212 |
let rval = validateResult result |
213 |
return rval |
214 |
|
215 |
-- | Specialized submitManyJobs call. |
216 |
submitManyJobs :: Client -> [[OpCode]] -> IO (Result [String]) |
217 |
submitManyJobs s jobs = do |
218 |
rval <- callMethod (SubmitManyJobs jobs) s |
219 |
-- map each result (status, payload) pair into a nice Result ADT |
220 |
return $ case rval of |
221 |
Bad x -> Bad x |
222 |
Ok (JSArray r) -> |
223 |
mapM (\v -> case v of |
224 |
JSArray [JSBool True, JSString x] -> |
225 |
Ok (fromJSString x) |
226 |
JSArray [JSBool False, JSString x] -> |
227 |
Bad (fromJSString x) |
228 |
_ -> Bad "Unknown result from the master daemon" |
229 |
) r |
230 |
x -> Bad ("Cannot parse response from Ganeti: " ++ show x) |
231 |
|
232 |
-- | Custom queryJobs call. |
233 |
queryJobsStatus :: Client -> [String] -> IO (Result [JobStatus]) |
234 |
queryJobsStatus s jids = do |
235 |
rval <- callMethod (QueryJobs (map read jids) ["status"]) s |
236 |
return $ case rval of |
237 |
Bad x -> Bad x |
238 |
Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of |
239 |
J.Ok vals -> if any null vals |
240 |
then Bad "Missing job status field" |
241 |
else Ok (map head vals) |
242 |
J.Error x -> Bad x |