Correct properties of the cluster's file storage dir
[ganeti-local] / src / Ganeti / Luxi.hs
1 {-# LANGUAGE TemplateHaskell #-}
2
3 {-| Implementation of the Ganeti LUXI interface.
4
5 -}
6
7 {-
8
9 Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
15
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19 General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 02110-1301, USA.
25
26 -}
27
28 module Ganeti.Luxi
29   ( LuxiOp(..)
30   , LuxiReq(..)
31   , Client
32   , JobId
33   , fromJobId
34   , makeJobId
35   , RecvResult(..)
36   , strOfOp
37   , getClient
38   , getServer
39   , acceptClient
40   , closeClient
41   , closeServer
42   , callMethod
43   , submitManyJobs
44   , queryJobsStatus
45   , buildCall
46   , buildResponse
47   , validateCall
48   , decodeCall
49   , recvMsg
50   , recvMsgExt
51   , sendMsg
52   , allLuxiCalls
53   ) where
54
55 import Control.Exception (catch)
56 import Data.IORef
57 import qualified Data.ByteString as B
58 import qualified Data.ByteString.Lazy as BL
59 import qualified Data.ByteString.UTF8 as UTF8
60 import qualified Data.ByteString.Lazy.UTF8 as UTF8L
61 import Data.Word (Word8)
62 import Control.Monad
63 import Text.JSON (encodeStrict, decodeStrict)
64 import qualified Text.JSON as J
65 import Text.JSON.Pretty (pp_value)
66 import Text.JSON.Types
67 import System.Directory (removeFile)
68 import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
69 import System.IO.Error (isEOFError)
70 import System.Timeout
71 import qualified Network.Socket as S
72
73 import Ganeti.BasicTypes
74 import Ganeti.Constants
75 import Ganeti.Errors
76 import Ganeti.JSON
77 import Ganeti.OpParams (pTagsObject)
78 import Ganeti.OpCodes
79 import qualified Ganeti.Query.Language as Qlang
80 import Ganeti.Runtime (GanetiDaemon(..), MiscGroup(..), GanetiGroup(..))
81 import Ganeti.THH
82 import Ganeti.Types
83 import Ganeti.Utils
84
85 -- * Utility functions
86
87 -- | Wrapper over System.Timeout.timeout that fails in the IO monad.
88 withTimeout :: Int -> String -> IO a -> IO a
89 withTimeout secs descr action = do
90   result <- timeout (secs * 1000000) action
91   case result of
92     Nothing -> fail $ "Timeout in " ++ descr
93     Just v -> return v
94
95 -- * Generic protocol functionality
96
97 -- | Result of receiving a message from the socket.
98 data RecvResult = RecvConnClosed    -- ^ Connection closed
99                 | RecvError String  -- ^ Any other error
100                 | RecvOk String     -- ^ Successfull receive
101                   deriving (Show, Eq)
102
103 -- | Currently supported Luxi operations and JSON serialization.
104 $(genLuxiOp "LuxiOp"
105   [ (luxiReqQuery,
106     [ simpleField "what"    [t| Qlang.ItemType |]
107     , simpleField "fields"  [t| [String]  |]
108     , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
109     ])
110   , (luxiReqQueryFields,
111     [ simpleField "what"    [t| Qlang.ItemType |]
112     , simpleField "fields"  [t| [String]  |]
113     ])
114   , (luxiReqQueryNodes,
115      [ simpleField "names"  [t| [String] |]
116      , simpleField "fields" [t| [String] |]
117      , simpleField "lock"   [t| Bool     |]
118      ])
119   , (luxiReqQueryGroups,
120      [ simpleField "names"  [t| [String] |]
121      , simpleField "fields" [t| [String] |]
122      , simpleField "lock"   [t| Bool     |]
123      ])
124   , (luxiReqQueryNetworks,
125      [ simpleField "names"  [t| [String] |]
126      , simpleField "fields" [t| [String] |]
127      , simpleField "lock"   [t| Bool     |]
128      ])
129   , (luxiReqQueryInstances,
130      [ simpleField "names"  [t| [String] |]
131      , simpleField "fields" [t| [String] |]
132      , simpleField "lock"   [t| Bool     |]
133      ])
134   , (luxiReqQueryJobs,
135      [ simpleField "ids"    [t| [JobId]  |]
136      , simpleField "fields" [t| [String] |]
137      ])
138   , (luxiReqQueryExports,
139      [ simpleField "nodes" [t| [String] |]
140      , simpleField "lock"  [t| Bool     |]
141      ])
142   , (luxiReqQueryConfigValues,
143      [ simpleField "fields" [t| [String] |] ]
144     )
145   , (luxiReqQueryClusterInfo, [])
146   , (luxiReqQueryTags,
147      [ pTagsObject 
148      , simpleField "name" [t| String |]
149      ])
150   , (luxiReqSubmitJob,
151      [ simpleField "job" [t| [MetaOpCode] |] ]
152     )
153   , (luxiReqSubmitJobToDrainedQueue,
154      [ simpleField "job" [t| [MetaOpCode] |] ]
155     )
156   , (luxiReqSubmitManyJobs,
157      [ simpleField "ops" [t| [[MetaOpCode]] |] ]
158     )
159   , (luxiReqWaitForJobChange,
160      [ simpleField "job"      [t| JobId   |]
161      , simpleField "fields"   [t| [String]|]
162      , simpleField "prev_job" [t| JSValue |]
163      , simpleField "prev_log" [t| JSValue |]
164      , simpleField "tmout"    [t| Int     |]
165      ])
166   , (luxiReqArchiveJob,
167      [ simpleField "job" [t| JobId |] ]
168     )
169   , (luxiReqAutoArchiveJobs,
170      [ simpleField "age"   [t| Int |]
171      , simpleField "tmout" [t| Int |]
172      ])
173   , (luxiReqCancelJob,
174      [ simpleField "job" [t| JobId |] ]
175     )
176   , (luxiReqChangeJobPriority,
177      [ simpleField "job"      [t| JobId |]
178      , simpleField "priority" [t| Int |] ]
179     )
180   , (luxiReqSetDrainFlag,
181      [ simpleField "flag" [t| Bool |] ]
182     )
183   , (luxiReqSetWatcherPause,
184      [ simpleField "duration" [t| Double |] ]
185     )
186   ])
187
188 $(makeJSONInstance ''LuxiReq)
189
190 -- | List of all defined Luxi calls.
191 $(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
192
193 -- | The serialisation of LuxiOps into strings in messages.
194 $(genStrOfOp ''LuxiOp "strOfOp")
195
196 -- | Type holding the initial (unparsed) Luxi call.
197 data LuxiCall = LuxiCall LuxiReq JSValue
198
199 -- | The end-of-message separator.
200 eOM :: Word8
201 eOM = 3
202
203 -- | The end-of-message encoded as a ByteString.
204 bEOM :: B.ByteString
205 bEOM = B.singleton eOM
206
207 -- | Valid keys in the requests and responses.
208 data MsgKeys = Method
209              | Args
210              | Success
211              | Result
212
213 -- | The serialisation of MsgKeys into strings in messages.
214 $(genStrOfKey ''MsgKeys "strOfKey")
215
216 -- | Luxi client encapsulation.
217 data Client = Client { socket :: Handle           -- ^ The socket of the client
218                      , rbuf :: IORef B.ByteString -- ^ Already received buffer
219                      }
220
221 -- | Connects to the master daemon and returns a luxi Client.
222 getClient :: String -> IO Client
223 getClient path = do
224   s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
225   withTimeout luxiDefCtmo "creating luxi connection" $
226               S.connect s (S.SockAddrUnix path)
227   rf <- newIORef B.empty
228   h <- S.socketToHandle s ReadWriteMode
229   return Client { socket=h, rbuf=rf }
230
231 -- | Creates and returns a server endpoint.
232 getServer :: Bool -> FilePath -> IO S.Socket
233 getServer setOwner path = do
234   s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
235   S.bindSocket s (S.SockAddrUnix path)
236   when setOwner . setOwnerAndGroupFromNames path GanetiLuxid $
237     ExtraGroup DaemonsGroup
238   S.listen s 5 -- 5 is the max backlog
239   return s
240
241 -- | Closes a server endpoint.
242 -- FIXME: this should be encapsulated into a nicer type.
243 closeServer :: FilePath -> S.Socket -> IO ()
244 closeServer path sock = do
245   S.sClose sock
246   removeFile path
247
248 -- | Accepts a client
249 acceptClient :: S.Socket -> IO Client
250 acceptClient s = do
251   -- second return is the address of the client, which we ignore here
252   (client_socket, _) <- S.accept s
253   new_buffer <- newIORef B.empty
254   handle <- S.socketToHandle client_socket ReadWriteMode
255   return Client { socket=handle, rbuf=new_buffer }
256
257 -- | Closes the client socket.
258 closeClient :: Client -> IO ()
259 closeClient = hClose . socket
260
261 -- | Sends a message over a luxi transport.
262 sendMsg :: Client -> String -> IO ()
263 sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
264   let encoded = UTF8L.fromString buf
265       handle = socket s
266   BL.hPut handle encoded
267   B.hPut handle bEOM
268   hFlush handle
269
270 -- | Given a current buffer and the handle, it will read from the
271 -- network until we get a full message, and it will return that
272 -- message and the leftover buffer contents.
273 recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
274 recvUpdate handle obuf = do
275   nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
276             _ <- hWaitForInput handle (-1)
277             B.hGetNonBlocking handle 4096
278   let (msg, remaining) = B.break (eOM ==) nbuf
279       newbuf = B.append obuf msg
280   if B.null remaining
281     then recvUpdate handle newbuf
282     else return (newbuf, B.tail remaining)
283
284 -- | Waits for a message over a luxi transport.
285 recvMsg :: Client -> IO String
286 recvMsg s = do
287   cbuf <- readIORef $ rbuf s
288   let (imsg, ibuf) = B.break (eOM ==) cbuf
289   (msg, nbuf) <-
290     if B.null ibuf      -- if old buffer didn't contain a full message
291       then recvUpdate (socket s) cbuf   -- then we read from network
292       else return (imsg, B.tail ibuf)   -- else we return data from our buffer
293   writeIORef (rbuf s) nbuf
294   return $ UTF8.toString msg
295
296 -- | Extended wrapper over recvMsg.
297 recvMsgExt :: Client -> IO RecvResult
298 recvMsgExt s =
299   Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
300     return $ if isEOFError e
301                then RecvConnClosed
302                else RecvError (show e)
303
304 -- | Serialize a request to String.
305 buildCall :: LuxiOp  -- ^ The method
306           -> String  -- ^ The serialized form
307 buildCall lo =
308   let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
309            , (strOfKey Args, opToArgs lo)
310            ]
311       jo = toJSObject ja
312   in encodeStrict jo
313
314 -- | Serialize the response to String.
315 buildResponse :: Bool    -- ^ Success
316               -> JSValue -- ^ The arguments
317               -> String  -- ^ The serialized form
318 buildResponse success args =
319   let ja = [ (strOfKey Success, JSBool success)
320            , (strOfKey Result, args)]
321       jo = toJSObject ja
322   in encodeStrict jo
323
324 -- | Check that luxi request contains the required keys and parse it.
325 validateCall :: String -> Result LuxiCall
326 validateCall s = do
327   arr <- fromJResult "parsing top-level luxi message" $
328          decodeStrict s::Result (JSObject JSValue)
329   let aobj = fromJSObject arr
330   call <- fromObj aobj (strOfKey Method)::Result LuxiReq
331   args <- fromObj aobj (strOfKey Args)
332   return (LuxiCall call args)
333
334 -- | Converts Luxi call arguments into a 'LuxiOp' data structure.
335 --
336 -- This is currently hand-coded until we make it more uniform so that
337 -- it can be generated using TH.
338 decodeCall :: LuxiCall -> Result LuxiOp
339 decodeCall (LuxiCall call args) =
340   case call of
341     ReqQueryJobs -> do
342               (jids, jargs) <- fromJVal args
343               jids' <- case jids of
344                          JSNull -> return []
345                          _ -> fromJVal jids
346               return $ QueryJobs jids' jargs
347     ReqQueryInstances -> do
348               (names, fields, locking) <- fromJVal args
349               return $ QueryInstances names fields locking
350     ReqQueryNodes -> do
351               (names, fields, locking) <- fromJVal args
352               return $ QueryNodes names fields locking
353     ReqQueryGroups -> do
354               (names, fields, locking) <- fromJVal args
355               return $ QueryGroups names fields locking
356     ReqQueryClusterInfo ->
357               return QueryClusterInfo
358     ReqQueryNetworks -> do
359               (names, fields, locking) <- fromJVal args
360               return $ QueryNetworks names fields locking
361     ReqQuery -> do
362               (what, fields, qfilter) <- fromJVal args
363               return $ Query what fields qfilter
364     ReqQueryFields -> do
365               (what, fields) <- fromJVal args
366               fields' <- case fields of
367                            JSNull -> return []
368                            _ -> fromJVal fields
369               return $ QueryFields what fields'
370     ReqSubmitJob -> do
371               [ops1] <- fromJVal args
372               ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
373               return $ SubmitJob ops2
374     ReqSubmitJobToDrainedQueue -> do
375               [ops1] <- fromJVal args
376               ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
377               return $ SubmitJobToDrainedQueue ops2
378     ReqSubmitManyJobs -> do
379               [ops1] <- fromJVal args
380               ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
381               return $ SubmitManyJobs ops2
382     ReqWaitForJobChange -> do
383               (jid, fields, pinfo, pidx, wtmout) <-
384                 -- No instance for 5-tuple, code copied from the
385                 -- json sources and adapted
386                 fromJResult "Parsing WaitForJobChange message" $
387                 case args of
388                   JSArray [a, b, c, d, e] ->
389                     (,,,,) `fmap`
390                     J.readJSON a `ap`
391                     J.readJSON b `ap`
392                     J.readJSON c `ap`
393                     J.readJSON d `ap`
394                     J.readJSON e
395                   _ -> J.Error "Not enough values"
396               return $ WaitForJobChange jid fields pinfo pidx wtmout
397     ReqArchiveJob -> do
398               [jid] <- fromJVal args
399               return $ ArchiveJob jid
400     ReqAutoArchiveJobs -> do
401               (age, tmout) <- fromJVal args
402               return $ AutoArchiveJobs age tmout
403     ReqQueryExports -> do
404               (nodes, lock) <- fromJVal args
405               return $ QueryExports nodes lock
406     ReqQueryConfigValues -> do
407               [fields] <- fromJVal args
408               return $ QueryConfigValues fields
409     ReqQueryTags -> do
410               (kind, name) <- fromJVal args
411               return $ QueryTags kind name
412     ReqCancelJob -> do
413               [jid] <- fromJVal args
414               return $ CancelJob jid
415     ReqChangeJobPriority -> do
416               (jid, priority) <- fromJVal args
417               return $ ChangeJobPriority jid priority
418     ReqSetDrainFlag -> do
419               [flag] <- fromJVal args
420               return $ SetDrainFlag flag
421     ReqSetWatcherPause -> do
422               [duration] <- fromJVal args
423               return $ SetWatcherPause duration
424
425 -- | Check that luxi responses contain the required keys and that the
426 -- call was successful.
427 validateResult :: String -> ErrorResult JSValue
428 validateResult s = do
429   when (UTF8.replacement_char `elem` s) $
430        fail "Failed to decode UTF-8, detected replacement char after decoding"
431   oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
432   let arr = J.fromJSObject oarr
433   status <- fromObj arr (strOfKey Success)
434   result <- fromObj arr (strOfKey Result)
435   if status
436     then return result
437     else decodeError result
438
439 -- | Try to decode an error from the server response. This function
440 -- will always fail, since it's called only on the error path (when
441 -- status is False).
442 decodeError :: JSValue -> ErrorResult JSValue
443 decodeError val =
444   case fromJVal val of
445     Ok e -> Bad e
446     Bad msg -> Bad $ GenericError msg
447
448 -- | Generic luxi method call.
449 callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
450 callMethod method s = do
451   sendMsg s $ buildCall method
452   result <- recvMsg s
453   let rval = validateResult result
454   return rval
455
456 -- | Parse job submission result.
457 parseSubmitJobResult :: JSValue -> ErrorResult JobId
458 parseSubmitJobResult (JSArray [JSBool True, v]) =
459   case J.readJSON v of
460     J.Error msg -> Bad $ LuxiError msg
461     J.Ok v' -> Ok v'
462 parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
463   Bad . LuxiError $ fromJSString x
464 parseSubmitJobResult v =
465   Bad . LuxiError $ "Unknown result from the master daemon: " ++
466       show (pp_value v)
467
468 -- | Specialized submitManyJobs call.
469 submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId])
470 submitManyJobs s jobs = do
471   rval <- callMethod (SubmitManyJobs jobs) s
472   -- map each result (status, payload) pair into a nice Result ADT
473   return $ case rval of
474              Bad x -> Bad x
475              Ok (JSArray r) -> mapM parseSubmitJobResult r
476              x -> Bad . LuxiError $
477                   "Cannot parse response from Ganeti: " ++ show x
478
479 -- | Custom queryJobs call.
480 queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
481 queryJobsStatus s jids = do
482   rval <- callMethod (QueryJobs jids ["status"]) s
483   return $ case rval of
484              Bad x -> Bad x
485              Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
486                        J.Ok vals -> if any null vals
487                                     then Bad $
488                                          LuxiError "Missing job status field"
489                                     else Ok (map head vals)
490                        J.Error x -> Bad $ LuxiError x