Make Query operators enforce strictness
[ganeti-local] / htools / Ganeti / Luxi.hs
1 {-# LANGUAGE TemplateHaskell #-}
2
3 {-| Implementation of the Ganeti LUXI interface.
4
5 -}
6
7 {-
8
9 Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
15
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19 General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 02110-1301, USA.
25
26 -}
27
28 module Ganeti.Luxi
29   ( LuxiOp(..)
30   , LuxiReq(..)
31   , Client
32   , JobId
33   , RecvResult(..)
34   , strOfOp
35   , getClient
36   , getServer
37   , acceptClient
38   , closeClient
39   , closeServer
40   , callMethod
41   , submitManyJobs
42   , queryJobsStatus
43   , buildCall
44   , buildResponse
45   , validateCall
46   , decodeCall
47   , recvMsg
48   , recvMsgExt
49   , sendMsg
50   , allLuxiCalls
51   ) where
52
53 import Control.Exception (catch)
54 import Data.IORef
55 import Data.Ratio (numerator, denominator)
56 import qualified Data.ByteString as B
57 import qualified Data.ByteString.UTF8 as UTF8
58 import Data.Word (Word8)
59 import Control.Monad
60 import Text.JSON (encodeStrict, decodeStrict)
61 import qualified Text.JSON as J
62 import Text.JSON.Pretty (pp_value)
63 import Text.JSON.Types
64 import System.Directory (removeFile)
65 import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
66 import System.IO.Error (isEOFError)
67 import System.Timeout
68 import qualified Network.Socket as S
69
70 import Ganeti.BasicTypes
71 import Ganeti.Constants
72 import Ganeti.Errors
73 import Ganeti.JSON
74 import Ganeti.Jobs (JobStatus)
75 import Ganeti.OpCodes
76 import Ganeti.Utils
77 import qualified Ganeti.Query.Language as Qlang
78 import Ganeti.THH
79
80 -- * Utility functions
81
82 -- | Wrapper over System.Timeout.timeout that fails in the IO monad.
83 withTimeout :: Int -> String -> IO a -> IO a
84 withTimeout secs descr action = do
85   result <- timeout (secs * 1000000) action
86   case result of
87     Nothing -> fail $ "Timeout in " ++ descr
88     Just v -> return v
89
90 -- * Generic protocol functionality
91
92 -- | Result of receiving a message from the socket.
93 data RecvResult = RecvConnClosed    -- ^ Connection closed
94                 | RecvError String  -- ^ Any other error
95                 | RecvOk String     -- ^ Successfull receive
96                   deriving (Show, Eq)
97
98 -- | The Ganeti job type.
99 type JobId = Int
100
101 -- | Currently supported Luxi operations and JSON serialization.
102 $(genLuxiOp "LuxiOp"
103   [ (luxiReqQuery,
104     [ simpleField "what"    [t| Qlang.ItemType |]
105     , simpleField "fields"  [t| [String]  |]
106     , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
107     ])
108   , (luxiReqQueryFields,
109     [ simpleField "what"    [t| Qlang.ItemType |]
110     , simpleField "fields"  [t| [String]  |]
111     ])
112   , (luxiReqQueryNodes,
113      [ simpleField "names"  [t| [String] |]
114      , simpleField "fields" [t| [String] |]
115      , simpleField "lock"   [t| Bool     |]
116      ])
117   , (luxiReqQueryGroups,
118      [ simpleField "names"  [t| [String] |]
119      , simpleField "fields" [t| [String] |]
120      , simpleField "lock"   [t| Bool     |]
121      ])
122   , (luxiReqQueryInstances,
123      [ simpleField "names"  [t| [String] |]
124      , simpleField "fields" [t| [String] |]
125      , simpleField "lock"   [t| Bool     |]
126      ])
127   , (luxiReqQueryJobs,
128      [ simpleField "ids"    [t| [Int]    |]
129      , simpleField "fields" [t| [String] |]
130      ])
131   , (luxiReqQueryExports,
132      [ simpleField "nodes" [t| [String] |]
133      , simpleField "lock"  [t| Bool     |]
134      ])
135   , (luxiReqQueryConfigValues,
136      [ simpleField "fields" [t| [String] |] ]
137     )
138   , (luxiReqQueryClusterInfo, [])
139   , (luxiReqQueryTags,
140      [ customField 'decodeTagObject 'encodeTagObject $
141        simpleField "kind" [t| TagObject |]
142      ])
143   , (luxiReqSubmitJob,
144      [ simpleField "job" [t| [OpCode] |] ]
145     )
146   , (luxiReqSubmitManyJobs,
147      [ simpleField "ops" [t| [[OpCode]] |] ]
148     )
149   , (luxiReqWaitForJobChange,
150      [ simpleField "job"      [t| Int     |]
151      , simpleField "fields"   [t| [String]|]
152      , simpleField "prev_job" [t| JSValue |]
153      , simpleField "prev_log" [t| JSValue |]
154      , simpleField "tmout"    [t| Int     |]
155      ])
156   , (luxiReqArchiveJob,
157      [ simpleField "job" [t| Int |] ]
158     )
159   , (luxiReqAutoArchiveJobs,
160      [ simpleField "age"   [t| Int |]
161      , simpleField "tmout" [t| Int |]
162      ])
163   , (luxiReqCancelJob,
164      [ simpleField "job" [t| Int |] ]
165     )
166   , (luxiReqChangeJobPriority,
167      [ simpleField "job" [t| Int |]
168      , simpleField "priority" [t| Int |] ]
169     )
170   , (luxiReqSetDrainFlag,
171      [ simpleField "flag" [t| Bool |] ]
172     )
173   , (luxiReqSetWatcherPause,
174      [ simpleField "duration" [t| Double |] ]
175     )
176   ])
177
178 $(makeJSONInstance ''LuxiReq)
179
180 -- | List of all defined Luxi calls.
181 $(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
182
183 -- | The serialisation of LuxiOps into strings in messages.
184 $(genStrOfOp ''LuxiOp "strOfOp")
185
186 -- | Type holding the initial (unparsed) Luxi call.
187 data LuxiCall = LuxiCall LuxiReq JSValue
188
189 -- | The end-of-message separator.
190 eOM :: Word8
191 eOM = 3
192
193 -- | The end-of-message encoded as a ByteString.
194 bEOM :: B.ByteString
195 bEOM = B.singleton eOM
196
197 -- | Valid keys in the requests and responses.
198 data MsgKeys = Method
199              | Args
200              | Success
201              | Result
202
203 -- | The serialisation of MsgKeys into strings in messages.
204 $(genStrOfKey ''MsgKeys "strOfKey")
205
206 -- | Luxi client encapsulation.
207 data Client = Client { socket :: Handle           -- ^ The socket of the client
208                      , rbuf :: IORef B.ByteString -- ^ Already received buffer
209                      }
210
211 -- | Connects to the master daemon and returns a luxi Client.
212 getClient :: String -> IO Client
213 getClient path = do
214   s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
215   withTimeout luxiDefCtmo "creating luxi connection" $
216               S.connect s (S.SockAddrUnix path)
217   rf <- newIORef B.empty
218   h <- S.socketToHandle s ReadWriteMode
219   return Client { socket=h, rbuf=rf }
220
221 -- | Creates and returns a server endpoint.
222 getServer :: FilePath -> IO S.Socket
223 getServer path = do
224   s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
225   S.bindSocket s (S.SockAddrUnix path)
226   S.listen s 5 -- 5 is the max backlog
227   return s
228
229 -- | Closes a server endpoint.
230 -- FIXME: this should be encapsulated into a nicer type.
231 closeServer :: FilePath -> S.Socket -> IO ()
232 closeServer path sock = do
233   S.sClose sock
234   removeFile path
235
236 -- | Accepts a client
237 acceptClient :: S.Socket -> IO Client
238 acceptClient s = do
239   -- second return is the address of the client, which we ignore here
240   (client_socket, _) <- S.accept s
241   new_buffer <- newIORef B.empty
242   handle <- S.socketToHandle client_socket ReadWriteMode
243   return Client { socket=handle, rbuf=new_buffer }
244
245 -- | Closes the client socket.
246 closeClient :: Client -> IO ()
247 closeClient = hClose . socket
248
249 -- | Sends a message over a luxi transport.
250 sendMsg :: Client -> String -> IO ()
251 sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
252   let encoded = UTF8.fromString buf
253       handle = socket s
254   B.hPut handle encoded
255   B.hPut handle bEOM
256   hFlush handle
257
258 -- | Given a current buffer and the handle, it will read from the
259 -- network until we get a full message, and it will return that
260 -- message and the leftover buffer contents.
261 recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
262 recvUpdate handle obuf = do
263   nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
264             _ <- hWaitForInput handle (-1)
265             B.hGetNonBlocking handle 4096
266   let (msg, remaining) = B.break (eOM ==) nbuf
267       newbuf = B.append obuf msg
268   if B.null remaining
269     then recvUpdate handle newbuf
270     else return (newbuf, B.tail remaining)
271
272 -- | Waits for a message over a luxi transport.
273 recvMsg :: Client -> IO String
274 recvMsg s = do
275   cbuf <- readIORef $ rbuf s
276   let (imsg, ibuf) = B.break (eOM ==) cbuf
277   (msg, nbuf) <-
278     if B.null ibuf      -- if old buffer didn't contain a full message
279       then recvUpdate (socket s) cbuf   -- then we read from network
280       else return (imsg, B.tail ibuf)   -- else we return data from our buffer
281   writeIORef (rbuf s) nbuf
282   return $ UTF8.toString msg
283
284 -- | Extended wrapper over recvMsg.
285 recvMsgExt :: Client -> IO RecvResult
286 recvMsgExt s =
287   Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
288     return $ if isEOFError e
289                then RecvConnClosed
290                else RecvError (show e)
291
292 -- | Serialize a request to String.
293 buildCall :: LuxiOp  -- ^ The method
294           -> String  -- ^ The serialized form
295 buildCall lo =
296   let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
297            , (strOfKey Args, opToArgs lo)
298            ]
299       jo = toJSObject ja
300   in encodeStrict jo
301
302 -- | Serialize the response to String.
303 buildResponse :: Bool    -- ^ Success
304               -> JSValue -- ^ The arguments
305               -> String  -- ^ The serialized form
306 buildResponse success args =
307   let ja = [ (strOfKey Success, JSBool success)
308            , (strOfKey Result, args)]
309       jo = toJSObject ja
310   in encodeStrict jo
311
312 -- | Check that luxi request contains the required keys and parse it.
313 validateCall :: String -> Result LuxiCall
314 validateCall s = do
315   arr <- fromJResult "parsing top-level luxi message" $
316          decodeStrict s::Result (JSObject JSValue)
317   let aobj = fromJSObject arr
318   call <- fromObj aobj (strOfKey Method)::Result LuxiReq
319   args <- fromObj aobj (strOfKey Args)
320   return (LuxiCall call args)
321
322 -- | Converts Luxi call arguments into a 'LuxiOp' data structure.
323 --
324 -- This is currently hand-coded until we make it more uniform so that
325 -- it can be generated using TH.
326 decodeCall :: LuxiCall -> Result LuxiOp
327 decodeCall (LuxiCall call args) =
328   case call of
329     ReqQueryJobs -> do
330               (jid, jargs) <- fromJVal args
331               rid <- mapM parseJobId jid
332               let rargs = map fromJSString jargs
333               return $ QueryJobs rid rargs
334     ReqQueryInstances -> do
335               (names, fields, locking) <- fromJVal args
336               return $ QueryInstances names fields locking
337     ReqQueryNodes -> do
338               (names, fields, locking) <- fromJVal args
339               return $ QueryNodes names fields locking
340     ReqQueryGroups -> do
341               (names, fields, locking) <- fromJVal args
342               return $ QueryGroups names fields locking
343     ReqQueryClusterInfo ->
344               return QueryClusterInfo
345     ReqQuery -> do
346               (what, fields, qfilter) <- fromJVal args
347               return $ Query what fields qfilter
348     ReqQueryFields -> do
349               (what, fields) <- fromJVal args
350               fields' <- case fields of
351                            JSNull -> return []
352                            _ -> fromJVal fields
353               return $ QueryFields what fields'
354     ReqSubmitJob -> do
355               [ops1] <- fromJVal args
356               ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
357               return $ SubmitJob ops2
358     ReqSubmitManyJobs -> do
359               [ops1] <- fromJVal args
360               ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
361               return $ SubmitManyJobs ops2
362     ReqWaitForJobChange -> do
363               (jid, fields, pinfo, pidx, wtmout) <-
364                 -- No instance for 5-tuple, code copied from the
365                 -- json sources and adapted
366                 fromJResult "Parsing WaitForJobChange message" $
367                 case args of
368                   JSArray [a, b, c, d, e] ->
369                     (,,,,) `fmap`
370                     J.readJSON a `ap`
371                     J.readJSON b `ap`
372                     J.readJSON c `ap`
373                     J.readJSON d `ap`
374                     J.readJSON e
375                   _ -> J.Error "Not enough values"
376               rid <- parseJobId jid
377               return $ WaitForJobChange rid fields pinfo pidx wtmout
378     ReqArchiveJob -> do
379               [jid] <- fromJVal args
380               rid <- parseJobId jid
381               return $ ArchiveJob rid
382     ReqAutoArchiveJobs -> do
383               (age, tmout) <- fromJVal args
384               return $ AutoArchiveJobs age tmout
385     ReqQueryExports -> do
386               (nodes, lock) <- fromJVal args
387               return $ QueryExports nodes lock
388     ReqQueryConfigValues -> do
389               [fields] <- fromJVal args
390               return $ QueryConfigValues fields
391     ReqQueryTags -> do
392               (kind, name) <- fromJVal args
393               item <- tagObjectFrom kind name
394               return $ QueryTags item
395     ReqCancelJob -> do
396               [job] <- fromJVal args
397               rid <- parseJobId job
398               return $ CancelJob rid
399     ReqChangeJobPriority -> do
400               (job, priority) <- fromJVal args
401               rid <- parseJobId job
402               return $ ChangeJobPriority rid priority
403     ReqSetDrainFlag -> do
404               [flag] <- fromJVal args
405               return $ SetDrainFlag flag
406     ReqSetWatcherPause -> do
407               [duration] <- fromJVal args
408               return $ SetWatcherPause duration
409
410 -- | Check that luxi responses contain the required keys and that the
411 -- call was successful.
412 validateResult :: String -> ErrorResult JSValue
413 validateResult s = do
414   when (UTF8.replacement_char `elem` s) $
415        fail "Failed to decode UTF-8, detected replacement char after decoding"
416   oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
417   let arr = J.fromJSObject oarr
418   status <- fromObj arr (strOfKey Success)
419   result <- fromObj arr (strOfKey Result)
420   if status
421     then return result
422     else decodeError result
423
424 -- | Try to decode an error from the server response. This function
425 -- will always fail, since it's called only on the error path (when
426 -- status is False).
427 decodeError :: JSValue -> ErrorResult JSValue
428 decodeError val =
429   case fromJVal val of
430     Ok e -> Bad e
431     Bad msg -> Bad $ GenericError msg
432
433 -- | Generic luxi method call.
434 callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
435 callMethod method s = do
436   sendMsg s $ buildCall method
437   result <- recvMsg s
438   let rval = validateResult result
439   return rval
440
441 -- | Parses a job ID.
442 parseJobId :: JSValue -> Result JobId
443 parseJobId (JSString x) = tryRead "parsing job id" . fromJSString $ x
444 parseJobId (JSRational _ x) =
445   if denominator x /= 1
446     then Bad $ "Got fractional job ID from master daemon?! Value:" ++ show x
447     -- FIXME: potential integer overflow here on 32-bit platforms
448     else Ok . fromIntegral . numerator $ x
449 parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x
450
451 -- | Parse job submission result.
452 parseSubmitJobResult :: JSValue -> ErrorResult JobId
453 parseSubmitJobResult (JSArray [JSBool True, v]) =
454   case parseJobId v of
455     Bad msg -> Bad $ LuxiError msg
456     Ok v' -> Ok v'
457 parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
458   Bad . LuxiError $ fromJSString x
459 parseSubmitJobResult v =
460   Bad . LuxiError $ "Unknown result from the master daemon: " ++
461       show (pp_value v)
462
463 -- | Specialized submitManyJobs call.
464 submitManyJobs :: Client -> [[OpCode]] -> IO (ErrorResult [JobId])
465 submitManyJobs s jobs = do
466   rval <- callMethod (SubmitManyJobs jobs) s
467   -- map each result (status, payload) pair into a nice Result ADT
468   return $ case rval of
469              Bad x -> Bad x
470              Ok (JSArray r) -> mapM parseSubmitJobResult r
471              x -> Bad . LuxiError $
472                   "Cannot parse response from Ganeti: " ++ show x
473
474 -- | Custom queryJobs call.
475 queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
476 queryJobsStatus s jids = do
477   rval <- callMethod (QueryJobs jids ["status"]) s
478   return $ case rval of
479              Bad x -> Bad x
480              Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
481                        J.Ok vals -> if any null vals
482                                     then Bad $
483                                          LuxiError "Missing job status field"
484                                     else Ok (map head vals)
485                        J.Error x -> Bad $ LuxiError x