Optimise autotools/run-in-tempdir
[ganeti-local] / htools / Ganeti / Confd / Client.hs
1 {-| Implementation of the Ganeti Confd client functionality.
2
3 -}
4
5 {-
6
7 Copyright (C) 2012 Google Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 2 of the License, or
12 (at your option) any later version.
13
14 This program is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 02110-1301, USA.
23
24 -}
25
26 module Ganeti.Confd.Client
27   ( getConfdClient
28   , query
29   ) where
30
31 import Control.Concurrent
32 import Control.Monad
33 import Data.List
34 import qualified Network.Socket as S
35 import System.Posix.Time
36 import qualified Text.JSON as J
37
38 import Ganeti.BasicTypes
39 import Ganeti.Confd.Types
40 import Ganeti.Confd.Utils
41 import qualified Ganeti.Constants as C
42 import Ganeti.Hash
43 import Ganeti.Ssconf
44
45 -- | Builds a properly initialized ConfdClient
46 getConfdClient :: IO ConfdClient
47 getConfdClient = S.withSocketsDo $ do
48   hmac <- getClusterHmac
49   candList <- getMasterCandidatesIps Nothing
50   peerList <-
51     case candList of
52       (Ok p) -> return p
53       (Bad msg) -> fail msg
54   return . ConfdClient hmac peerList $ fromIntegral C.defaultConfdPort
55
56 -- | Sends a query to all the Confd servers the client is connected to.
57 -- Returns the most up-to-date result according to the serial number,
58 -- chosen between those received before the timeout.
59 query :: ConfdClient -> ConfdRequestType -> ConfdQuery -> IO (Maybe ConfdReply)
60 query client crType cQuery = do
61   semaphore <- newMVar ()
62   answer <- newMVar Nothing
63   let dest = [(host, serverPort client) | host <- peers client]
64       hmac = hmacKey client
65       jobs = map (queryOneServer semaphore answer crType cQuery hmac) dest
66       watchdog reqAnswers = do
67         threadDelay $ 1000000 * C.confdClientExpireTimeout
68         _ <- swapMVar reqAnswers 0
69         putMVar semaphore ()
70       waitForResult reqAnswers = do
71         _ <- takeMVar semaphore
72         l <- takeMVar reqAnswers
73         unless (l == 0) $ do
74           putMVar reqAnswers $ l - 1
75           waitForResult reqAnswers
76   reqAnswers <- newMVar . min C.confdDefaultReqCoverage $ length dest
77   workers <- mapM forkIO jobs
78   watcher <- forkIO $ watchdog reqAnswers
79   waitForResult reqAnswers
80   mapM_ killThread $ watcher:workers
81   takeMVar answer
82
83 -- | Updates the reply to the query. As per the Confd design document,
84 -- only the reply with the highest serial number is kept.
85 updateConfdReply :: ConfdReply -> Maybe ConfdReply -> Maybe ConfdReply
86 updateConfdReply newValue Nothing = Just newValue
87 updateConfdReply newValue (Just currentValue) = Just $
88   if confdReplyStatus newValue == ReplyStatusOk
89       && confdReplySerial newValue > confdReplySerial currentValue
90     then newValue
91     else currentValue
92
93 -- | Send a query to a single server, waits for the result and stores it
94 -- in a shared variable. Then, sends a signal on another shared variable
95 -- acting as a semaphore.
96 -- This function is meant to be used as one of multiple threads querying
97 -- multiple servers in parallel.
98 queryOneServer
99   :: MVar ()                 -- ^ The semaphore that will be signalled
100   -> MVar (Maybe ConfdReply) -- ^ The shared variable for the result
101   -> ConfdRequestType        -- ^ The type of the query to be sent
102   -> ConfdQuery              -- ^ The content of the query
103   -> HashKey                 -- ^ The hmac key to sign the message
104   -> (String, S.PortNumber)  -- ^ The address and port of the server
105   -> IO ()
106 queryOneServer semaphore answer crType cQuery hmac (host, port) = do
107   request <- newConfdRequest crType cQuery
108   timestamp <- fmap show epochTime
109   let signedMsg =
110         signMessage hmac timestamp (J.encodeStrict request)
111       completeMsg = C.confdMagicFourcc ++ J.encodeStrict signedMsg
112   s <- S.socket S.AF_INET S.Datagram S.defaultProtocol
113   hostAddr <- S.inet_addr host
114   _ <- S.sendTo s completeMsg $ S.SockAddrInet port hostAddr
115   replyMsg <- S.recv s C.maxUdpDataSize
116   parsedReply <-
117     if C.confdMagicFourcc `isPrefixOf` replyMsg
118       then return . parseReply hmac (drop 4 replyMsg) $ confdRqRsalt request
119       else fail "Invalid magic code!"
120   reply <-
121     case parsedReply of
122       Ok (_, r) -> return r
123       Bad msg -> fail msg
124   modifyMVar_ answer $! return . updateConfdReply reply
125   putMVar semaphore ()