Merge branch 'devel-2.6' into submit
[ganeti-local] / htools / Ganeti / Luxi.hs
1 {-# LANGUAGE TemplateHaskell #-}
2
3 {-| Implementation of the Ganeti LUXI interface.
4
5 -}
6
7 {-
8
9 Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
15
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19 General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 02110-1301, USA.
25
26 -}
27
28 module Ganeti.Luxi
29   ( LuxiOp(..)
30   , LuxiReq(..)
31   , Client
32   , JobId
33   , RecvResult(..)
34   , TagObject(..)
35   , strOfOp
36   , getClient
37   , getServer
38   , acceptClient
39   , closeClient
40   , closeServer
41   , callMethod
42   , submitManyJobs
43   , queryJobsStatus
44   , buildCall
45   , buildResponse
46   , validateCall
47   , decodeCall
48   , recvMsg
49   , recvMsgExt
50   , sendMsg
51   , allLuxiCalls
52   ) where
53
54 import Control.Exception (catch)
55 import Data.IORef
56 import Data.Ratio (numerator, denominator)
57 import qualified Data.ByteString as B
58 import qualified Data.ByteString.UTF8 as UTF8
59 import Data.Word (Word8)
60 import Control.Monad
61 import Prelude hiding (catch)
62 import Text.JSON (encodeStrict, decodeStrict)
63 import qualified Text.JSON as J
64 import Text.JSON.Pretty (pp_value)
65 import Text.JSON.Types
66 import System.Directory (removeFile)
67 import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
68 import System.IO.Error (isEOFError)
69 import System.Timeout
70 import qualified Network.Socket as S
71
72 import Ganeti.BasicTypes
73 import Ganeti.Constants
74 import Ganeti.Errors
75 import Ganeti.JSON
76 import Ganeti.Jobs (JobStatus)
77 import Ganeti.OpCodes (OpCode)
78 import Ganeti.Utils
79 import qualified Ganeti.Query.Language as Qlang
80 import Ganeti.THH
81
82 -- * Utility functions
83
84 -- | Wrapper over System.Timeout.timeout that fails in the IO monad.
85 withTimeout :: Int -> String -> IO a -> IO a
86 withTimeout secs descr action = do
87   result <- timeout (secs * 1000000) action
88   case result of
89     Nothing -> fail $ "Timeout in " ++ descr
90     Just v -> return v
91
92 -- * Generic protocol functionality
93
94 -- | Result of receiving a message from the socket.
95 data RecvResult = RecvConnClosed    -- ^ Connection closed
96                 | RecvError String  -- ^ Any other error
97                 | RecvOk String     -- ^ Successfull receive
98                   deriving (Show, Read, Eq)
99
100 -- | The Ganeti job type.
101 type JobId = Int
102
103 -- | Data type representing what items do the tag operations apply to.
104 $(declareSADT "TagObject"
105   [ ("TagInstance", 'tagInstance)
106   , ("TagNode",     'tagNode)
107   , ("TagGroup",    'tagNodegroup)
108   , ("TagCluster",  'tagCluster)
109   ])
110 $(makeJSONInstance ''TagObject)
111
112 -- | Currently supported Luxi operations and JSON serialization.
113 $(genLuxiOp "LuxiOp"
114   [ (luxiReqQuery,
115     [ ("what",    [t| Qlang.ItemType |])
116     , ("fields",  [t| [String]  |])
117     , ("qfilter", [t| Qlang.Filter Qlang.FilterField |])
118     ])
119   , (luxiReqQueryFields,
120     [ ("what",    [t| Qlang.ItemType |])
121     , ("fields",  [t| [String]  |])
122     ])
123   , (luxiReqQueryNodes,
124      [ ("names",  [t| [String] |])
125      , ("fields", [t| [String] |])
126      , ("lock",   [t| Bool     |])
127      ])
128   , (luxiReqQueryGroups,
129      [ ("names",  [t| [String] |])
130      , ("fields", [t| [String] |])
131      , ("lock",   [t| Bool     |])
132      ])
133   , (luxiReqQueryInstances,
134      [ ("names",  [t| [String] |])
135      , ("fields", [t| [String] |])
136      , ("lock",   [t| Bool     |])
137      ])
138   , (luxiReqQueryJobs,
139      [ ("ids",    [t| [Int]    |])
140      , ("fields", [t| [String] |])
141      ])
142   , (luxiReqQueryExports,
143      [ ("nodes", [t| [String] |])
144      , ("lock",  [t| Bool     |])
145      ])
146   , (luxiReqQueryConfigValues,
147      [ ("fields", [t| [String] |]) ]
148     )
149   , (luxiReqQueryClusterInfo, [])
150   , (luxiReqQueryTags,
151      [ ("kind", [t| TagObject |])
152      , ("name", [t| String |])
153      ])
154   , (luxiReqSubmitJob,
155      [ ("job", [t| [OpCode] |]) ]
156     )
157   , (luxiReqSubmitManyJobs,
158      [ ("ops", [t| [[OpCode]] |]) ]
159     )
160   , (luxiReqWaitForJobChange,
161      [ ("job",      [t| Int     |])
162      , ("fields",   [t| [String]|])
163      , ("prev_job", [t| JSValue |])
164      , ("prev_log", [t| JSValue |])
165      , ("tmout",    [t| Int     |])
166      ])
167   , (luxiReqArchiveJob,
168      [ ("job", [t| Int |]) ]
169     )
170   , (luxiReqAutoArchiveJobs,
171      [ ("age",   [t| Int |])
172      , ("tmout", [t| Int |])
173      ])
174   , (luxiReqCancelJob,
175      [ ("job", [t| Int |]) ]
176     )
177   , (luxiReqSetDrainFlag,
178      [ ("flag", [t| Bool |]) ]
179     )
180   , (luxiReqSetWatcherPause,
181      [ ("duration", [t| Double |]) ]
182     )
183   ])
184
185 $(makeJSONInstance ''LuxiReq)
186
187 -- | List of all defined Luxi calls.
188 $(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
189
190 -- | The serialisation of LuxiOps into strings in messages.
191 $(genStrOfOp ''LuxiOp "strOfOp")
192
193 -- | Type holding the initial (unparsed) Luxi call.
194 data LuxiCall = LuxiCall LuxiReq JSValue
195
196 -- | The end-of-message separator.
197 eOM :: Word8
198 eOM = 3
199
200 -- | The end-of-message encoded as a ByteString.
201 bEOM :: B.ByteString
202 bEOM = B.singleton eOM
203
204 -- | Valid keys in the requests and responses.
205 data MsgKeys = Method
206              | Args
207              | Success
208              | Result
209
210 -- | The serialisation of MsgKeys into strings in messages.
211 $(genStrOfKey ''MsgKeys "strOfKey")
212
213 -- | Luxi client encapsulation.
214 data Client = Client { socket :: Handle           -- ^ The socket of the client
215                      , rbuf :: IORef B.ByteString -- ^ Already received buffer
216                      }
217
218 -- | Connects to the master daemon and returns a luxi Client.
219 getClient :: String -> IO Client
220 getClient path = do
221   s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
222   withTimeout luxiDefCtmo "creating luxi connection" $
223               S.connect s (S.SockAddrUnix path)
224   rf <- newIORef B.empty
225   h <- S.socketToHandle s ReadWriteMode
226   return Client { socket=h, rbuf=rf }
227
228 -- | Creates and returns a server endpoint.
229 getServer :: FilePath -> IO S.Socket
230 getServer path = do
231   s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
232   S.bindSocket s (S.SockAddrUnix path)
233   S.listen s 5 -- 5 is the max backlog
234   return s
235
236 -- | Closes a server endpoint.
237 -- FIXME: this should be encapsulated into a nicer type.
238 closeServer :: FilePath -> S.Socket -> IO ()
239 closeServer path sock = do
240   S.sClose sock
241   removeFile path
242
243 -- | Accepts a client
244 acceptClient :: S.Socket -> IO Client
245 acceptClient s = do
246   -- second return is the address of the client, which we ignore here
247   (client_socket, _) <- S.accept s
248   new_buffer <- newIORef B.empty
249   handle <- S.socketToHandle client_socket ReadWriteMode
250   return Client { socket=handle, rbuf=new_buffer }
251
252 -- | Closes the client socket.
253 closeClient :: Client -> IO ()
254 closeClient = hClose . socket
255
256 -- | Sends a message over a luxi transport.
257 sendMsg :: Client -> String -> IO ()
258 sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
259   let encoded = UTF8.fromString buf
260       handle = socket s
261   B.hPut handle encoded
262   B.hPut handle bEOM
263   hFlush handle
264
265 -- | Given a current buffer and the handle, it will read from the
266 -- network until we get a full message, and it will return that
267 -- message and the leftover buffer contents.
268 recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
269 recvUpdate handle obuf = do
270   nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
271             _ <- hWaitForInput handle (-1)
272             B.hGetNonBlocking handle 4096
273   let (msg, remaining) = B.break (eOM ==) nbuf
274       newbuf = B.append obuf msg
275   if B.null remaining
276     then recvUpdate handle newbuf
277     else return (newbuf, B.tail remaining)
278
279 -- | Waits for a message over a luxi transport.
280 recvMsg :: Client -> IO String
281 recvMsg s = do
282   cbuf <- readIORef $ rbuf s
283   let (imsg, ibuf) = B.break (eOM ==) cbuf
284   (msg, nbuf) <-
285     if B.null ibuf      -- if old buffer didn't contain a full message
286       then recvUpdate (socket s) cbuf   -- then we read from network
287       else return (imsg, B.tail ibuf)   -- else we return data from our buffer
288   writeIORef (rbuf s) nbuf
289   return $ UTF8.toString msg
290
291 -- | Extended wrapper over recvMsg.
292 recvMsgExt :: Client -> IO RecvResult
293 recvMsgExt s =
294   catch (liftM RecvOk (recvMsg s)) $ \e ->
295     return $ if isEOFError e
296                then RecvConnClosed
297                else RecvError (show e)
298
299 -- | Serialize a request to String.
300 buildCall :: LuxiOp  -- ^ The method
301           -> String  -- ^ The serialized form
302 buildCall lo =
303   let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
304            , (strOfKey Args, opToArgs lo)
305            ]
306       jo = toJSObject ja
307   in encodeStrict jo
308
309 -- | Serialize the response to String.
310 buildResponse :: Bool    -- ^ Success
311               -> JSValue -- ^ The arguments
312               -> String  -- ^ The serialized form
313 buildResponse success args =
314   let ja = [ (strOfKey Success, JSBool success)
315            , (strOfKey Result, args)]
316       jo = toJSObject ja
317   in encodeStrict jo
318
319 -- | Check that luxi request contains the required keys and parse it.
320 validateCall :: String -> Result LuxiCall
321 validateCall s = do
322   arr <- fromJResult "parsing top-level luxi message" $
323          decodeStrict s::Result (JSObject JSValue)
324   let aobj = fromJSObject arr
325   call <- fromObj aobj (strOfKey Method)::Result LuxiReq
326   args <- fromObj aobj (strOfKey Args)
327   return (LuxiCall call args)
328
329 -- | Converts Luxi call arguments into a 'LuxiOp' data structure.
330 --
331 -- This is currently hand-coded until we make it more uniform so that
332 -- it can be generated using TH.
333 decodeCall :: LuxiCall -> Result LuxiOp
334 decodeCall (LuxiCall call args) =
335   case call of
336     ReqQueryJobs -> do
337               (jid, jargs) <- fromJVal args
338               rid <- mapM parseJobId jid
339               let rargs = map fromJSString jargs
340               return $ QueryJobs rid rargs
341     ReqQueryInstances -> do
342               (names, fields, locking) <- fromJVal args
343               return $ QueryInstances names fields locking
344     ReqQueryNodes -> do
345               (names, fields, locking) <- fromJVal args
346               return $ QueryNodes names fields locking
347     ReqQueryGroups -> do
348               (names, fields, locking) <- fromJVal args
349               return $ QueryGroups names fields locking
350     ReqQueryClusterInfo ->
351               return QueryClusterInfo
352     ReqQuery -> do
353               (what, fields, qfilter) <- fromJVal args
354               return $ Query what fields qfilter
355     ReqQueryFields -> do
356               (what, fields) <- fromJVal args
357               fields' <- case fields of
358                            JSNull -> return []
359                            _ -> fromJVal fields
360               return $ QueryFields what fields'
361     ReqSubmitJob -> do
362               [ops1] <- fromJVal args
363               ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
364               return $ SubmitJob ops2
365     ReqSubmitManyJobs -> do
366               [ops1] <- fromJVal args
367               ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
368               return $ SubmitManyJobs ops2
369     ReqWaitForJobChange -> do
370               (jid, fields, pinfo, pidx, wtmout) <-
371                 -- No instance for 5-tuple, code copied from the
372                 -- json sources and adapted
373                 fromJResult "Parsing WaitForJobChange message" $
374                 case args of
375                   JSArray [a, b, c, d, e] ->
376                     (,,,,) `fmap`
377                     J.readJSON a `ap`
378                     J.readJSON b `ap`
379                     J.readJSON c `ap`
380                     J.readJSON d `ap`
381                     J.readJSON e
382                   _ -> J.Error "Not enough values"
383               rid <- parseJobId jid
384               return $ WaitForJobChange rid fields pinfo pidx wtmout
385     ReqArchiveJob -> do
386               [jid] <- fromJVal args
387               rid <- parseJobId jid
388               return $ ArchiveJob rid
389     ReqAutoArchiveJobs -> do
390               (age, tmout) <- fromJVal args
391               return $ AutoArchiveJobs age tmout
392     ReqQueryExports -> do
393               (nodes, lock) <- fromJVal args
394               return $ QueryExports nodes lock
395     ReqQueryConfigValues -> do
396               [fields] <- fromJVal args
397               return $ QueryConfigValues fields
398     ReqQueryTags -> do
399               (kind, name) <- fromJVal args
400               return $ QueryTags kind name
401     ReqCancelJob -> do
402               [job] <- fromJVal args
403               rid <- parseJobId job
404               return $ CancelJob rid
405     ReqSetDrainFlag -> do
406               [flag] <- fromJVal args
407               return $ SetDrainFlag flag
408     ReqSetWatcherPause -> do
409               [duration] <- fromJVal args
410               return $ SetWatcherPause duration
411
412 -- | Check that luxi responses contain the required keys and that the
413 -- call was successful.
414 validateResult :: String -> ErrorResult JSValue
415 validateResult s = do
416   when (UTF8.replacement_char `elem` s) $
417        fail "Failed to decode UTF-8, detected replacement char after decoding"
418   oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
419   let arr = J.fromJSObject oarr
420   status <- fromObj arr (strOfKey Success)
421   result <- fromObj arr (strOfKey Result)
422   if status
423     then return result
424     else decodeError result
425
426 -- | Try to decode an error from the server response. This function
427 -- will always fail, since it's called only on the error path (when
428 -- status is False).
429 decodeError :: JSValue -> ErrorResult JSValue
430 decodeError val =
431   case fromJVal val of
432     Ok e -> Bad e
433     Bad msg -> Bad $ GenericError msg
434
435 -- | Generic luxi method call.
436 callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
437 callMethod method s = do
438   sendMsg s $ buildCall method
439   result <- recvMsg s
440   let rval = validateResult result
441   return rval
442
443 -- | Parses a job ID.
444 parseJobId :: JSValue -> Result JobId
445 parseJobId (JSString x) = tryRead "parsing job id" . fromJSString $ x
446 parseJobId (JSRational _ x) =
447   if denominator x /= 1
448     then Bad $ "Got fractional job ID from master daemon?! Value:" ++ show x
449     -- FIXME: potential integer overflow here on 32-bit platforms
450     else Ok . fromIntegral . numerator $ x
451 parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x
452
453 -- | Parse job submission result.
454 parseSubmitJobResult :: JSValue -> ErrorResult JobId
455 parseSubmitJobResult (JSArray [JSBool True, v]) =
456   case parseJobId v of
457     Bad msg -> Bad $ LuxiError msg
458     Ok v' -> Ok v'
459 parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
460   Bad . LuxiError $ fromJSString x
461 parseSubmitJobResult v =
462   Bad . LuxiError $ "Unknown result from the master daemon: " ++
463       show (pp_value v)
464
465 -- | Specialized submitManyJobs call.
466 submitManyJobs :: Client -> [[OpCode]] -> IO (ErrorResult [JobId])
467 submitManyJobs s jobs = do
468   rval <- callMethod (SubmitManyJobs jobs) s
469   -- map each result (status, payload) pair into a nice Result ADT
470   return $ case rval of
471              Bad x -> Bad x
472              Ok (JSArray r) -> mapM parseSubmitJobResult r
473              x -> Bad . LuxiError $
474                   "Cannot parse response from Ganeti: " ++ show x
475
476 -- | Custom queryJobs call.
477 queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
478 queryJobsStatus s jids = do
479   rval <- callMethod (QueryJobs jids ["status"]) s
480   return $ case rval of
481              Bad x -> Bad x
482              Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
483                        J.Ok vals -> if any null vals
484                                     then Bad $
485                                          LuxiError "Missing job status field"
486                                     else Ok (map head vals)
487                        J.Error x -> Bad $ LuxiError x