Loader.hs: set instance auto-repair policy in mergeData
[ganeti-local] / src / Ganeti / Luxi.hs
1 {-# LANGUAGE TemplateHaskell #-}
2
3 {-| Implementation of the Ganeti LUXI interface.
4
5 -}
6
7 {-
8
9 Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
15
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19 General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 02110-1301, USA.
25
26 -}
27
28 module Ganeti.Luxi
29   ( LuxiOp(..)
30   , LuxiReq(..)
31   , Client
32   , JobId
33   , fromJobId
34   , makeJobId
35   , RecvResult(..)
36   , strOfOp
37   , getClient
38   , getServer
39   , acceptClient
40   , closeClient
41   , closeServer
42   , callMethod
43   , submitManyJobs
44   , queryJobsStatus
45   , buildCall
46   , buildResponse
47   , validateCall
48   , decodeCall
49   , recvMsg
50   , recvMsgExt
51   , sendMsg
52   , allLuxiCalls
53   ) where
54
55 import Control.Exception (catch)
56 import Data.IORef
57 import qualified Data.ByteString as B
58 import qualified Data.ByteString.Lazy as BL
59 import qualified Data.ByteString.UTF8 as UTF8
60 import qualified Data.ByteString.Lazy.UTF8 as UTF8L
61 import Data.Word (Word8)
62 import Control.Monad
63 import Text.JSON (encodeStrict, decodeStrict)
64 import qualified Text.JSON as J
65 import Text.JSON.Pretty (pp_value)
66 import Text.JSON.Types
67 import System.Directory (removeFile)
68 import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
69 import System.IO.Error (isEOFError)
70 import System.Timeout
71 import qualified Network.Socket as S
72
73 import Ganeti.BasicTypes
74 import Ganeti.Constants
75 import Ganeti.Errors
76 import Ganeti.JSON
77 import Ganeti.OpParams (pTagsObject)
78 import Ganeti.OpCodes
79 import qualified Ganeti.Query.Language as Qlang
80 import Ganeti.THH
81 import Ganeti.Types
82
83 -- * Utility functions
84
85 -- | Wrapper over System.Timeout.timeout that fails in the IO monad.
86 withTimeout :: Int -> String -> IO a -> IO a
87 withTimeout secs descr action = do
88   result <- timeout (secs * 1000000) action
89   case result of
90     Nothing -> fail $ "Timeout in " ++ descr
91     Just v -> return v
92
93 -- * Generic protocol functionality
94
95 -- | Result of receiving a message from the socket.
96 data RecvResult = RecvConnClosed    -- ^ Connection closed
97                 | RecvError String  -- ^ Any other error
98                 | RecvOk String     -- ^ Successfull receive
99                   deriving (Show, Eq)
100
101 -- | Currently supported Luxi operations and JSON serialization.
102 $(genLuxiOp "LuxiOp"
103   [ (luxiReqQuery,
104     [ simpleField "what"    [t| Qlang.ItemType |]
105     , simpleField "fields"  [t| [String]  |]
106     , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
107     ])
108   , (luxiReqQueryFields,
109     [ simpleField "what"    [t| Qlang.ItemType |]
110     , simpleField "fields"  [t| [String]  |]
111     ])
112   , (luxiReqQueryNodes,
113      [ simpleField "names"  [t| [String] |]
114      , simpleField "fields" [t| [String] |]
115      , simpleField "lock"   [t| Bool     |]
116      ])
117   , (luxiReqQueryGroups,
118      [ simpleField "names"  [t| [String] |]
119      , simpleField "fields" [t| [String] |]
120      , simpleField "lock"   [t| Bool     |]
121      ])
122   , (luxiReqQueryInstances,
123      [ simpleField "names"  [t| [String] |]
124      , simpleField "fields" [t| [String] |]
125      , simpleField "lock"   [t| Bool     |]
126      ])
127   , (luxiReqQueryJobs,
128      [ simpleField "ids"    [t| [JobId]  |]
129      , simpleField "fields" [t| [String] |]
130      ])
131   , (luxiReqQueryExports,
132      [ simpleField "nodes" [t| [String] |]
133      , simpleField "lock"  [t| Bool     |]
134      ])
135   , (luxiReqQueryConfigValues,
136      [ simpleField "fields" [t| [String] |] ]
137     )
138   , (luxiReqQueryClusterInfo, [])
139   , (luxiReqQueryTags,
140      [ pTagsObject ])
141   , (luxiReqSubmitJob,
142      [ simpleField "job" [t| [MetaOpCode] |] ]
143     )
144   , (luxiReqSubmitManyJobs,
145      [ simpleField "ops" [t| [[MetaOpCode]] |] ]
146     )
147   , (luxiReqWaitForJobChange,
148      [ simpleField "job"      [t| JobId   |]
149      , simpleField "fields"   [t| [String]|]
150      , simpleField "prev_job" [t| JSValue |]
151      , simpleField "prev_log" [t| JSValue |]
152      , simpleField "tmout"    [t| Int     |]
153      ])
154   , (luxiReqArchiveJob,
155      [ simpleField "job" [t| JobId |] ]
156     )
157   , (luxiReqAutoArchiveJobs,
158      [ simpleField "age"   [t| Int |]
159      , simpleField "tmout" [t| Int |]
160      ])
161   , (luxiReqCancelJob,
162      [ simpleField "job" [t| JobId |] ]
163     )
164   , (luxiReqChangeJobPriority,
165      [ simpleField "job"      [t| JobId |]
166      , simpleField "priority" [t| Int |] ]
167     )
168   , (luxiReqSetDrainFlag,
169      [ simpleField "flag" [t| Bool |] ]
170     )
171   , (luxiReqSetWatcherPause,
172      [ simpleField "duration" [t| Double |] ]
173     )
174   ])
175
176 $(makeJSONInstance ''LuxiReq)
177
178 -- | List of all defined Luxi calls.
179 $(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
180
181 -- | The serialisation of LuxiOps into strings in messages.
182 $(genStrOfOp ''LuxiOp "strOfOp")
183
184 -- | Type holding the initial (unparsed) Luxi call.
185 data LuxiCall = LuxiCall LuxiReq JSValue
186
187 -- | The end-of-message separator.
188 eOM :: Word8
189 eOM = 3
190
191 -- | The end-of-message encoded as a ByteString.
192 bEOM :: B.ByteString
193 bEOM = B.singleton eOM
194
195 -- | Valid keys in the requests and responses.
196 data MsgKeys = Method
197              | Args
198              | Success
199              | Result
200
201 -- | The serialisation of MsgKeys into strings in messages.
202 $(genStrOfKey ''MsgKeys "strOfKey")
203
204 -- | Luxi client encapsulation.
205 data Client = Client { socket :: Handle           -- ^ The socket of the client
206                      , rbuf :: IORef B.ByteString -- ^ Already received buffer
207                      }
208
209 -- | Connects to the master daemon and returns a luxi Client.
210 getClient :: String -> IO Client
211 getClient path = do
212   s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
213   withTimeout luxiDefCtmo "creating luxi connection" $
214               S.connect s (S.SockAddrUnix path)
215   rf <- newIORef B.empty
216   h <- S.socketToHandle s ReadWriteMode
217   return Client { socket=h, rbuf=rf }
218
219 -- | Creates and returns a server endpoint.
220 getServer :: FilePath -> IO S.Socket
221 getServer path = do
222   s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
223   S.bindSocket s (S.SockAddrUnix path)
224   S.listen s 5 -- 5 is the max backlog
225   return s
226
227 -- | Closes a server endpoint.
228 -- FIXME: this should be encapsulated into a nicer type.
229 closeServer :: FilePath -> S.Socket -> IO ()
230 closeServer path sock = do
231   S.sClose sock
232   removeFile path
233
234 -- | Accepts a client
235 acceptClient :: S.Socket -> IO Client
236 acceptClient s = do
237   -- second return is the address of the client, which we ignore here
238   (client_socket, _) <- S.accept s
239   new_buffer <- newIORef B.empty
240   handle <- S.socketToHandle client_socket ReadWriteMode
241   return Client { socket=handle, rbuf=new_buffer }
242
243 -- | Closes the client socket.
244 closeClient :: Client -> IO ()
245 closeClient = hClose . socket
246
247 -- | Sends a message over a luxi transport.
248 sendMsg :: Client -> String -> IO ()
249 sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
250   let encoded = UTF8L.fromString buf
251       handle = socket s
252   BL.hPut handle encoded
253   B.hPut handle bEOM
254   hFlush handle
255
256 -- | Given a current buffer and the handle, it will read from the
257 -- network until we get a full message, and it will return that
258 -- message and the leftover buffer contents.
259 recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
260 recvUpdate handle obuf = do
261   nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
262             _ <- hWaitForInput handle (-1)
263             B.hGetNonBlocking handle 4096
264   let (msg, remaining) = B.break (eOM ==) nbuf
265       newbuf = B.append obuf msg
266   if B.null remaining
267     then recvUpdate handle newbuf
268     else return (newbuf, B.tail remaining)
269
270 -- | Waits for a message over a luxi transport.
271 recvMsg :: Client -> IO String
272 recvMsg s = do
273   cbuf <- readIORef $ rbuf s
274   let (imsg, ibuf) = B.break (eOM ==) cbuf
275   (msg, nbuf) <-
276     if B.null ibuf      -- if old buffer didn't contain a full message
277       then recvUpdate (socket s) cbuf   -- then we read from network
278       else return (imsg, B.tail ibuf)   -- else we return data from our buffer
279   writeIORef (rbuf s) nbuf
280   return $ UTF8.toString msg
281
282 -- | Extended wrapper over recvMsg.
283 recvMsgExt :: Client -> IO RecvResult
284 recvMsgExt s =
285   Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
286     return $ if isEOFError e
287                then RecvConnClosed
288                else RecvError (show e)
289
290 -- | Serialize a request to String.
291 buildCall :: LuxiOp  -- ^ The method
292           -> String  -- ^ The serialized form
293 buildCall lo =
294   let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
295            , (strOfKey Args, opToArgs lo)
296            ]
297       jo = toJSObject ja
298   in encodeStrict jo
299
300 -- | Serialize the response to String.
301 buildResponse :: Bool    -- ^ Success
302               -> JSValue -- ^ The arguments
303               -> String  -- ^ The serialized form
304 buildResponse success args =
305   let ja = [ (strOfKey Success, JSBool success)
306            , (strOfKey Result, args)]
307       jo = toJSObject ja
308   in encodeStrict jo
309
310 -- | Check that luxi request contains the required keys and parse it.
311 validateCall :: String -> Result LuxiCall
312 validateCall s = do
313   arr <- fromJResult "parsing top-level luxi message" $
314          decodeStrict s::Result (JSObject JSValue)
315   let aobj = fromJSObject arr
316   call <- fromObj aobj (strOfKey Method)::Result LuxiReq
317   args <- fromObj aobj (strOfKey Args)
318   return (LuxiCall call args)
319
320 -- | Converts Luxi call arguments into a 'LuxiOp' data structure.
321 --
322 -- This is currently hand-coded until we make it more uniform so that
323 -- it can be generated using TH.
324 decodeCall :: LuxiCall -> Result LuxiOp
325 decodeCall (LuxiCall call args) =
326   case call of
327     ReqQueryJobs -> do
328               (jids, jargs) <- fromJVal args
329               jids' <- case jids of
330                          JSNull -> return []
331                          _ -> fromJVal jids
332               return $ QueryJobs jids' jargs
333     ReqQueryInstances -> do
334               (names, fields, locking) <- fromJVal args
335               return $ QueryInstances names fields locking
336     ReqQueryNodes -> do
337               (names, fields, locking) <- fromJVal args
338               return $ QueryNodes names fields locking
339     ReqQueryGroups -> do
340               (names, fields, locking) <- fromJVal args
341               return $ QueryGroups names fields locking
342     ReqQueryClusterInfo ->
343               return QueryClusterInfo
344     ReqQuery -> do
345               (what, fields, qfilter) <- fromJVal args
346               return $ Query what fields qfilter
347     ReqQueryFields -> do
348               (what, fields) <- fromJVal args
349               fields' <- case fields of
350                            JSNull -> return []
351                            _ -> fromJVal fields
352               return $ QueryFields what fields'
353     ReqSubmitJob -> do
354               [ops1] <- fromJVal args
355               ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
356               return $ SubmitJob ops2
357     ReqSubmitManyJobs -> do
358               [ops1] <- fromJVal args
359               ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
360               return $ SubmitManyJobs ops2
361     ReqWaitForJobChange -> do
362               (jid, fields, pinfo, pidx, wtmout) <-
363                 -- No instance for 5-tuple, code copied from the
364                 -- json sources and adapted
365                 fromJResult "Parsing WaitForJobChange message" $
366                 case args of
367                   JSArray [a, b, c, d, e] ->
368                     (,,,,) `fmap`
369                     J.readJSON a `ap`
370                     J.readJSON b `ap`
371                     J.readJSON c `ap`
372                     J.readJSON d `ap`
373                     J.readJSON e
374                   _ -> J.Error "Not enough values"
375               return $ WaitForJobChange jid fields pinfo pidx wtmout
376     ReqArchiveJob -> do
377               [jid] <- fromJVal args
378               return $ ArchiveJob jid
379     ReqAutoArchiveJobs -> do
380               (age, tmout) <- fromJVal args
381               return $ AutoArchiveJobs age tmout
382     ReqQueryExports -> do
383               (nodes, lock) <- fromJVal args
384               return $ QueryExports nodes lock
385     ReqQueryConfigValues -> do
386               [fields] <- fromJVal args
387               return $ QueryConfigValues fields
388     ReqQueryTags -> do
389               (kind, name) <- fromJVal args
390               item <- tagObjectFrom kind name
391               return $ QueryTags item
392     ReqCancelJob -> do
393               [jid] <- fromJVal args
394               return $ CancelJob jid
395     ReqChangeJobPriority -> do
396               (jid, priority) <- fromJVal args
397               return $ ChangeJobPriority jid priority
398     ReqSetDrainFlag -> do
399               [flag] <- fromJVal args
400               return $ SetDrainFlag flag
401     ReqSetWatcherPause -> do
402               [duration] <- fromJVal args
403               return $ SetWatcherPause duration
404
405 -- | Check that luxi responses contain the required keys and that the
406 -- call was successful.
407 validateResult :: String -> ErrorResult JSValue
408 validateResult s = do
409   when (UTF8.replacement_char `elem` s) $
410        fail "Failed to decode UTF-8, detected replacement char after decoding"
411   oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
412   let arr = J.fromJSObject oarr
413   status <- fromObj arr (strOfKey Success)
414   result <- fromObj arr (strOfKey Result)
415   if status
416     then return result
417     else decodeError result
418
419 -- | Try to decode an error from the server response. This function
420 -- will always fail, since it's called only on the error path (when
421 -- status is False).
422 decodeError :: JSValue -> ErrorResult JSValue
423 decodeError val =
424   case fromJVal val of
425     Ok e -> Bad e
426     Bad msg -> Bad $ GenericError msg
427
428 -- | Generic luxi method call.
429 callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
430 callMethod method s = do
431   sendMsg s $ buildCall method
432   result <- recvMsg s
433   let rval = validateResult result
434   return rval
435
436 -- | Parse job submission result.
437 parseSubmitJobResult :: JSValue -> ErrorResult JobId
438 parseSubmitJobResult (JSArray [JSBool True, v]) =
439   case J.readJSON v of
440     J.Error msg -> Bad $ LuxiError msg
441     J.Ok v' -> Ok v'
442 parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
443   Bad . LuxiError $ fromJSString x
444 parseSubmitJobResult v =
445   Bad . LuxiError $ "Unknown result from the master daemon: " ++
446       show (pp_value v)
447
448 -- | Specialized submitManyJobs call.
449 submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId])
450 submitManyJobs s jobs = do
451   rval <- callMethod (SubmitManyJobs jobs) s
452   -- map each result (status, payload) pair into a nice Result ADT
453   return $ case rval of
454              Bad x -> Bad x
455              Ok (JSArray r) -> mapM parseSubmitJobResult r
456              x -> Bad . LuxiError $
457                   "Cannot parse response from Ganeti: " ++ show x
458
459 -- | Custom queryJobs call.
460 queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
461 queryJobsStatus s jids = do
462   rval <- callMethod (QueryJobs jids ["status"]) s
463   return $ case rval of
464              Bad x -> Bad x
465              Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
466                        J.Ok vals -> if any null vals
467                                     then Bad $
468                                          LuxiError "Missing job status field"
469                                     else Ok (map head vals)
470                        J.Error x -> Bad $ LuxiError x