Move JSON.hs and Compat.hs out from under HTools/
[ganeti-local] / htools / Ganeti / Luxi.hs
1 {-# LANGUAGE TemplateHaskell #-}
2
3 {-| Implementation of the Ganeti LUXI interface.
4
5 -}
6
7 {-
8
9 Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
15
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19 General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 02110-1301, USA.
25
26 -}
27
28 module Ganeti.Luxi
29   ( LuxiOp(..)
30   , LuxiReq(..)
31   , Client
32   , JobId
33   , RecvResult(..)
34   , TagObject(..)
35   , strOfOp
36   , getClient
37   , getServer
38   , acceptClient
39   , closeClient
40   , closeServer
41   , callMethod
42   , submitManyJobs
43   , queryJobsStatus
44   , buildCall
45   , buildResponse
46   , validateCall
47   , decodeCall
48   , recvMsg
49   , recvMsgExt
50   , sendMsg
51   ) where
52
53 import Control.Exception (catch)
54 import Data.IORef
55 import Data.Ratio (numerator, denominator)
56 import qualified Data.ByteString as B
57 import qualified Data.ByteString.UTF8 as UTF8
58 import Data.Word (Word8)
59 import Control.Monad
60 import Prelude hiding (catch)
61 import Text.JSON (encodeStrict, decodeStrict)
62 import qualified Text.JSON as J
63 import Text.JSON.Types
64 import System.Directory (removeFile)
65 import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
66 import System.IO.Error (isEOFError)
67 import System.Timeout
68 import qualified Network.Socket as S
69
70 import Ganeti.JSON
71 import Ganeti.HTools.Types
72 import Ganeti.HTools.Utils
73
74 import Ganeti.Constants
75 import Ganeti.Jobs (JobStatus)
76 import Ganeti.OpCodes (OpCode)
77 import qualified Ganeti.Query.Language as Qlang
78 import Ganeti.THH
79
80 -- * Utility functions
81
82 -- | Wrapper over System.Timeout.timeout that fails in the IO monad.
83 withTimeout :: Int -> String -> IO a -> IO a
84 withTimeout secs descr action = do
85   result <- timeout (secs * 1000000) action
86   case result of
87     Nothing -> fail $ "Timeout in " ++ descr
88     Just v -> return v
89
90 -- * Generic protocol functionality
91
92 -- | Result of receiving a message from the socket.
93 data RecvResult = RecvConnClosed    -- ^ Connection closed
94                 | RecvError String  -- ^ Any other error
95                 | RecvOk String     -- ^ Successfull receive
96                   deriving (Show, Read, Eq)
97
98 -- | The Ganeti job type.
99 type JobId = Int
100
101 -- | Data type representing what items do the tag operations apply to.
102 $(declareSADT "TagObject"
103   [ ("TagInstance", 'tagInstance)
104   , ("TagNode",     'tagNode)
105   , ("TagGroup",    'tagNodegroup)
106   , ("TagCluster",  'tagCluster)
107   ])
108 $(makeJSONInstance ''TagObject)
109
110 -- | Currently supported Luxi operations and JSON serialization.
111 $(genLuxiOp "LuxiOp"
112   [ (luxiReqQuery,
113     [ ("what",    [t| Qlang.ItemType |])
114     , ("fields",  [t| [String]  |])
115     , ("qfilter", [t| Qlang.Filter Qlang.FilterField |])
116     ])
117   , (luxiReqQueryFields,
118     [ ("what",    [t| Qlang.ItemType |])
119     , ("fields",  [t| [String]  |])
120     ])
121   , (luxiReqQueryNodes,
122      [ ("names",  [t| [String] |])
123      , ("fields", [t| [String] |])
124      , ("lock",   [t| Bool     |])
125      ])
126   , (luxiReqQueryGroups,
127      [ ("names",  [t| [String] |])
128      , ("fields", [t| [String] |])
129      , ("lock",   [t| Bool     |])
130      ])
131   , (luxiReqQueryInstances,
132      [ ("names",  [t| [String] |])
133      , ("fields", [t| [String] |])
134      , ("lock",   [t| Bool     |])
135      ])
136   , (luxiReqQueryJobs,
137      [ ("ids",    [t| [Int]    |])
138      , ("fields", [t| [String] |])
139      ])
140   , (luxiReqQueryExports,
141      [ ("nodes", [t| [String] |])
142      , ("lock",  [t| Bool     |])
143      ])
144   , (luxiReqQueryConfigValues,
145      [ ("fields", [t| [String] |]) ]
146     )
147   , (luxiReqQueryClusterInfo, [])
148   , (luxiReqQueryTags,
149      [ ("kind", [t| TagObject |])
150      , ("name", [t| String |])
151      ])
152   , (luxiReqSubmitJob,
153      [ ("job", [t| [OpCode] |]) ]
154     )
155   , (luxiReqSubmitManyJobs,
156      [ ("ops", [t| [[OpCode]] |]) ]
157     )
158   , (luxiReqWaitForJobChange,
159      [ ("job",      [t| Int     |])
160      , ("fields",   [t| [String]|])
161      , ("prev_job", [t| JSValue |])
162      , ("prev_log", [t| JSValue |])
163      , ("tmout",    [t| Int     |])
164      ])
165   , (luxiReqArchiveJob,
166      [ ("job", [t| Int |]) ]
167     )
168   , (luxiReqAutoArchiveJobs,
169      [ ("age",   [t| Int |])
170      , ("tmout", [t| Int |])
171      ])
172   , (luxiReqCancelJob,
173      [ ("job", [t| Int |]) ]
174     )
175   , (luxiReqSetDrainFlag,
176      [ ("flag", [t| Bool |]) ]
177     )
178   , (luxiReqSetWatcherPause,
179      [ ("duration", [t| Double |]) ]
180     )
181   ])
182
183 $(makeJSONInstance ''LuxiReq)
184
185 -- | The serialisation of LuxiOps into strings in messages.
186 $(genStrOfOp ''LuxiOp "strOfOp")
187
188 -- | Type holding the initial (unparsed) Luxi call.
189 data LuxiCall = LuxiCall LuxiReq JSValue
190
191 -- | The end-of-message separator.
192 eOM :: Word8
193 eOM = 3
194
195 -- | The end-of-message encoded as a ByteString.
196 bEOM :: B.ByteString
197 bEOM = B.singleton eOM
198
199 -- | Valid keys in the requests and responses.
200 data MsgKeys = Method
201              | Args
202              | Success
203              | Result
204
205 -- | The serialisation of MsgKeys into strings in messages.
206 $(genStrOfKey ''MsgKeys "strOfKey")
207
208 -- | Luxi client encapsulation.
209 data Client = Client { socket :: Handle           -- ^ The socket of the client
210                      , rbuf :: IORef B.ByteString -- ^ Already received buffer
211                      }
212
213 -- | Connects to the master daemon and returns a luxi Client.
214 getClient :: String -> IO Client
215 getClient path = do
216   s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
217   withTimeout connTimeout "creating luxi connection" $
218               S.connect s (S.SockAddrUnix path)
219   rf <- newIORef B.empty
220   h <- S.socketToHandle s ReadWriteMode
221   return Client { socket=h, rbuf=rf }
222
223 -- | Creates and returns a server endpoint.
224 getServer :: FilePath -> IO S.Socket
225 getServer path = do
226   s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
227   S.bindSocket s (S.SockAddrUnix path)
228   S.listen s 5 -- 5 is the max backlog
229   return s
230
231 -- | Closes a server endpoint.
232 -- FIXME: this should be encapsulated into a nicer type.
233 closeServer :: FilePath -> S.Socket -> IO ()
234 closeServer path sock = do
235   S.sClose sock
236   removeFile path
237
238 -- | Accepts a client
239 acceptClient :: S.Socket -> IO Client
240 acceptClient s = do
241   -- second return is the address of the client, which we ignore here
242   (client_socket, _) <- S.accept s
243   new_buffer <- newIORef B.empty
244   handle <- S.socketToHandle client_socket ReadWriteMode
245   return Client { socket=handle, rbuf=new_buffer }
246
247 -- | Closes the client socket.
248 closeClient :: Client -> IO ()
249 closeClient = hClose . socket
250
251 -- | Sends a message over a luxi transport.
252 sendMsg :: Client -> String -> IO ()
253 sendMsg s buf = withTimeout queryTimeout "sending luxi message" $ do
254   let encoded = UTF8.fromString buf
255       handle = socket s
256   B.hPut handle encoded
257   B.hPut handle bEOM
258   hFlush handle
259
260 -- | Given a current buffer and the handle, it will read from the
261 -- network until we get a full message, and it will return that
262 -- message and the leftover buffer contents.
263 recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
264 recvUpdate handle obuf = do
265   nbuf <- withTimeout queryTimeout "reading luxi response" $ do
266             _ <- hWaitForInput handle (-1)
267             B.hGetNonBlocking handle 4096
268   let (msg, remaining) = B.break (eOM ==) nbuf
269       newbuf = B.append obuf msg
270   if B.null remaining
271     then recvUpdate handle newbuf
272     else return (newbuf, B.tail remaining)
273
274 -- | Waits for a message over a luxi transport.
275 recvMsg :: Client -> IO String
276 recvMsg s = do
277   cbuf <- readIORef $ rbuf s
278   let (imsg, ibuf) = B.break (eOM ==) cbuf
279   (msg, nbuf) <-
280     if B.null ibuf      -- if old buffer didn't contain a full message
281       then recvUpdate (socket s) cbuf   -- then we read from network
282       else return (imsg, B.tail ibuf)   -- else we return data from our buffer
283   writeIORef (rbuf s) nbuf
284   return $ UTF8.toString msg
285
286 -- | Extended wrapper over recvMsg.
287 recvMsgExt :: Client -> IO RecvResult
288 recvMsgExt s =
289   catch (liftM RecvOk (recvMsg s)) $ \e ->
290     if isEOFError e
291       then return RecvConnClosed
292       else return $ RecvError (show e)
293
294 -- | Serialize a request to String.
295 buildCall :: LuxiOp  -- ^ The method
296           -> String  -- ^ The serialized form
297 buildCall lo =
298   let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
299            , (strOfKey Args, opToArgs lo)
300            ]
301       jo = toJSObject ja
302   in encodeStrict jo
303
304 -- | Serialize the response to String.
305 buildResponse :: Bool    -- ^ Success
306               -> JSValue -- ^ The arguments
307               -> String  -- ^ The serialized form
308 buildResponse success args =
309   let ja = [ (strOfKey Success, JSBool success)
310            , (strOfKey Result, args)]
311       jo = toJSObject ja
312   in encodeStrict jo
313
314 -- | Check that luxi request contains the required keys and parse it.
315 validateCall :: String -> Result LuxiCall
316 validateCall s = do
317   arr <- fromJResult "parsing top-level luxi message" $
318          decodeStrict s::Result (JSObject JSValue)
319   let aobj = fromJSObject arr
320   call <- fromObj aobj (strOfKey Method)::Result LuxiReq
321   args <- fromObj aobj (strOfKey Args)
322   return (LuxiCall call args)
323
324 -- | Converts Luxi call arguments into a 'LuxiOp' data structure.
325 --
326 -- This is currently hand-coded until we make it more uniform so that
327 -- it can be generated using TH.
328 decodeCall :: LuxiCall -> Result LuxiOp
329 decodeCall (LuxiCall call args) =
330   case call of
331     ReqQueryJobs -> do
332               (jid, jargs) <- fromJVal args
333               rid <- mapM parseJobId jid
334               let rargs = map fromJSString jargs
335               return $ QueryJobs rid rargs
336     ReqQueryInstances -> do
337               (names, fields, locking) <- fromJVal args
338               return $ QueryInstances names fields locking
339     ReqQueryNodes -> do
340               (names, fields, locking) <- fromJVal args
341               return $ QueryNodes names fields locking
342     ReqQueryGroups -> do
343               (names, fields, locking) <- fromJVal args
344               return $ QueryGroups names fields locking
345     ReqQueryClusterInfo -> do
346               return QueryClusterInfo
347     ReqQuery -> do
348               (what, fields, qfilter) <- fromJVal args
349               return $ Query what fields qfilter
350     ReqQueryFields -> do
351               (what, fields) <- fromJVal args
352               fields' <- case fields of
353                            JSNull -> return []
354                            _ -> fromJVal fields
355               return $ QueryFields what fields'
356     ReqSubmitJob -> do
357               [ops1] <- fromJVal args
358               ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
359               return $ SubmitJob ops2
360     ReqSubmitManyJobs -> do
361               [ops1] <- fromJVal args
362               ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
363               return $ SubmitManyJobs ops2
364     ReqWaitForJobChange -> do
365               (jid, fields, pinfo, pidx, wtmout) <-
366                 -- No instance for 5-tuple, code copied from the
367                 -- json sources and adapted
368                 fromJResult "Parsing WaitForJobChange message" $
369                 case args of
370                   JSArray [a, b, c, d, e] ->
371                     (,,,,) `fmap`
372                     J.readJSON a `ap`
373                     J.readJSON b `ap`
374                     J.readJSON c `ap`
375                     J.readJSON d `ap`
376                     J.readJSON e
377                   _ -> J.Error "Not enough values"
378               rid <- parseJobId jid
379               return $ WaitForJobChange rid fields pinfo pidx wtmout
380     ReqArchiveJob -> do
381               [jid] <- fromJVal args
382               rid <- parseJobId jid
383               return $ ArchiveJob rid
384     ReqAutoArchiveJobs -> do
385               (age, tmout) <- fromJVal args
386               return $ AutoArchiveJobs age tmout
387     ReqQueryExports -> do
388               (nodes, lock) <- fromJVal args
389               return $ QueryExports nodes lock
390     ReqQueryConfigValues -> do
391               [fields] <- fromJVal args
392               return $ QueryConfigValues fields
393     ReqQueryTags -> do
394               (kind, name) <- fromJVal args
395               return $ QueryTags kind name
396     ReqCancelJob -> do
397               [job] <- fromJVal args
398               rid <- parseJobId job
399               return $ CancelJob rid
400     ReqSetDrainFlag -> do
401               [flag] <- fromJVal args
402               return $ SetDrainFlag flag
403     ReqSetWatcherPause -> do
404               [duration] <- fromJVal args
405               return $ SetWatcherPause duration
406
407 -- | Check that luxi responses contain the required keys and that the
408 -- call was successful.
409 validateResult :: String -> Result JSValue
410 validateResult s = do
411   when (UTF8.replacement_char `elem` s) $
412        fail "Failed to decode UTF-8, detected replacement char after decoding"
413   oarr <- fromJResult "Parsing LUXI response"
414           (decodeStrict s)::Result (JSObject JSValue)
415   let arr = J.fromJSObject oarr
416   status <- fromObj arr (strOfKey Success)::Result Bool
417   let rkey = strOfKey Result
418   if status
419     then fromObj arr rkey
420     else fromObj arr rkey >>= fail
421
422 -- | Generic luxi method call.
423 callMethod :: LuxiOp -> Client -> IO (Result JSValue)
424 callMethod method s = do
425   sendMsg s $ buildCall method
426   result <- recvMsg s
427   let rval = validateResult result
428   return rval
429
430 -- | Parses a job ID.
431 parseJobId :: JSValue -> Result JobId
432 parseJobId (JSString x) = tryRead "parsing job id" . fromJSString $ x
433 parseJobId (JSRational _ x) =
434   if denominator x /= 1
435     then Bad $ "Got fractional job ID from master daemon?! Value:" ++ show x
436     -- FIXME: potential integer overflow here on 32-bit platforms
437     else Ok . fromIntegral . numerator $ x
438 parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x
439
440 -- | Parse job submission result.
441 parseSubmitJobResult :: JSValue -> Result JobId
442 parseSubmitJobResult (JSArray [JSBool True, v]) = parseJobId v
443 parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
444   Bad (fromJSString x)
445 parseSubmitJobResult v = Bad $ "Unknown result from the master daemon" ++
446                          show v
447
448 -- | Specialized submitManyJobs call.
449 submitManyJobs :: Client -> [[OpCode]] -> IO (Result [JobId])
450 submitManyJobs s jobs = do
451   rval <- callMethod (SubmitManyJobs jobs) s
452   -- map each result (status, payload) pair into a nice Result ADT
453   return $ case rval of
454              Bad x -> Bad x
455              Ok (JSArray r) -> mapM parseSubmitJobResult r
456              x -> Bad ("Cannot parse response from Ganeti: " ++ show x)
457
458 -- | Custom queryJobs call.
459 queryJobsStatus :: Client -> [JobId] -> IO (Result [JobStatus])
460 queryJobsStatus s jids = do
461   rval <- callMethod (QueryJobs jids ["status"]) s
462   return $ case rval of
463              Bad x -> Bad x
464              Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
465                        J.Ok vals -> if any null vals
466                                     then Bad "Missing job status field"
467                                     else Ok (map head vals)
468                        J.Error x -> Bad x