84030daf5ffa4220a8bb28818476c3db530de14e
[ganeti-local] / src / Ganeti / Luxi.hs
1 {-# LANGUAGE TemplateHaskell #-}
2
3 {-| Implementation of the Ganeti LUXI interface.
4
5 -}
6
7 {-
8
9 Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
15
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19 General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 02110-1301, USA.
25
26 -}
27
28 module Ganeti.Luxi
29   ( LuxiOp(..)
30   , LuxiReq(..)
31   , Client
32   , JobId
33   , fromJobId
34   , makeJobId
35   , RecvResult(..)
36   , strOfOp
37   , getClient
38   , getServer
39   , acceptClient
40   , closeClient
41   , closeServer
42   , callMethod
43   , submitManyJobs
44   , queryJobsStatus
45   , buildCall
46   , buildResponse
47   , validateCall
48   , decodeCall
49   , recvMsg
50   , recvMsgExt
51   , sendMsg
52   , allLuxiCalls
53   ) where
54
55 import Control.Exception (catch)
56 import Data.IORef
57 import qualified Data.ByteString as B
58 import qualified Data.ByteString.Lazy as BL
59 import qualified Data.ByteString.UTF8 as UTF8
60 import qualified Data.ByteString.Lazy.UTF8 as UTF8L
61 import Data.Word (Word8)
62 import Control.Monad
63 import Text.JSON (encodeStrict, decodeStrict)
64 import qualified Text.JSON as J
65 import Text.JSON.Pretty (pp_value)
66 import Text.JSON.Types
67 import System.Directory (removeFile)
68 import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
69 import System.IO.Error (isEOFError)
70 import System.Timeout
71 import qualified Network.Socket as S
72
73 import Ganeti.BasicTypes
74 import Ganeti.Constants
75 import Ganeti.Errors
76 import Ganeti.JSON
77 import Ganeti.OpParams (pTagsObject)
78 import Ganeti.OpCodes
79 import qualified Ganeti.Query.Language as Qlang
80 import Ganeti.THH
81 import Ganeti.Types
82
83 -- * Utility functions
84
85 -- | Wrapper over System.Timeout.timeout that fails in the IO monad.
86 withTimeout :: Int -> String -> IO a -> IO a
87 withTimeout secs descr action = do
88   result <- timeout (secs * 1000000) action
89   case result of
90     Nothing -> fail $ "Timeout in " ++ descr
91     Just v -> return v
92
93 -- * Generic protocol functionality
94
95 -- | Result of receiving a message from the socket.
96 data RecvResult = RecvConnClosed    -- ^ Connection closed
97                 | RecvError String  -- ^ Any other error
98                 | RecvOk String     -- ^ Successfull receive
99                   deriving (Show, Eq)
100
101 -- | Currently supported Luxi operations and JSON serialization.
102 $(genLuxiOp "LuxiOp"
103   [ (luxiReqQuery,
104     [ simpleField "what"    [t| Qlang.ItemType |]
105     , simpleField "fields"  [t| [String]  |]
106     , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
107     ])
108   , (luxiReqQueryFields,
109     [ simpleField "what"    [t| Qlang.ItemType |]
110     , simpleField "fields"  [t| [String]  |]
111     ])
112   , (luxiReqQueryNodes,
113      [ simpleField "names"  [t| [String] |]
114      , simpleField "fields" [t| [String] |]
115      , simpleField "lock"   [t| Bool     |]
116      ])
117   , (luxiReqQueryGroups,
118      [ simpleField "names"  [t| [String] |]
119      , simpleField "fields" [t| [String] |]
120      , simpleField "lock"   [t| Bool     |]
121      ])
122   , (luxiReqQueryNetworks,
123      [ simpleField "names"  [t| [String] |]
124      , simpleField "fields" [t| [String] |]
125      , simpleField "lock"   [t| Bool     |]
126      ])
127   , (luxiReqQueryInstances,
128      [ simpleField "names"  [t| [String] |]
129      , simpleField "fields" [t| [String] |]
130      , simpleField "lock"   [t| Bool     |]
131      ])
132   , (luxiReqQueryJobs,
133      [ simpleField "ids"    [t| [JobId]  |]
134      , simpleField "fields" [t| [String] |]
135      ])
136   , (luxiReqQueryExports,
137      [ simpleField "nodes" [t| [String] |]
138      , simpleField "lock"  [t| Bool     |]
139      ])
140   , (luxiReqQueryConfigValues,
141      [ simpleField "fields" [t| [String] |] ]
142     )
143   , (luxiReqQueryClusterInfo, [])
144   , (luxiReqQueryTags,
145      [ pTagsObject ])
146   , (luxiReqSubmitJob,
147      [ simpleField "job" [t| [MetaOpCode] |] ]
148     )
149   , (luxiReqSubmitManyJobs,
150      [ simpleField "ops" [t| [[MetaOpCode]] |] ]
151     )
152   , (luxiReqWaitForJobChange,
153      [ simpleField "job"      [t| JobId   |]
154      , simpleField "fields"   [t| [String]|]
155      , simpleField "prev_job" [t| JSValue |]
156      , simpleField "prev_log" [t| JSValue |]
157      , simpleField "tmout"    [t| Int     |]
158      ])
159   , (luxiReqArchiveJob,
160      [ simpleField "job" [t| JobId |] ]
161     )
162   , (luxiReqAutoArchiveJobs,
163      [ simpleField "age"   [t| Int |]
164      , simpleField "tmout" [t| Int |]
165      ])
166   , (luxiReqCancelJob,
167      [ simpleField "job" [t| JobId |] ]
168     )
169   , (luxiReqChangeJobPriority,
170      [ simpleField "job"      [t| JobId |]
171      , simpleField "priority" [t| Int |] ]
172     )
173   , (luxiReqSetDrainFlag,
174      [ simpleField "flag" [t| Bool |] ]
175     )
176   , (luxiReqSetWatcherPause,
177      [ simpleField "duration" [t| Double |] ]
178     )
179   ])
180
181 $(makeJSONInstance ''LuxiReq)
182
183 -- | List of all defined Luxi calls.
184 $(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
185
186 -- | The serialisation of LuxiOps into strings in messages.
187 $(genStrOfOp ''LuxiOp "strOfOp")
188
189 -- | Type holding the initial (unparsed) Luxi call.
190 data LuxiCall = LuxiCall LuxiReq JSValue
191
192 -- | The end-of-message separator.
193 eOM :: Word8
194 eOM = 3
195
196 -- | The end-of-message encoded as a ByteString.
197 bEOM :: B.ByteString
198 bEOM = B.singleton eOM
199
200 -- | Valid keys in the requests and responses.
201 data MsgKeys = Method
202              | Args
203              | Success
204              | Result
205
206 -- | The serialisation of MsgKeys into strings in messages.
207 $(genStrOfKey ''MsgKeys "strOfKey")
208
209 -- | Luxi client encapsulation.
210 data Client = Client { socket :: Handle           -- ^ The socket of the client
211                      , rbuf :: IORef B.ByteString -- ^ Already received buffer
212                      }
213
214 -- | Connects to the master daemon and returns a luxi Client.
215 getClient :: String -> IO Client
216 getClient path = do
217   s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
218   withTimeout luxiDefCtmo "creating luxi connection" $
219               S.connect s (S.SockAddrUnix path)
220   rf <- newIORef B.empty
221   h <- S.socketToHandle s ReadWriteMode
222   return Client { socket=h, rbuf=rf }
223
224 -- | Creates and returns a server endpoint.
225 getServer :: FilePath -> IO S.Socket
226 getServer path = do
227   s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
228   S.bindSocket s (S.SockAddrUnix path)
229   S.listen s 5 -- 5 is the max backlog
230   return s
231
232 -- | Closes a server endpoint.
233 -- FIXME: this should be encapsulated into a nicer type.
234 closeServer :: FilePath -> S.Socket -> IO ()
235 closeServer path sock = do
236   S.sClose sock
237   removeFile path
238
239 -- | Accepts a client
240 acceptClient :: S.Socket -> IO Client
241 acceptClient s = do
242   -- second return is the address of the client, which we ignore here
243   (client_socket, _) <- S.accept s
244   new_buffer <- newIORef B.empty
245   handle <- S.socketToHandle client_socket ReadWriteMode
246   return Client { socket=handle, rbuf=new_buffer }
247
248 -- | Closes the client socket.
249 closeClient :: Client -> IO ()
250 closeClient = hClose . socket
251
252 -- | Sends a message over a luxi transport.
253 sendMsg :: Client -> String -> IO ()
254 sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
255   let encoded = UTF8L.fromString buf
256       handle = socket s
257   BL.hPut handle encoded
258   B.hPut handle bEOM
259   hFlush handle
260
261 -- | Given a current buffer and the handle, it will read from the
262 -- network until we get a full message, and it will return that
263 -- message and the leftover buffer contents.
264 recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
265 recvUpdate handle obuf = do
266   nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
267             _ <- hWaitForInput handle (-1)
268             B.hGetNonBlocking handle 4096
269   let (msg, remaining) = B.break (eOM ==) nbuf
270       newbuf = B.append obuf msg
271   if B.null remaining
272     then recvUpdate handle newbuf
273     else return (newbuf, B.tail remaining)
274
275 -- | Waits for a message over a luxi transport.
276 recvMsg :: Client -> IO String
277 recvMsg s = do
278   cbuf <- readIORef $ rbuf s
279   let (imsg, ibuf) = B.break (eOM ==) cbuf
280   (msg, nbuf) <-
281     if B.null ibuf      -- if old buffer didn't contain a full message
282       then recvUpdate (socket s) cbuf   -- then we read from network
283       else return (imsg, B.tail ibuf)   -- else we return data from our buffer
284   writeIORef (rbuf s) nbuf
285   return $ UTF8.toString msg
286
287 -- | Extended wrapper over recvMsg.
288 recvMsgExt :: Client -> IO RecvResult
289 recvMsgExt s =
290   Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
291     return $ if isEOFError e
292                then RecvConnClosed
293                else RecvError (show e)
294
295 -- | Serialize a request to String.
296 buildCall :: LuxiOp  -- ^ The method
297           -> String  -- ^ The serialized form
298 buildCall lo =
299   let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
300            , (strOfKey Args, opToArgs lo)
301            ]
302       jo = toJSObject ja
303   in encodeStrict jo
304
305 -- | Serialize the response to String.
306 buildResponse :: Bool    -- ^ Success
307               -> JSValue -- ^ The arguments
308               -> String  -- ^ The serialized form
309 buildResponse success args =
310   let ja = [ (strOfKey Success, JSBool success)
311            , (strOfKey Result, args)]
312       jo = toJSObject ja
313   in encodeStrict jo
314
315 -- | Check that luxi request contains the required keys and parse it.
316 validateCall :: String -> Result LuxiCall
317 validateCall s = do
318   arr <- fromJResult "parsing top-level luxi message" $
319          decodeStrict s::Result (JSObject JSValue)
320   let aobj = fromJSObject arr
321   call <- fromObj aobj (strOfKey Method)::Result LuxiReq
322   args <- fromObj aobj (strOfKey Args)
323   return (LuxiCall call args)
324
325 -- | Converts Luxi call arguments into a 'LuxiOp' data structure.
326 --
327 -- This is currently hand-coded until we make it more uniform so that
328 -- it can be generated using TH.
329 decodeCall :: LuxiCall -> Result LuxiOp
330 decodeCall (LuxiCall call args) =
331   case call of
332     ReqQueryJobs -> do
333               (jids, jargs) <- fromJVal args
334               jids' <- case jids of
335                          JSNull -> return []
336                          _ -> fromJVal jids
337               return $ QueryJobs jids' jargs
338     ReqQueryInstances -> do
339               (names, fields, locking) <- fromJVal args
340               return $ QueryInstances names fields locking
341     ReqQueryNodes -> do
342               (names, fields, locking) <- fromJVal args
343               return $ QueryNodes names fields locking
344     ReqQueryGroups -> do
345               (names, fields, locking) <- fromJVal args
346               return $ QueryGroups names fields locking
347     ReqQueryClusterInfo ->
348               return QueryClusterInfo
349     ReqQueryNetworks -> do
350               (names, fields, locking) <- fromJVal args
351               return $ QueryNetworks names fields locking
352     ReqQuery -> do
353               (what, fields, qfilter) <- fromJVal args
354               return $ Query what fields qfilter
355     ReqQueryFields -> do
356               (what, fields) <- fromJVal args
357               fields' <- case fields of
358                            JSNull -> return []
359                            _ -> fromJVal fields
360               return $ QueryFields what fields'
361     ReqSubmitJob -> do
362               [ops1] <- fromJVal args
363               ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
364               return $ SubmitJob ops2
365     ReqSubmitManyJobs -> do
366               [ops1] <- fromJVal args
367               ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
368               return $ SubmitManyJobs ops2
369     ReqWaitForJobChange -> do
370               (jid, fields, pinfo, pidx, wtmout) <-
371                 -- No instance for 5-tuple, code copied from the
372                 -- json sources and adapted
373                 fromJResult "Parsing WaitForJobChange message" $
374                 case args of
375                   JSArray [a, b, c, d, e] ->
376                     (,,,,) `fmap`
377                     J.readJSON a `ap`
378                     J.readJSON b `ap`
379                     J.readJSON c `ap`
380                     J.readJSON d `ap`
381                     J.readJSON e
382                   _ -> J.Error "Not enough values"
383               return $ WaitForJobChange jid fields pinfo pidx wtmout
384     ReqArchiveJob -> do
385               [jid] <- fromJVal args
386               return $ ArchiveJob jid
387     ReqAutoArchiveJobs -> do
388               (age, tmout) <- fromJVal args
389               return $ AutoArchiveJobs age tmout
390     ReqQueryExports -> do
391               (nodes, lock) <- fromJVal args
392               return $ QueryExports nodes lock
393     ReqQueryConfigValues -> do
394               [fields] <- fromJVal args
395               return $ QueryConfigValues fields
396     ReqQueryTags -> do
397               (kind, name) <- fromJVal args
398               item <- tagObjectFrom kind name
399               return $ QueryTags item
400     ReqCancelJob -> do
401               [jid] <- fromJVal args
402               return $ CancelJob jid
403     ReqChangeJobPriority -> do
404               (jid, priority) <- fromJVal args
405               return $ ChangeJobPriority jid priority
406     ReqSetDrainFlag -> do
407               [flag] <- fromJVal args
408               return $ SetDrainFlag flag
409     ReqSetWatcherPause -> do
410               [duration] <- fromJVal args
411               return $ SetWatcherPause duration
412
413 -- | Check that luxi responses contain the required keys and that the
414 -- call was successful.
415 validateResult :: String -> ErrorResult JSValue
416 validateResult s = do
417   when (UTF8.replacement_char `elem` s) $
418        fail "Failed to decode UTF-8, detected replacement char after decoding"
419   oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
420   let arr = J.fromJSObject oarr
421   status <- fromObj arr (strOfKey Success)
422   result <- fromObj arr (strOfKey Result)
423   if status
424     then return result
425     else decodeError result
426
427 -- | Try to decode an error from the server response. This function
428 -- will always fail, since it's called only on the error path (when
429 -- status is False).
430 decodeError :: JSValue -> ErrorResult JSValue
431 decodeError val =
432   case fromJVal val of
433     Ok e -> Bad e
434     Bad msg -> Bad $ GenericError msg
435
436 -- | Generic luxi method call.
437 callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
438 callMethod method s = do
439   sendMsg s $ buildCall method
440   result <- recvMsg s
441   let rval = validateResult result
442   return rval
443
444 -- | Parse job submission result.
445 parseSubmitJobResult :: JSValue -> ErrorResult JobId
446 parseSubmitJobResult (JSArray [JSBool True, v]) =
447   case J.readJSON v of
448     J.Error msg -> Bad $ LuxiError msg
449     J.Ok v' -> Ok v'
450 parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
451   Bad . LuxiError $ fromJSString x
452 parseSubmitJobResult v =
453   Bad . LuxiError $ "Unknown result from the master daemon: " ++
454       show (pp_value v)
455
456 -- | Specialized submitManyJobs call.
457 submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId])
458 submitManyJobs s jobs = do
459   rval <- callMethod (SubmitManyJobs jobs) s
460   -- map each result (status, payload) pair into a nice Result ADT
461   return $ case rval of
462              Bad x -> Bad x
463              Ok (JSArray r) -> mapM parseSubmitJobResult r
464              x -> Bad . LuxiError $
465                   "Cannot parse response from Ganeti: " ++ show x
466
467 -- | Custom queryJobs call.
468 queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
469 queryJobsStatus s jids = do
470   rval <- callMethod (QueryJobs jids ["status"]) s
471   return $ case rval of
472              Bad x -> Bad x
473              Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
474                        J.Ok vals -> if any null vals
475                                     then Bad $
476                                          LuxiError "Missing job status field"
477                                     else Ok (map head vals)
478                        J.Error x -> Bad $ LuxiError x