Support 'null' in Luxi QueryJobs names field
[ganeti-local] / htools / Ganeti / Luxi.hs
1 {-# LANGUAGE TemplateHaskell #-}
2
3 {-| Implementation of the Ganeti LUXI interface.
4
5 -}
6
7 {-
8
9 Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
15
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19 General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 02110-1301, USA.
25
26 -}
27
28 module Ganeti.Luxi
29   ( LuxiOp(..)
30   , LuxiReq(..)
31   , Client
32   , JobId
33   , fromJobId
34   , makeJobId
35   , RecvResult(..)
36   , strOfOp
37   , getClient
38   , getServer
39   , acceptClient
40   , closeClient
41   , closeServer
42   , callMethod
43   , submitManyJobs
44   , queryJobsStatus
45   , buildCall
46   , buildResponse
47   , validateCall
48   , decodeCall
49   , recvMsg
50   , recvMsgExt
51   , sendMsg
52   , allLuxiCalls
53   ) where
54
55 import Control.Exception (catch)
56 import Data.IORef
57 import qualified Data.ByteString as B
58 import qualified Data.ByteString.UTF8 as UTF8
59 import Data.Word (Word8)
60 import Control.Monad
61 import Text.JSON (encodeStrict, decodeStrict)
62 import qualified Text.JSON as J
63 import Text.JSON.Pretty (pp_value)
64 import Text.JSON.Types
65 import System.Directory (removeFile)
66 import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
67 import System.IO.Error (isEOFError)
68 import System.Timeout
69 import qualified Network.Socket as S
70
71 import Ganeti.BasicTypes
72 import Ganeti.Constants
73 import Ganeti.Errors
74 import Ganeti.JSON
75 import Ganeti.OpParams (pTagsObject)
76 import Ganeti.OpCodes
77 import qualified Ganeti.Query.Language as Qlang
78 import Ganeti.THH
79 import Ganeti.Types
80
81 -- * Utility functions
82
83 -- | Wrapper over System.Timeout.timeout that fails in the IO monad.
84 withTimeout :: Int -> String -> IO a -> IO a
85 withTimeout secs descr action = do
86   result <- timeout (secs * 1000000) action
87   case result of
88     Nothing -> fail $ "Timeout in " ++ descr
89     Just v -> return v
90
91 -- * Generic protocol functionality
92
93 -- | Result of receiving a message from the socket.
94 data RecvResult = RecvConnClosed    -- ^ Connection closed
95                 | RecvError String  -- ^ Any other error
96                 | RecvOk String     -- ^ Successfull receive
97                   deriving (Show, Eq)
98
99 -- | Currently supported Luxi operations and JSON serialization.
100 $(genLuxiOp "LuxiOp"
101   [ (luxiReqQuery,
102     [ simpleField "what"    [t| Qlang.ItemType |]
103     , simpleField "fields"  [t| [String]  |]
104     , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
105     ])
106   , (luxiReqQueryFields,
107     [ simpleField "what"    [t| Qlang.ItemType |]
108     , simpleField "fields"  [t| [String]  |]
109     ])
110   , (luxiReqQueryNodes,
111      [ simpleField "names"  [t| [String] |]
112      , simpleField "fields" [t| [String] |]
113      , simpleField "lock"   [t| Bool     |]
114      ])
115   , (luxiReqQueryGroups,
116      [ simpleField "names"  [t| [String] |]
117      , simpleField "fields" [t| [String] |]
118      , simpleField "lock"   [t| Bool     |]
119      ])
120   , (luxiReqQueryInstances,
121      [ simpleField "names"  [t| [String] |]
122      , simpleField "fields" [t| [String] |]
123      , simpleField "lock"   [t| Bool     |]
124      ])
125   , (luxiReqQueryJobs,
126      [ simpleField "ids"    [t| [JobId]  |]
127      , simpleField "fields" [t| [String] |]
128      ])
129   , (luxiReqQueryExports,
130      [ simpleField "nodes" [t| [String] |]
131      , simpleField "lock"  [t| Bool     |]
132      ])
133   , (luxiReqQueryConfigValues,
134      [ simpleField "fields" [t| [String] |] ]
135     )
136   , (luxiReqQueryClusterInfo, [])
137   , (luxiReqQueryTags,
138      [ pTagsObject ])
139   , (luxiReqSubmitJob,
140      [ simpleField "job" [t| [MetaOpCode] |] ]
141     )
142   , (luxiReqSubmitManyJobs,
143      [ simpleField "ops" [t| [[MetaOpCode]] |] ]
144     )
145   , (luxiReqWaitForJobChange,
146      [ simpleField "job"      [t| JobId   |]
147      , simpleField "fields"   [t| [String]|]
148      , simpleField "prev_job" [t| JSValue |]
149      , simpleField "prev_log" [t| JSValue |]
150      , simpleField "tmout"    [t| Int     |]
151      ])
152   , (luxiReqArchiveJob,
153      [ simpleField "job" [t| JobId |] ]
154     )
155   , (luxiReqAutoArchiveJobs,
156      [ simpleField "age"   [t| Int |]
157      , simpleField "tmout" [t| Int |]
158      ])
159   , (luxiReqCancelJob,
160      [ simpleField "job" [t| JobId |] ]
161     )
162   , (luxiReqChangeJobPriority,
163      [ simpleField "job"      [t| JobId |]
164      , simpleField "priority" [t| Int |] ]
165     )
166   , (luxiReqSetDrainFlag,
167      [ simpleField "flag" [t| Bool |] ]
168     )
169   , (luxiReqSetWatcherPause,
170      [ simpleField "duration" [t| Double |] ]
171     )
172   ])
173
174 $(makeJSONInstance ''LuxiReq)
175
176 -- | List of all defined Luxi calls.
177 $(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
178
179 -- | The serialisation of LuxiOps into strings in messages.
180 $(genStrOfOp ''LuxiOp "strOfOp")
181
182 -- | Type holding the initial (unparsed) Luxi call.
183 data LuxiCall = LuxiCall LuxiReq JSValue
184
185 -- | The end-of-message separator.
186 eOM :: Word8
187 eOM = 3
188
189 -- | The end-of-message encoded as a ByteString.
190 bEOM :: B.ByteString
191 bEOM = B.singleton eOM
192
193 -- | Valid keys in the requests and responses.
194 data MsgKeys = Method
195              | Args
196              | Success
197              | Result
198
199 -- | The serialisation of MsgKeys into strings in messages.
200 $(genStrOfKey ''MsgKeys "strOfKey")
201
202 -- | Luxi client encapsulation.
203 data Client = Client { socket :: Handle           -- ^ The socket of the client
204                      , rbuf :: IORef B.ByteString -- ^ Already received buffer
205                      }
206
207 -- | Connects to the master daemon and returns a luxi Client.
208 getClient :: String -> IO Client
209 getClient path = do
210   s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
211   withTimeout luxiDefCtmo "creating luxi connection" $
212               S.connect s (S.SockAddrUnix path)
213   rf <- newIORef B.empty
214   h <- S.socketToHandle s ReadWriteMode
215   return Client { socket=h, rbuf=rf }
216
217 -- | Creates and returns a server endpoint.
218 getServer :: FilePath -> IO S.Socket
219 getServer path = do
220   s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
221   S.bindSocket s (S.SockAddrUnix path)
222   S.listen s 5 -- 5 is the max backlog
223   return s
224
225 -- | Closes a server endpoint.
226 -- FIXME: this should be encapsulated into a nicer type.
227 closeServer :: FilePath -> S.Socket -> IO ()
228 closeServer path sock = do
229   S.sClose sock
230   removeFile path
231
232 -- | Accepts a client
233 acceptClient :: S.Socket -> IO Client
234 acceptClient s = do
235   -- second return is the address of the client, which we ignore here
236   (client_socket, _) <- S.accept s
237   new_buffer <- newIORef B.empty
238   handle <- S.socketToHandle client_socket ReadWriteMode
239   return Client { socket=handle, rbuf=new_buffer }
240
241 -- | Closes the client socket.
242 closeClient :: Client -> IO ()
243 closeClient = hClose . socket
244
245 -- | Sends a message over a luxi transport.
246 sendMsg :: Client -> String -> IO ()
247 sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
248   let encoded = UTF8.fromString buf
249       handle = socket s
250   B.hPut handle encoded
251   B.hPut handle bEOM
252   hFlush handle
253
254 -- | Given a current buffer and the handle, it will read from the
255 -- network until we get a full message, and it will return that
256 -- message and the leftover buffer contents.
257 recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
258 recvUpdate handle obuf = do
259   nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
260             _ <- hWaitForInput handle (-1)
261             B.hGetNonBlocking handle 4096
262   let (msg, remaining) = B.break (eOM ==) nbuf
263       newbuf = B.append obuf msg
264   if B.null remaining
265     then recvUpdate handle newbuf
266     else return (newbuf, B.tail remaining)
267
268 -- | Waits for a message over a luxi transport.
269 recvMsg :: Client -> IO String
270 recvMsg s = do
271   cbuf <- readIORef $ rbuf s
272   let (imsg, ibuf) = B.break (eOM ==) cbuf
273   (msg, nbuf) <-
274     if B.null ibuf      -- if old buffer didn't contain a full message
275       then recvUpdate (socket s) cbuf   -- then we read from network
276       else return (imsg, B.tail ibuf)   -- else we return data from our buffer
277   writeIORef (rbuf s) nbuf
278   return $ UTF8.toString msg
279
280 -- | Extended wrapper over recvMsg.
281 recvMsgExt :: Client -> IO RecvResult
282 recvMsgExt s =
283   Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
284     return $ if isEOFError e
285                then RecvConnClosed
286                else RecvError (show e)
287
288 -- | Serialize a request to String.
289 buildCall :: LuxiOp  -- ^ The method
290           -> String  -- ^ The serialized form
291 buildCall lo =
292   let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
293            , (strOfKey Args, opToArgs lo)
294            ]
295       jo = toJSObject ja
296   in encodeStrict jo
297
298 -- | Serialize the response to String.
299 buildResponse :: Bool    -- ^ Success
300               -> JSValue -- ^ The arguments
301               -> String  -- ^ The serialized form
302 buildResponse success args =
303   let ja = [ (strOfKey Success, JSBool success)
304            , (strOfKey Result, args)]
305       jo = toJSObject ja
306   in encodeStrict jo
307
308 -- | Check that luxi request contains the required keys and parse it.
309 validateCall :: String -> Result LuxiCall
310 validateCall s = do
311   arr <- fromJResult "parsing top-level luxi message" $
312          decodeStrict s::Result (JSObject JSValue)
313   let aobj = fromJSObject arr
314   call <- fromObj aobj (strOfKey Method)::Result LuxiReq
315   args <- fromObj aobj (strOfKey Args)
316   return (LuxiCall call args)
317
318 -- | Converts Luxi call arguments into a 'LuxiOp' data structure.
319 --
320 -- This is currently hand-coded until we make it more uniform so that
321 -- it can be generated using TH.
322 decodeCall :: LuxiCall -> Result LuxiOp
323 decodeCall (LuxiCall call args) =
324   case call of
325     ReqQueryJobs -> do
326               (jids, jargs) <- fromJVal args
327               jids' <- case jids of
328                          JSNull -> return []
329                          _ -> fromJVal jids
330               return $ QueryJobs jids' jargs
331     ReqQueryInstances -> do
332               (names, fields, locking) <- fromJVal args
333               return $ QueryInstances names fields locking
334     ReqQueryNodes -> do
335               (names, fields, locking) <- fromJVal args
336               return $ QueryNodes names fields locking
337     ReqQueryGroups -> do
338               (names, fields, locking) <- fromJVal args
339               return $ QueryGroups names fields locking
340     ReqQueryClusterInfo ->
341               return QueryClusterInfo
342     ReqQuery -> do
343               (what, fields, qfilter) <- fromJVal args
344               return $ Query what fields qfilter
345     ReqQueryFields -> do
346               (what, fields) <- fromJVal args
347               fields' <- case fields of
348                            JSNull -> return []
349                            _ -> fromJVal fields
350               return $ QueryFields what fields'
351     ReqSubmitJob -> do
352               [ops1] <- fromJVal args
353               ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
354               return $ SubmitJob ops2
355     ReqSubmitManyJobs -> do
356               [ops1] <- fromJVal args
357               ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
358               return $ SubmitManyJobs ops2
359     ReqWaitForJobChange -> do
360               (jid, fields, pinfo, pidx, wtmout) <-
361                 -- No instance for 5-tuple, code copied from the
362                 -- json sources and adapted
363                 fromJResult "Parsing WaitForJobChange message" $
364                 case args of
365                   JSArray [a, b, c, d, e] ->
366                     (,,,,) `fmap`
367                     J.readJSON a `ap`
368                     J.readJSON b `ap`
369                     J.readJSON c `ap`
370                     J.readJSON d `ap`
371                     J.readJSON e
372                   _ -> J.Error "Not enough values"
373               return $ WaitForJobChange jid fields pinfo pidx wtmout
374     ReqArchiveJob -> do
375               [jid] <- fromJVal args
376               return $ ArchiveJob jid
377     ReqAutoArchiveJobs -> do
378               (age, tmout) <- fromJVal args
379               return $ AutoArchiveJobs age tmout
380     ReqQueryExports -> do
381               (nodes, lock) <- fromJVal args
382               return $ QueryExports nodes lock
383     ReqQueryConfigValues -> do
384               [fields] <- fromJVal args
385               return $ QueryConfigValues fields
386     ReqQueryTags -> do
387               (kind, name) <- fromJVal args
388               item <- tagObjectFrom kind name
389               return $ QueryTags item
390     ReqCancelJob -> do
391               [jid] <- fromJVal args
392               return $ CancelJob jid
393     ReqChangeJobPriority -> do
394               (jid, priority) <- fromJVal args
395               return $ ChangeJobPriority jid priority
396     ReqSetDrainFlag -> do
397               [flag] <- fromJVal args
398               return $ SetDrainFlag flag
399     ReqSetWatcherPause -> do
400               [duration] <- fromJVal args
401               return $ SetWatcherPause duration
402
403 -- | Check that luxi responses contain the required keys and that the
404 -- call was successful.
405 validateResult :: String -> ErrorResult JSValue
406 validateResult s = do
407   when (UTF8.replacement_char `elem` s) $
408        fail "Failed to decode UTF-8, detected replacement char after decoding"
409   oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
410   let arr = J.fromJSObject oarr
411   status <- fromObj arr (strOfKey Success)
412   result <- fromObj arr (strOfKey Result)
413   if status
414     then return result
415     else decodeError result
416
417 -- | Try to decode an error from the server response. This function
418 -- will always fail, since it's called only on the error path (when
419 -- status is False).
420 decodeError :: JSValue -> ErrorResult JSValue
421 decodeError val =
422   case fromJVal val of
423     Ok e -> Bad e
424     Bad msg -> Bad $ GenericError msg
425
426 -- | Generic luxi method call.
427 callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
428 callMethod method s = do
429   sendMsg s $ buildCall method
430   result <- recvMsg s
431   let rval = validateResult result
432   return rval
433
434 -- | Parse job submission result.
435 parseSubmitJobResult :: JSValue -> ErrorResult JobId
436 parseSubmitJobResult (JSArray [JSBool True, v]) =
437   case J.readJSON v of
438     J.Error msg -> Bad $ LuxiError msg
439     J.Ok v' -> Ok v'
440 parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
441   Bad . LuxiError $ fromJSString x
442 parseSubmitJobResult v =
443   Bad . LuxiError $ "Unknown result from the master daemon: " ++
444       show (pp_value v)
445
446 -- | Specialized submitManyJobs call.
447 submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId])
448 submitManyJobs s jobs = do
449   rval <- callMethod (SubmitManyJobs jobs) s
450   -- map each result (status, payload) pair into a nice Result ADT
451   return $ case rval of
452              Bad x -> Bad x
453              Ok (JSArray r) -> mapM parseSubmitJobResult r
454              x -> Bad . LuxiError $
455                   "Cannot parse response from Ganeti: " ++ show x
456
457 -- | Custom queryJobs call.
458 queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
459 queryJobsStatus s jids = do
460   rval <- callMethod (QueryJobs jids ["status"]) s
461   return $ case rval of
462              Bad x -> Bad x
463              Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
464                        J.Ok vals -> if any null vals
465                                     then Bad $
466                                          LuxiError "Missing job status field"
467                                     else Ok (map head vals)
468                        J.Error x -> Bad $ LuxiError x