Rename Query2.hs to Qlang.hs
[ganeti-local] / htools / Ganeti / Luxi.hs
1 {-# LANGUAGE TemplateHaskell #-}
2
3 {-| Implementation of the Ganeti LUXI interface.
4
5 -}
6
7 {-
8
9 Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
15
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19 General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 02110-1301, USA.
25
26 -}
27
28 module Ganeti.Luxi
29   ( LuxiOp(..)
30   , QrViaLuxi(..)
31   , ResultStatus(..)
32   , LuxiReq(..)
33   , Client
34   , JobId
35   , RecvResult(..)
36   , strOfOp
37   , checkRS
38   , getClient
39   , getServer
40   , acceptClient
41   , closeClient
42   , closeServer
43   , callMethod
44   , submitManyJobs
45   , queryJobsStatus
46   , buildCall
47   , buildResponse
48   , validateCall
49   , decodeCall
50   , recvMsg
51   , recvMsgExt
52   , sendMsg
53   ) where
54
55 import Control.Exception (catch)
56 import Data.IORef
57 import Data.Ratio (numerator, denominator)
58 import qualified Data.ByteString as B
59 import qualified Data.ByteString.UTF8 as UTF8
60 import Data.Word (Word8)
61 import Control.Monad
62 import Prelude hiding (catch)
63 import Text.JSON (encodeStrict, decodeStrict)
64 import qualified Text.JSON as J
65 import Text.JSON.Types
66 import System.Directory (removeFile)
67 import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
68 import System.IO.Error (isEOFError)
69 import System.Timeout
70 import qualified Network.Socket as S
71
72 import Ganeti.HTools.JSON
73 import Ganeti.HTools.Types
74 import Ganeti.HTools.Utils
75
76 import Ganeti.Constants
77 import Ganeti.Jobs (JobStatus)
78 import Ganeti.OpCodes (OpCode)
79 import qualified Ganeti.Qlang as Qlang
80 import Ganeti.THH
81
82 -- * Utility functions
83
84 -- | Wrapper over System.Timeout.timeout that fails in the IO monad.
85 withTimeout :: Int -> String -> IO a -> IO a
86 withTimeout secs descr action = do
87   result <- timeout (secs * 1000000) action
88   case result of
89     Nothing -> fail $ "Timeout in " ++ descr
90     Just v -> return v
91
92 -- * Generic protocol functionality
93
94 -- | Result of receiving a message from the socket.
95 data RecvResult = RecvConnClosed    -- ^ Connection closed
96                 | RecvError String  -- ^ Any other error
97                 | RecvOk String     -- ^ Successfull receive
98                   deriving (Show, Read, Eq)
99
100 -- | The Ganeti job type.
101 type JobId = Int
102
103 $(declareSADT "QrViaLuxi"
104   [ ("QRLock",     'qrLock)
105   , ("QRInstance", 'qrInstance)
106   , ("QRNode",     'qrNode)
107   , ("QRGroup",    'qrGroup)
108   , ("QROs",       'qrOs)
109   , ("QRJob",      'qrJob)
110   ])
111 $(makeJSONInstance ''QrViaLuxi)
112
113 -- | Currently supported Luxi operations and JSON serialization.
114 $(genLuxiOp "LuxiOp"
115   [(luxiReqQuery,
116     [ ("what",    [t| QrViaLuxi |], [| id |])
117     , ("fields",  [t| [String]  |], [| id |])
118     , ("qfilter", [t| Qlang.Filter |], [| id |])
119     ])
120   , (luxiReqQueryNodes,
121      [ ("names",  [t| [String] |], [| id |])
122      , ("fields", [t| [String] |], [| id |])
123      , ("lock",   [t| Bool     |], [| id |])
124      ])
125   , (luxiReqQueryGroups,
126      [ ("names",  [t| [String] |], [| id |])
127      , ("fields", [t| [String] |], [| id |])
128      , ("lock",   [t| Bool     |], [| id |])
129      ])
130   , (luxiReqQueryInstances,
131      [ ("names",  [t| [String] |], [| id |])
132      , ("fields", [t| [String] |], [| id |])
133      , ("lock",   [t| Bool     |], [| id |])
134      ])
135   , (luxiReqQueryJobs,
136      [ ("ids",    [t| [Int]    |], [| id |])
137      , ("fields", [t| [String] |], [| id |])
138      ])
139   , (luxiReqQueryExports,
140      [ ("nodes", [t| [String] |], [| id |])
141      , ("lock",  [t| Bool     |], [| id |])
142      ])
143   , (luxiReqQueryConfigValues,
144      [ ("fields", [t| [String] |], [| id |]) ]
145     )
146   , (luxiReqQueryClusterInfo, [])
147   , (luxiReqQueryTags,
148      [ ("kind", [t| String |], [| id |])
149      , ("name", [t| String |], [| id |])
150      ])
151   , (luxiReqSubmitJob,
152      [ ("job", [t| [OpCode] |], [| id |]) ]
153     )
154   , (luxiReqSubmitManyJobs,
155      [ ("ops", [t| [[OpCode]] |], [| id |]) ]
156     )
157   , (luxiReqWaitForJobChange,
158      [ ("job",      [t| Int     |], [| id |])
159      , ("fields",   [t| [String]|], [| id |])
160      , ("prev_job", [t| JSValue |], [| id |])
161      , ("prev_log", [t| JSValue |], [| id |])
162      , ("tmout",    [t| Int     |], [| id |])
163      ])
164   , (luxiReqArchiveJob,
165      [ ("job", [t| Int |], [| id |]) ]
166     )
167   , (luxiReqAutoArchiveJobs,
168      [ ("age",   [t| Int |], [| id |])
169      , ("tmout", [t| Int |], [| id |])
170      ])
171   , (luxiReqCancelJob,
172      [ ("job", [t| Int |], [| id |]) ]
173     )
174   , (luxiReqSetDrainFlag,
175      [ ("flag", [t| Bool |], [| id |]) ]
176     )
177   , (luxiReqSetWatcherPause,
178      [ ("duration", [t| Double |], [| id |]) ]
179     )
180   ])
181
182 $(makeJSONInstance ''LuxiReq)
183
184 -- | The serialisation of LuxiOps into strings in messages.
185 $(genStrOfOp ''LuxiOp "strOfOp")
186
187 $(declareIADT "ResultStatus"
188   [ ("RSNormal", 'rsNormal)
189   , ("RSUnknown", 'rsUnknown)
190   , ("RSNoData", 'rsNodata)
191   , ("RSUnavailable", 'rsUnavail)
192   , ("RSOffline", 'rsOffline)
193   ])
194
195 $(makeJSONInstance ''ResultStatus)
196
197 -- | Type holding the initial (unparsed) Luxi call.
198 data LuxiCall = LuxiCall LuxiReq JSValue
199
200 -- | Check that ResultStatus is success or fail with descriptive message.
201 checkRS :: (Monad m) => ResultStatus -> a -> m a
202 checkRS RSNormal val    = return val
203 checkRS RSUnknown _     = fail "Unknown field"
204 checkRS RSNoData _      = fail "No data for a field"
205 checkRS RSUnavailable _ = fail "Ganeti reports unavailable data"
206 checkRS RSOffline _     = fail "Ganeti reports resource as offline"
207
208 -- | The end-of-message separator.
209 eOM :: Word8
210 eOM = 3
211
212 -- | The end-of-message encoded as a ByteString.
213 bEOM :: B.ByteString
214 bEOM = B.singleton eOM
215
216 -- | Valid keys in the requests and responses.
217 data MsgKeys = Method
218              | Args
219              | Success
220              | Result
221
222 -- | The serialisation of MsgKeys into strings in messages.
223 $(genStrOfKey ''MsgKeys "strOfKey")
224
225 -- | Luxi client encapsulation.
226 data Client = Client { socket :: Handle           -- ^ The socket of the client
227                      , rbuf :: IORef B.ByteString -- ^ Already received buffer
228                      }
229
230 -- | Connects to the master daemon and returns a luxi Client.
231 getClient :: String -> IO Client
232 getClient path = do
233   s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
234   withTimeout connTimeout "creating luxi connection" $
235               S.connect s (S.SockAddrUnix path)
236   rf <- newIORef B.empty
237   h <- S.socketToHandle s ReadWriteMode
238   return Client { socket=h, rbuf=rf }
239
240 -- | Creates and returns a server endpoint.
241 getServer :: FilePath -> IO S.Socket
242 getServer path = do
243   s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
244   S.bindSocket s (S.SockAddrUnix path)
245   S.listen s 5 -- 5 is the max backlog
246   return s
247
248 -- | Closes a server endpoint.
249 -- FIXME: this should be encapsulated into a nicer type.
250 closeServer :: FilePath -> S.Socket -> IO ()
251 closeServer path sock = do
252   S.sClose sock
253   removeFile path
254
255 -- | Accepts a client
256 acceptClient :: S.Socket -> IO Client
257 acceptClient s = do
258   -- second return is the address of the client, which we ignore here
259   (client_socket, _) <- S.accept s
260   new_buffer <- newIORef B.empty
261   handle <- S.socketToHandle client_socket ReadWriteMode
262   return Client { socket=handle, rbuf=new_buffer }
263
264 -- | Closes the client socket.
265 closeClient :: Client -> IO ()
266 closeClient = hClose . socket
267
268 -- | Sends a message over a luxi transport.
269 sendMsg :: Client -> String -> IO ()
270 sendMsg s buf = withTimeout queryTimeout "sending luxi message" $ do
271   let encoded = UTF8.fromString buf
272       handle = socket s
273   B.hPut handle encoded
274   B.hPut handle bEOM
275   hFlush handle
276
277 -- | Given a current buffer and the handle, it will read from the
278 -- network until we get a full message, and it will return that
279 -- message and the leftover buffer contents.
280 recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
281 recvUpdate handle obuf = do
282   nbuf <- withTimeout queryTimeout "reading luxi response" $ do
283             _ <- hWaitForInput handle (-1)
284             B.hGetNonBlocking handle 4096
285   let (msg, remaining) = B.break (eOM ==) nbuf
286       newbuf = B.append obuf msg
287   if B.null remaining
288     then recvUpdate handle newbuf
289     else return (newbuf, B.tail remaining)
290
291 -- | Waits for a message over a luxi transport.
292 recvMsg :: Client -> IO String
293 recvMsg s = do
294   cbuf <- readIORef $ rbuf s
295   let (imsg, ibuf) = B.break (eOM ==) cbuf
296   (msg, nbuf) <-
297     if B.null ibuf      -- if old buffer didn't contain a full message
298       then recvUpdate (socket s) cbuf   -- then we read from network
299       else return (imsg, B.tail ibuf)   -- else we return data from our buffer
300   writeIORef (rbuf s) nbuf
301   return $ UTF8.toString msg
302
303 -- | Extended wrapper over recvMsg.
304 recvMsgExt :: Client -> IO RecvResult
305 recvMsgExt s =
306   catch (liftM RecvOk (recvMsg s)) $ \e ->
307     if isEOFError e
308       then return RecvConnClosed
309       else return $ RecvError (show e)
310
311 -- | Serialize a request to String.
312 buildCall :: LuxiOp  -- ^ The method
313           -> String  -- ^ The serialized form
314 buildCall lo =
315   let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
316            , (strOfKey Args, opToArgs lo)
317            ]
318       jo = toJSObject ja
319   in encodeStrict jo
320
321 -- | Serialize the response to String.
322 buildResponse :: Bool    -- ^ Success
323               -> JSValue -- ^ The arguments
324               -> String  -- ^ The serialized form
325 buildResponse success args =
326   let ja = [ (strOfKey Success, JSBool success)
327            , (strOfKey Result, args)]
328       jo = toJSObject ja
329   in encodeStrict jo
330
331 -- | Check that luxi request contains the required keys and parse it.
332 validateCall :: String -> Result LuxiCall
333 validateCall s = do
334   arr <- fromJResult "parsing top-level luxi message" $
335          decodeStrict s::Result (JSObject JSValue)
336   let aobj = fromJSObject arr
337   call <- fromObj aobj (strOfKey Method)::Result LuxiReq
338   args <- fromObj aobj (strOfKey Args)
339   return (LuxiCall call args)
340
341 -- | Converts Luxi call arguments into a 'LuxiOp' data structure.
342 --
343 -- This is currently hand-coded until we make it more uniform so that
344 -- it can be generated using TH.
345 decodeCall :: LuxiCall -> Result LuxiOp
346 decodeCall (LuxiCall call args) =
347   case call of
348     ReqQueryJobs -> do
349               (jid, jargs) <- fromJVal args
350               rid <- mapM parseJobId jid
351               let rargs = map fromJSString jargs
352               return $ QueryJobs rid rargs
353     ReqQueryInstances -> do
354               (names, fields, locking) <- fromJVal args
355               return $ QueryInstances names fields locking
356     ReqQueryNodes -> do
357               (names, fields, locking) <- fromJVal args
358               return $ QueryNodes names fields locking
359     ReqQueryGroups -> do
360               (names, fields, locking) <- fromJVal args
361               return $ QueryGroups names fields locking
362     ReqQueryClusterInfo -> do
363               return QueryClusterInfo
364     ReqQuery -> do
365               (what, fields, qfilter) <- fromJVal args
366               return $ Query what fields qfilter
367     ReqSubmitJob -> do
368               [ops1] <- fromJVal args
369               ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
370               return $ SubmitJob ops2
371     ReqSubmitManyJobs -> do
372               [ops1] <- fromJVal args
373               ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
374               return $ SubmitManyJobs ops2
375     ReqWaitForJobChange -> do
376               (jid, fields, pinfo, pidx, wtmout) <-
377                 -- No instance for 5-tuple, code copied from the
378                 -- json sources and adapted
379                 fromJResult "Parsing WaitForJobChange message" $
380                 case args of
381                   JSArray [a, b, c, d, e] ->
382                     (,,,,) `fmap`
383                     J.readJSON a `ap`
384                     J.readJSON b `ap`
385                     J.readJSON c `ap`
386                     J.readJSON d `ap`
387                     J.readJSON e
388                   _ -> J.Error "Not enough values"
389               rid <- parseJobId jid
390               return $ WaitForJobChange rid fields pinfo pidx wtmout
391     ReqArchiveJob -> do
392               [jid] <- fromJVal args
393               rid <- parseJobId jid
394               return $ ArchiveJob rid
395     ReqAutoArchiveJobs -> do
396               (age, tmout) <- fromJVal args
397               return $ AutoArchiveJobs age tmout
398     ReqQueryExports -> do
399               (nodes, lock) <- fromJVal args
400               return $ QueryExports nodes lock
401     ReqQueryConfigValues -> do
402               [fields] <- fromJVal args
403               return $ QueryConfigValues fields
404     ReqQueryTags -> do
405               (kind, name) <- fromJVal args
406               return $ QueryTags kind name
407     ReqCancelJob -> do
408               [job] <- fromJVal args
409               rid <- parseJobId job
410               return $ CancelJob rid
411     ReqSetDrainFlag -> do
412               [flag] <- fromJVal args
413               return $ SetDrainFlag flag
414     ReqSetWatcherPause -> do
415               [duration] <- fromJVal args
416               return $ SetWatcherPause duration
417
418 -- | Check that luxi responses contain the required keys and that the
419 -- call was successful.
420 validateResult :: String -> Result JSValue
421 validateResult s = do
422   when (UTF8.replacement_char `elem` s) $
423        fail "Failed to decode UTF-8, detected replacement char after decoding"
424   oarr <- fromJResult "Parsing LUXI response"
425           (decodeStrict s)::Result (JSObject JSValue)
426   let arr = J.fromJSObject oarr
427   status <- fromObj arr (strOfKey Success)::Result Bool
428   let rkey = strOfKey Result
429   if status
430     then fromObj arr rkey
431     else fromObj arr rkey >>= fail
432
433 -- | Generic luxi method call.
434 callMethod :: LuxiOp -> Client -> IO (Result JSValue)
435 callMethod method s = do
436   sendMsg s $ buildCall method
437   result <- recvMsg s
438   let rval = validateResult result
439   return rval
440
441 -- | Parses a job ID.
442 parseJobId :: JSValue -> Result JobId
443 parseJobId (JSString x) = tryRead "parsing job id" . fromJSString $ x
444 parseJobId (JSRational _ x) =
445   if denominator x /= 1
446     then Bad $ "Got fractional job ID from master daemon?! Value:" ++ show x
447     -- FIXME: potential integer overflow here on 32-bit platforms
448     else Ok . fromIntegral . numerator $ x
449 parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x
450
451 -- | Parse job submission result.
452 parseSubmitJobResult :: JSValue -> Result JobId
453 parseSubmitJobResult (JSArray [JSBool True, v]) = parseJobId v
454 parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
455   Bad (fromJSString x)
456 parseSubmitJobResult v = Bad $ "Unknown result from the master daemon" ++
457                          show v
458
459 -- | Specialized submitManyJobs call.
460 submitManyJobs :: Client -> [[OpCode]] -> IO (Result [JobId])
461 submitManyJobs s jobs = do
462   rval <- callMethod (SubmitManyJobs jobs) s
463   -- map each result (status, payload) pair into a nice Result ADT
464   return $ case rval of
465              Bad x -> Bad x
466              Ok (JSArray r) -> mapM parseSubmitJobResult r
467              x -> Bad ("Cannot parse response from Ganeti: " ++ show x)
468
469 -- | Custom queryJobs call.
470 queryJobsStatus :: Client -> [JobId] -> IO (Result [JobStatus])
471 queryJobsStatus s jids = do
472   rval <- callMethod (QueryJobs jids ["status"]) s
473   return $ case rval of
474              Bad x -> Bad x
475              Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
476                        J.Ok vals -> if any null vals
477                                     then Bad "Missing job status field"
478                                     else Ok (map head vals)
479                        J.Error x -> Bad x