Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / Confd / Client.hs @ 3add7574

History | View | Annotate | Download (4.5 kB)

1
{-| Implementation of the Ganeti Confd client functionality.
2

    
3
-}
4

    
5
{-
6

    
7
Copyright (C) 2012 Google Inc.
8

    
9
This program is free software; you can redistribute it and/or modify
10
it under the terms of the GNU General Public License as published by
11
the Free Software Foundation; either version 2 of the License, or
12
(at your option) any later version.
13

    
14
This program is distributed in the hope that it will be useful, but
15
WITHOUT ANY WARRANTY; without even the implied warranty of
16
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
General Public License for more details.
18

    
19
You should have received a copy of the GNU General Public License
20
along with this program; if not, write to the Free Software
21
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22
02110-1301, USA.
23

    
24
-}
25

    
26
module Ganeti.Confd.Client
27
  ( getConfdClient
28
  , query
29
  ) where
30

    
31
import Control.Concurrent
32
import Control.Monad
33
import Data.List
34
import qualified Network.Socket as S
35
import System.Posix.Time
36
import qualified Text.JSON as J
37

    
38
import Ganeti.BasicTypes
39
import Ganeti.Confd.Types
40
import Ganeti.Confd.Utils
41
import qualified Ganeti.Constants as C
42
import Ganeti.Hash
43
import Ganeti.Ssconf
44

    
45
-- | Builds a properly initialized ConfdClient
46
getConfdClient :: IO ConfdClient
47
getConfdClient = S.withSocketsDo $ do
48
  hmac <- getClusterHmac
49
  candList <- getMasterCandidatesIps Nothing
50
  peerList <-
51
    case candList of
52
      (Ok p) -> return p
53
      (Bad msg) -> fail msg
54
  return . ConfdClient hmac peerList $ fromIntegral C.defaultConfdPort
55

    
56
-- | Sends a query to all the Confd servers the client is connected to.
57
-- Returns the most up-to-date result according to the serial number,
58
-- chosen between those received before the timeout.
59
query :: ConfdClient -> ConfdRequestType -> ConfdQuery -> IO (Maybe ConfdReply)
60
query client crType cQuery = do
61
  semaphore <- newMVar ()
62
  answer <- newMVar Nothing
63
  let dest = [(host, serverPort client) | host <- peers client]
64
      hmac = hmacKey client
65
      jobs = map (queryOneServer semaphore answer crType cQuery hmac) dest
66
      watchdog reqAnswers = do
67
        threadDelay $ 1000000 * C.confdClientExpireTimeout
68
        _ <- swapMVar reqAnswers 0
69
        putMVar semaphore ()
70
      waitForResult reqAnswers = do
71
        _ <- takeMVar semaphore
72
        l <- takeMVar reqAnswers
73
        unless (l == 0) $ do
74
          putMVar reqAnswers $ l - 1
75
          waitForResult reqAnswers
76
  reqAnswers <- newMVar . min C.confdDefaultReqCoverage $ length dest
77
  workers <- mapM forkIO jobs
78
  watcher <- forkIO $ watchdog reqAnswers
79
  waitForResult reqAnswers
80
  mapM_ killThread $ watcher:workers
81
  takeMVar answer
82

    
83
-- | Updates the reply to the query. As per the Confd design document,
84
-- only the reply with the highest serial number is kept.
85
updateConfdReply :: ConfdReply -> Maybe ConfdReply -> Maybe ConfdReply
86
updateConfdReply newValue Nothing = Just newValue
87
updateConfdReply newValue (Just currentValue) = Just $
88
  if confdReplyStatus newValue == ReplyStatusOk
89
      && confdReplySerial newValue > confdReplySerial currentValue
90
    then newValue
91
    else currentValue
92

    
93
-- | Send a query to a single server, waits for the result and stores it
94
-- in a shared variable. Then, sends a signal on another shared variable
95
-- acting as a semaphore.
96
-- This function is meant to be used as one of multiple threads querying
97
-- multiple servers in parallel.
98
queryOneServer
99
  :: MVar ()                 -- ^ The semaphore that will be signalled
100
  -> MVar (Maybe ConfdReply) -- ^ The shared variable for the result
101
  -> ConfdRequestType        -- ^ The type of the query to be sent
102
  -> ConfdQuery              -- ^ The content of the query
103
  -> HashKey                 -- ^ The hmac key to sign the message
104
  -> (String, S.PortNumber)  -- ^ The address and port of the server
105
  -> IO ()
106
queryOneServer semaphore answer crType cQuery hmac (host, port) = do
107
  request <- newConfdRequest crType cQuery
108
  timestamp <- fmap show epochTime
109
  let signedMsg =
110
        signMessage hmac timestamp (J.encodeStrict request)
111
      completeMsg = C.confdMagicFourcc ++ J.encodeStrict signedMsg
112
  s <- S.socket S.AF_INET S.Datagram S.defaultProtocol
113
  hostAddr <- S.inet_addr host
114
  _ <- S.sendTo s completeMsg $ S.SockAddrInet port hostAddr
115
  replyMsg <- S.recv s C.maxUdpDataSize
116
  parsedReply <-
117
    if C.confdMagicFourcc `isPrefixOf` replyMsg
118
      then return . parseReply hmac (drop 4 replyMsg) $ confdRqRsalt request
119
      else fail "Invalid magic code!"
120
  reply <-
121
    case parsedReply of
122
      Ok (_, r) -> return r
123
      Bad msg -> fail msg
124
  modifyMVar_ answer $! return . updateConfdReply reply
125
  putMVar semaphore ()