Add Python opcode generation
[ganeti-local] / src / Ganeti / Luxi.hs
1 {-# LANGUAGE TemplateHaskell #-}
2
3 {-| Implementation of the Ganeti LUXI interface.
4
5 -}
6
7 {-
8
9 Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
15
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19 General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 02110-1301, USA.
25
26 -}
27
28 module Ganeti.Luxi
29   ( LuxiOp(..)
30   , LuxiReq(..)
31   , Client
32   , JobId
33   , fromJobId
34   , makeJobId
35   , RecvResult(..)
36   , strOfOp
37   , getClient
38   , getServer
39   , acceptClient
40   , closeClient
41   , closeServer
42   , callMethod
43   , submitManyJobs
44   , queryJobsStatus
45   , buildCall
46   , buildResponse
47   , validateCall
48   , decodeCall
49   , recvMsg
50   , recvMsgExt
51   , sendMsg
52   , allLuxiCalls
53   ) where
54
55 import Control.Exception (catch)
56 import Data.IORef
57 import qualified Data.ByteString as B
58 import qualified Data.ByteString.Lazy as BL
59 import qualified Data.ByteString.UTF8 as UTF8
60 import qualified Data.ByteString.Lazy.UTF8 as UTF8L
61 import Data.Word (Word8)
62 import Control.Monad
63 import Text.JSON (encodeStrict, decodeStrict)
64 import qualified Text.JSON as J
65 import Text.JSON.Pretty (pp_value)
66 import Text.JSON.Types
67 import System.Directory (removeFile)
68 import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
69 import System.IO.Error (isEOFError)
70 import System.Timeout
71 import qualified Network.Socket as S
72
73 import Ganeti.BasicTypes
74 import Ganeti.Constants
75 import Ganeti.Errors
76 import Ganeti.JSON
77 import Ganeti.OpParams (pTagsObject)
78 import Ganeti.OpCodes
79 import Ganeti.Runtime
80 import qualified Ganeti.Query.Language as Qlang
81 import Ganeti.THH
82 import Ganeti.Types
83 import Ganeti.Utils
84
85 -- * Utility functions
86
87 -- | Wrapper over System.Timeout.timeout that fails in the IO monad.
88 withTimeout :: Int -> String -> IO a -> IO a
89 withTimeout secs descr action = do
90   result <- timeout (secs * 1000000) action
91   case result of
92     Nothing -> fail $ "Timeout in " ++ descr
93     Just v -> return v
94
95 -- * Generic protocol functionality
96
97 -- | Result of receiving a message from the socket.
98 data RecvResult = RecvConnClosed    -- ^ Connection closed
99                 | RecvError String  -- ^ Any other error
100                 | RecvOk String     -- ^ Successfull receive
101                   deriving (Show, Eq)
102
103 -- | Currently supported Luxi operations and JSON serialization.
104 $(genLuxiOp "LuxiOp"
105   [ (luxiReqQuery,
106     [ simpleField "what"    [t| Qlang.ItemType |]
107     , simpleField "fields"  [t| [String]  |]
108     , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
109     ])
110   , (luxiReqQueryFields,
111     [ simpleField "what"    [t| Qlang.ItemType |]
112     , simpleField "fields"  [t| [String]  |]
113     ])
114   , (luxiReqQueryNodes,
115      [ simpleField "names"  [t| [String] |]
116      , simpleField "fields" [t| [String] |]
117      , simpleField "lock"   [t| Bool     |]
118      ])
119   , (luxiReqQueryGroups,
120      [ simpleField "names"  [t| [String] |]
121      , simpleField "fields" [t| [String] |]
122      , simpleField "lock"   [t| Bool     |]
123      ])
124   , (luxiReqQueryNetworks,
125      [ simpleField "names"  [t| [String] |]
126      , simpleField "fields" [t| [String] |]
127      , simpleField "lock"   [t| Bool     |]
128      ])
129   , (luxiReqQueryInstances,
130      [ simpleField "names"  [t| [String] |]
131      , simpleField "fields" [t| [String] |]
132      , simpleField "lock"   [t| Bool     |]
133      ])
134   , (luxiReqQueryJobs,
135      [ simpleField "ids"    [t| [JobId]  |]
136      , simpleField "fields" [t| [String] |]
137      ])
138   , (luxiReqQueryExports,
139      [ simpleField "nodes" [t| [String] |]
140      , simpleField "lock"  [t| Bool     |]
141      ])
142   , (luxiReqQueryConfigValues,
143      [ simpleField "fields" [t| [String] |] ]
144     )
145   , (luxiReqQueryClusterInfo, [])
146   , (luxiReqQueryTags,
147      [ pTagsObject 
148      , simpleField "name" [t| String |]
149      ])
150   , (luxiReqSubmitJob,
151      [ simpleField "job" [t| [MetaOpCode] |] ]
152     )
153   , (luxiReqSubmitManyJobs,
154      [ simpleField "ops" [t| [[MetaOpCode]] |] ]
155     )
156   , (luxiReqWaitForJobChange,
157      [ simpleField "job"      [t| JobId   |]
158      , simpleField "fields"   [t| [String]|]
159      , simpleField "prev_job" [t| JSValue |]
160      , simpleField "prev_log" [t| JSValue |]
161      , simpleField "tmout"    [t| Int     |]
162      ])
163   , (luxiReqArchiveJob,
164      [ simpleField "job" [t| JobId |] ]
165     )
166   , (luxiReqAutoArchiveJobs,
167      [ simpleField "age"   [t| Int |]
168      , simpleField "tmout" [t| Int |]
169      ])
170   , (luxiReqCancelJob,
171      [ simpleField "job" [t| JobId |] ]
172     )
173   , (luxiReqChangeJobPriority,
174      [ simpleField "job"      [t| JobId |]
175      , simpleField "priority" [t| Int |] ]
176     )
177   , (luxiReqSetDrainFlag,
178      [ simpleField "flag" [t| Bool |] ]
179     )
180   , (luxiReqSetWatcherPause,
181      [ simpleField "duration" [t| Double |] ]
182     )
183   ])
184
185 $(makeJSONInstance ''LuxiReq)
186
187 -- | List of all defined Luxi calls.
188 $(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
189
190 -- | The serialisation of LuxiOps into strings in messages.
191 $(genStrOfOp ''LuxiOp "strOfOp")
192
193 -- | Type holding the initial (unparsed) Luxi call.
194 data LuxiCall = LuxiCall LuxiReq JSValue
195
196 -- | The end-of-message separator.
197 eOM :: Word8
198 eOM = 3
199
200 -- | The end-of-message encoded as a ByteString.
201 bEOM :: B.ByteString
202 bEOM = B.singleton eOM
203
204 -- | Valid keys in the requests and responses.
205 data MsgKeys = Method
206              | Args
207              | Success
208              | Result
209
210 -- | The serialisation of MsgKeys into strings in messages.
211 $(genStrOfKey ''MsgKeys "strOfKey")
212
213 -- | Luxi client encapsulation.
214 data Client = Client { socket :: Handle           -- ^ The socket of the client
215                      , rbuf :: IORef B.ByteString -- ^ Already received buffer
216                      }
217
218 -- | Connects to the master daemon and returns a luxi Client.
219 getClient :: String -> IO Client
220 getClient path = do
221   s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
222   withTimeout luxiDefCtmo "creating luxi connection" $
223               S.connect s (S.SockAddrUnix path)
224   rf <- newIORef B.empty
225   h <- S.socketToHandle s ReadWriteMode
226   return Client { socket=h, rbuf=rf }
227
228 -- | Creates and returns a server endpoint.
229 getServer :: Bool -> FilePath -> IO S.Socket
230 getServer setOwner path = do
231   s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
232   S.bindSocket s (S.SockAddrUnix path)
233   when setOwner . setOwnerAndGroupFromNames path GanetiLuxid $
234     ExtraGroup DaemonsGroup
235   S.listen s 5 -- 5 is the max backlog
236   return s
237
238 -- | Closes a server endpoint.
239 -- FIXME: this should be encapsulated into a nicer type.
240 closeServer :: FilePath -> S.Socket -> IO ()
241 closeServer path sock = do
242   S.sClose sock
243   removeFile path
244
245 -- | Accepts a client
246 acceptClient :: S.Socket -> IO Client
247 acceptClient s = do
248   -- second return is the address of the client, which we ignore here
249   (client_socket, _) <- S.accept s
250   new_buffer <- newIORef B.empty
251   handle <- S.socketToHandle client_socket ReadWriteMode
252   return Client { socket=handle, rbuf=new_buffer }
253
254 -- | Closes the client socket.
255 closeClient :: Client -> IO ()
256 closeClient = hClose . socket
257
258 -- | Sends a message over a luxi transport.
259 sendMsg :: Client -> String -> IO ()
260 sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
261   let encoded = UTF8L.fromString buf
262       handle = socket s
263   BL.hPut handle encoded
264   B.hPut handle bEOM
265   hFlush handle
266
267 -- | Given a current buffer and the handle, it will read from the
268 -- network until we get a full message, and it will return that
269 -- message and the leftover buffer contents.
270 recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
271 recvUpdate handle obuf = do
272   nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
273             _ <- hWaitForInput handle (-1)
274             B.hGetNonBlocking handle 4096
275   let (msg, remaining) = B.break (eOM ==) nbuf
276       newbuf = B.append obuf msg
277   if B.null remaining
278     then recvUpdate handle newbuf
279     else return (newbuf, B.tail remaining)
280
281 -- | Waits for a message over a luxi transport.
282 recvMsg :: Client -> IO String
283 recvMsg s = do
284   cbuf <- readIORef $ rbuf s
285   let (imsg, ibuf) = B.break (eOM ==) cbuf
286   (msg, nbuf) <-
287     if B.null ibuf      -- if old buffer didn't contain a full message
288       then recvUpdate (socket s) cbuf   -- then we read from network
289       else return (imsg, B.tail ibuf)   -- else we return data from our buffer
290   writeIORef (rbuf s) nbuf
291   return $ UTF8.toString msg
292
293 -- | Extended wrapper over recvMsg.
294 recvMsgExt :: Client -> IO RecvResult
295 recvMsgExt s =
296   Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
297     return $ if isEOFError e
298                then RecvConnClosed
299                else RecvError (show e)
300
301 -- | Serialize a request to String.
302 buildCall :: LuxiOp  -- ^ The method
303           -> String  -- ^ The serialized form
304 buildCall lo =
305   let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
306            , (strOfKey Args, opToArgs lo)
307            ]
308       jo = toJSObject ja
309   in encodeStrict jo
310
311 -- | Serialize the response to String.
312 buildResponse :: Bool    -- ^ Success
313               -> JSValue -- ^ The arguments
314               -> String  -- ^ The serialized form
315 buildResponse success args =
316   let ja = [ (strOfKey Success, JSBool success)
317            , (strOfKey Result, args)]
318       jo = toJSObject ja
319   in encodeStrict jo
320
321 -- | Check that luxi request contains the required keys and parse it.
322 validateCall :: String -> Result LuxiCall
323 validateCall s = do
324   arr <- fromJResult "parsing top-level luxi message" $
325          decodeStrict s::Result (JSObject JSValue)
326   let aobj = fromJSObject arr
327   call <- fromObj aobj (strOfKey Method)::Result LuxiReq
328   args <- fromObj aobj (strOfKey Args)
329   return (LuxiCall call args)
330
331 -- | Converts Luxi call arguments into a 'LuxiOp' data structure.
332 --
333 -- This is currently hand-coded until we make it more uniform so that
334 -- it can be generated using TH.
335 decodeCall :: LuxiCall -> Result LuxiOp
336 decodeCall (LuxiCall call args) =
337   case call of
338     ReqQueryJobs -> do
339               (jids, jargs) <- fromJVal args
340               jids' <- case jids of
341                          JSNull -> return []
342                          _ -> fromJVal jids
343               return $ QueryJobs jids' jargs
344     ReqQueryInstances -> do
345               (names, fields, locking) <- fromJVal args
346               return $ QueryInstances names fields locking
347     ReqQueryNodes -> do
348               (names, fields, locking) <- fromJVal args
349               return $ QueryNodes names fields locking
350     ReqQueryGroups -> do
351               (names, fields, locking) <- fromJVal args
352               return $ QueryGroups names fields locking
353     ReqQueryClusterInfo ->
354               return QueryClusterInfo
355     ReqQueryNetworks -> do
356               (names, fields, locking) <- fromJVal args
357               return $ QueryNetworks names fields locking
358     ReqQuery -> do
359               (what, fields, qfilter) <- fromJVal args
360               return $ Query what fields qfilter
361     ReqQueryFields -> do
362               (what, fields) <- fromJVal args
363               fields' <- case fields of
364                            JSNull -> return []
365                            _ -> fromJVal fields
366               return $ QueryFields what fields'
367     ReqSubmitJob -> do
368               [ops1] <- fromJVal args
369               ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
370               return $ SubmitJob ops2
371     ReqSubmitManyJobs -> do
372               [ops1] <- fromJVal args
373               ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
374               return $ SubmitManyJobs ops2
375     ReqWaitForJobChange -> do
376               (jid, fields, pinfo, pidx, wtmout) <-
377                 -- No instance for 5-tuple, code copied from the
378                 -- json sources and adapted
379                 fromJResult "Parsing WaitForJobChange message" $
380                 case args of
381                   JSArray [a, b, c, d, e] ->
382                     (,,,,) `fmap`
383                     J.readJSON a `ap`
384                     J.readJSON b `ap`
385                     J.readJSON c `ap`
386                     J.readJSON d `ap`
387                     J.readJSON e
388                   _ -> J.Error "Not enough values"
389               return $ WaitForJobChange jid fields pinfo pidx wtmout
390     ReqArchiveJob -> do
391               [jid] <- fromJVal args
392               return $ ArchiveJob jid
393     ReqAutoArchiveJobs -> do
394               (age, tmout) <- fromJVal args
395               return $ AutoArchiveJobs age tmout
396     ReqQueryExports -> do
397               (nodes, lock) <- fromJVal args
398               return $ QueryExports nodes lock
399     ReqQueryConfigValues -> do
400               [fields] <- fromJVal args
401               return $ QueryConfigValues fields
402     ReqQueryTags -> do
403               (kind, name) <- fromJVal args
404               return $ QueryTags kind name
405     ReqCancelJob -> do
406               [jid] <- fromJVal args
407               return $ CancelJob jid
408     ReqChangeJobPriority -> do
409               (jid, priority) <- fromJVal args
410               return $ ChangeJobPriority jid priority
411     ReqSetDrainFlag -> do
412               [flag] <- fromJVal args
413               return $ SetDrainFlag flag
414     ReqSetWatcherPause -> do
415               [duration] <- fromJVal args
416               return $ SetWatcherPause duration
417
418 -- | Check that luxi responses contain the required keys and that the
419 -- call was successful.
420 validateResult :: String -> ErrorResult JSValue
421 validateResult s = do
422   when (UTF8.replacement_char `elem` s) $
423        fail "Failed to decode UTF-8, detected replacement char after decoding"
424   oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
425   let arr = J.fromJSObject oarr
426   status <- fromObj arr (strOfKey Success)
427   result <- fromObj arr (strOfKey Result)
428   if status
429     then return result
430     else decodeError result
431
432 -- | Try to decode an error from the server response. This function
433 -- will always fail, since it's called only on the error path (when
434 -- status is False).
435 decodeError :: JSValue -> ErrorResult JSValue
436 decodeError val =
437   case fromJVal val of
438     Ok e -> Bad e
439     Bad msg -> Bad $ GenericError msg
440
441 -- | Generic luxi method call.
442 callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
443 callMethod method s = do
444   sendMsg s $ buildCall method
445   result <- recvMsg s
446   let rval = validateResult result
447   return rval
448
449 -- | Parse job submission result.
450 parseSubmitJobResult :: JSValue -> ErrorResult JobId
451 parseSubmitJobResult (JSArray [JSBool True, v]) =
452   case J.readJSON v of
453     J.Error msg -> Bad $ LuxiError msg
454     J.Ok v' -> Ok v'
455 parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
456   Bad . LuxiError $ fromJSString x
457 parseSubmitJobResult v =
458   Bad . LuxiError $ "Unknown result from the master daemon: " ++
459       show (pp_value v)
460
461 -- | Specialized submitManyJobs call.
462 submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId])
463 submitManyJobs s jobs = do
464   rval <- callMethod (SubmitManyJobs jobs) s
465   -- map each result (status, payload) pair into a nice Result ADT
466   return $ case rval of
467              Bad x -> Bad x
468              Ok (JSArray r) -> mapM parseSubmitJobResult r
469              x -> Bad . LuxiError $
470                   "Cannot parse response from Ganeti: " ++ show x
471
472 -- | Custom queryJobs call.
473 queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
474 queryJobsStatus s jids = do
475   rval <- callMethod (QueryJobs jids ["status"]) s
476   return $ case rval of
477              Bad x -> Bad x
478              Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
479                        J.Ok vals -> if any null vals
480                                     then Bad $
481                                          LuxiError "Missing job status field"
482                                     else Ok (map head vals)
483                        J.Error x -> Bad $ LuxiError x