Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ 39292d3a

History | View | Annotate | Download (16.5 kB)

1 e4ccf6cd Guido Trotter
#
2 e4ccf6cd Guido Trotter
#
3 e4ccf6cd Guido Trotter
4 e4ccf6cd Guido Trotter
# Copyright (C) 2009 Google Inc.
5 e4ccf6cd Guido Trotter
#
6 e4ccf6cd Guido Trotter
# This program is free software; you can redistribute it and/or modify
7 e4ccf6cd Guido Trotter
# it under the terms of the GNU General Public License as published by
8 e4ccf6cd Guido Trotter
# the Free Software Foundation; either version 2 of the License, or
9 e4ccf6cd Guido Trotter
# (at your option) any later version.
10 e4ccf6cd Guido Trotter
#
11 e4ccf6cd Guido Trotter
# This program is distributed in the hope that it will be useful, but
12 e4ccf6cd Guido Trotter
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 e4ccf6cd Guido Trotter
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 e4ccf6cd Guido Trotter
# General Public License for more details.
15 e4ccf6cd Guido Trotter
#
16 e4ccf6cd Guido Trotter
# You should have received a copy of the GNU General Public License
17 e4ccf6cd Guido Trotter
# along with this program; if not, write to the Free Software
18 e4ccf6cd Guido Trotter
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 e4ccf6cd Guido Trotter
# 02110-1301, USA.
20 e4ccf6cd Guido Trotter
21 e4ccf6cd Guido Trotter
22 e4ccf6cd Guido Trotter
"""Ganeti confd client
23 e4ccf6cd Guido Trotter

24 cf7b0cc4 Guido Trotter
Clients can use the confd client library to send requests to a group of master
25 cf7b0cc4 Guido Trotter
candidates running confd. The expected usage is through the asyncore framework,
26 cf7b0cc4 Guido Trotter
by sending queries, and asynchronously receiving replies through a callback.
27 cf7b0cc4 Guido Trotter

28 cf7b0cc4 Guido Trotter
This way the client library doesn't ever need to "wait" on a particular answer,
29 cf7b0cc4 Guido Trotter
and can proceed even if some udp packets are lost. It's up to the user to
30 cf7b0cc4 Guido Trotter
reschedule queries if they haven't received responses and they need them.
31 cf7b0cc4 Guido Trotter

32 69b99987 Michael Hanselmann
Example usage::
33 69b99987 Michael Hanselmann

34 cf7b0cc4 Guido Trotter
  client = ConfdClient(...) # includes callback specification
35 cf7b0cc4 Guido Trotter
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36 cf7b0cc4 Guido Trotter
  client.SendRequest(req)
37 cf7b0cc4 Guido Trotter
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38 cf7b0cc4 Guido Trotter
  # ... wait ...
39 cf7b0cc4 Guido Trotter
  # And your callback will be called by asyncore, when your query gets a
40 cf7b0cc4 Guido Trotter
  # response, or when it expires.
41 cf7b0cc4 Guido Trotter

42 392ca296 Guido Trotter
You can use the provided ConfdFilterCallback to act as a filter, only passing
43 392ca296 Guido Trotter
"newer" answer to your callback, and filtering out outdated ones, or ones
44 392ca296 Guido Trotter
confirming what you already got.
45 392ca296 Guido Trotter

46 e4ccf6cd Guido Trotter
"""
47 69b99987 Michael Hanselmann
48 6c881c52 Iustin Pop
# pylint: disable-msg=E0203
49 6c881c52 Iustin Pop
50 6c881c52 Iustin Pop
# E0203: Access to member %r before its definition, since we use
51 6c881c52 Iustin Pop
# objects.py which doesn't explicitely initialise its members
52 6c881c52 Iustin Pop
53 e4ccf6cd Guido Trotter
import time
54 e4ccf6cd Guido Trotter
import random
55 e4ccf6cd Guido Trotter
56 e4ccf6cd Guido Trotter
from ganeti import utils
57 e4ccf6cd Guido Trotter
from ganeti import constants
58 e4ccf6cd Guido Trotter
from ganeti import objects
59 e4ccf6cd Guido Trotter
from ganeti import serializer
60 e4ccf6cd Guido Trotter
from ganeti import daemon # contains AsyncUDPSocket
61 e4ccf6cd Guido Trotter
from ganeti import errors
62 e4ccf6cd Guido Trotter
from ganeti import confd
63 5b349fd1 Iustin Pop
from ganeti import ssconf
64 e4ccf6cd Guido Trotter
65 e4ccf6cd Guido Trotter
66 e4ccf6cd Guido Trotter
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
67 e4ccf6cd Guido Trotter
  """Confd udp asyncore client
68 e4ccf6cd Guido Trotter

69 e4ccf6cd Guido Trotter
  This is kept separate from the main ConfdClient to make sure it's easy to
70 e4ccf6cd Guido Trotter
  implement a non-asyncore based client library.
71 e4ccf6cd Guido Trotter

72 e4ccf6cd Guido Trotter
  """
73 e4ccf6cd Guido Trotter
  def __init__(self, client):
74 e4ccf6cd Guido Trotter
    """Constructor for ConfdAsyncUDPClient
75 e4ccf6cd Guido Trotter

76 e4ccf6cd Guido Trotter
    @type client: L{ConfdClient}
77 e4ccf6cd Guido Trotter
    @param client: client library, to pass the datagrams to
78 e4ccf6cd Guido Trotter

79 e4ccf6cd Guido Trotter
    """
80 e4ccf6cd Guido Trotter
    daemon.AsyncUDPSocket.__init__(self)
81 e4ccf6cd Guido Trotter
    self.client = client
82 e4ccf6cd Guido Trotter
83 e4ccf6cd Guido Trotter
  # this method is overriding a daemon.AsyncUDPSocket method
84 e4ccf6cd Guido Trotter
  def handle_datagram(self, payload, ip, port):
85 e4ccf6cd Guido Trotter
    self.client.HandleResponse(payload, ip, port)
86 e4ccf6cd Guido Trotter
87 e4ccf6cd Guido Trotter
88 e4ccf6cd Guido Trotter
class ConfdClient:
89 e4ccf6cd Guido Trotter
  """Send queries to confd, and get back answers.
90 e4ccf6cd Guido Trotter

91 e4ccf6cd Guido Trotter
  Since the confd model works by querying multiple master candidates, and
92 e4ccf6cd Guido Trotter
  getting back answers, this is an asynchronous library. It can either work
93 e4ccf6cd Guido Trotter
  through asyncore or with your own handling.
94 e4ccf6cd Guido Trotter

95 e4ccf6cd Guido Trotter
  """
96 a3db74e4 Guido Trotter
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
97 e4ccf6cd Guido Trotter
    """Constructor for ConfdClient
98 e4ccf6cd Guido Trotter

99 e4ccf6cd Guido Trotter
    @type hmac_key: string
100 e4ccf6cd Guido Trotter
    @param hmac_key: hmac key to talk to confd
101 e4ccf6cd Guido Trotter
    @type peers: list
102 e4ccf6cd Guido Trotter
    @param peers: list of peer nodes
103 96e03b0b Guido Trotter
    @type callback: f(L{ConfdUpcallPayload})
104 96e03b0b Guido Trotter
    @param callback: function to call when getting answers
105 7d20c647 Guido Trotter
    @type port: integer
106 d63997b3 Guido Trotter
    @param port: confd port (default: use GetDaemonPort)
107 69b99987 Michael Hanselmann
    @type logger: logging.Logger
108 d63997b3 Guido Trotter
    @param logger: optional logger for internal conditions
109 e4ccf6cd Guido Trotter

110 e4ccf6cd Guido Trotter
    """
111 96e03b0b Guido Trotter
    if not callable(callback):
112 96e03b0b Guido Trotter
      raise errors.ProgrammerError("callback must be callable")
113 e4ccf6cd Guido Trotter
114 a5229439 Guido Trotter
    self.UpdatePeerList(peers)
115 e4ccf6cd Guido Trotter
    self._hmac_key = hmac_key
116 e4ccf6cd Guido Trotter
    self._socket = ConfdAsyncUDPClient(self)
117 96e03b0b Guido Trotter
    self._callback = callback
118 7d20c647 Guido Trotter
    self._confd_port = port
119 a3db74e4 Guido Trotter
    self._logger = logger
120 96e03b0b Guido Trotter
    self._requests = {}
121 96e03b0b Guido Trotter
    self._expire_requests = []
122 7d20c647 Guido Trotter
123 7d20c647 Guido Trotter
    if self._confd_port is None:
124 7d20c647 Guido Trotter
      self._confd_port = utils.GetDaemonPort(constants.CONFD)
125 e4ccf6cd Guido Trotter
126 a5229439 Guido Trotter
  def UpdatePeerList(self, peers):
127 a5229439 Guido Trotter
    """Update the list of peers
128 a5229439 Guido Trotter

129 a5229439 Guido Trotter
    @type peers: list
130 a5229439 Guido Trotter
    @param peers: list of peer nodes
131 a5229439 Guido Trotter

132 a5229439 Guido Trotter
    """
133 e11ddf13 Iustin Pop
    # we are actually called from init, so:
134 e11ddf13 Iustin Pop
    # pylint: disable-msg=W0201
135 a5229439 Guido Trotter
    if not isinstance(peers, list):
136 a5229439 Guido Trotter
      raise errors.ProgrammerError("peers must be a list")
137 db169865 Guido Trotter
    # make a copy of peers, since we're going to shuffle the list, later
138 db169865 Guido Trotter
    self._peers = list(peers)
139 a5229439 Guido Trotter
140 e4ccf6cd Guido Trotter
  def _PackRequest(self, request, now=None):
141 e4ccf6cd Guido Trotter
    """Prepare a request to be sent on the wire.
142 e4ccf6cd Guido Trotter

143 e4ccf6cd Guido Trotter
    This function puts a proper salt in a confd request, puts the proper salt,
144 e4ccf6cd Guido Trotter
    and adds the correct magic number.
145 e4ccf6cd Guido Trotter

146 e4ccf6cd Guido Trotter
    """
147 e4ccf6cd Guido Trotter
    if now is None:
148 e4ccf6cd Guido Trotter
      now = time.time()
149 e4ccf6cd Guido Trotter
    tstamp = '%d' % now
150 e4ccf6cd Guido Trotter
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
151 e4ccf6cd Guido Trotter
    return confd.PackMagic(req)
152 e4ccf6cd Guido Trotter
153 e4ccf6cd Guido Trotter
  def _UnpackReply(self, payload):
154 e4ccf6cd Guido Trotter
    in_payload = confd.UnpackMagic(payload)
155 c103d7ae Guido Trotter
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
156 c103d7ae Guido Trotter
    answer = objects.ConfdReply.FromDict(dict_answer)
157 e4ccf6cd Guido Trotter
    return answer, salt
158 e4ccf6cd Guido Trotter
159 96e03b0b Guido Trotter
  def ExpireRequests(self):
160 96e03b0b Guido Trotter
    """Delete all the expired requests.
161 e4ccf6cd Guido Trotter

162 e4ccf6cd Guido Trotter
    """
163 e4ccf6cd Guido Trotter
    now = time.time()
164 96e03b0b Guido Trotter
    while self._expire_requests:
165 96e03b0b Guido Trotter
      expire_time, rsalt = self._expire_requests[0]
166 e4ccf6cd Guido Trotter
      if now >= expire_time:
167 96e03b0b Guido Trotter
        self._expire_requests.pop(0)
168 96e03b0b Guido Trotter
        (request, args) = self._requests[rsalt]
169 96e03b0b Guido Trotter
        del self._requests[rsalt]
170 96e03b0b Guido Trotter
        client_reply = ConfdUpcallPayload(salt=rsalt,
171 96e03b0b Guido Trotter
                                          type=UPCALL_EXPIRE,
172 96e03b0b Guido Trotter
                                          orig_request=request,
173 5f6f260a Guido Trotter
                                          extra_args=args,
174 5f6f260a Guido Trotter
                                          client=self,
175 5f6f260a Guido Trotter
                                          )
176 96e03b0b Guido Trotter
        self._callback(client_reply)
177 e4ccf6cd Guido Trotter
      else:
178 e4ccf6cd Guido Trotter
        break
179 e4ccf6cd Guido Trotter
180 d63997b3 Guido Trotter
  def SendRequest(self, request, args=None, coverage=None, async=True):
181 e4ccf6cd Guido Trotter
    """Send a confd request to some MCs
182 e4ccf6cd Guido Trotter

183 e4ccf6cd Guido Trotter
    @type request: L{objects.ConfdRequest}
184 e4ccf6cd Guido Trotter
    @param request: the request to send
185 e4ccf6cd Guido Trotter
    @type args: tuple
186 d63997b3 Guido Trotter
    @param args: additional callback arguments
187 e4ccf6cd Guido Trotter
    @type coverage: integer
188 d63997b3 Guido Trotter
    @param coverage: number of remote nodes to contact
189 8496d93c Guido Trotter
    @type async: boolean
190 8496d93c Guido Trotter
    @param async: handle the write asynchronously
191 e4ccf6cd Guido Trotter

192 e4ccf6cd Guido Trotter
    """
193 e4ccf6cd Guido Trotter
    if coverage is None:
194 e4ccf6cd Guido Trotter
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
195 e4ccf6cd Guido Trotter
196 e4ccf6cd Guido Trotter
    if coverage > len(self._peers):
197 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Not enough MCs known to provide the"
198 e4ccf6cd Guido Trotter
                                    " desired coverage")
199 e4ccf6cd Guido Trotter
200 e4ccf6cd Guido Trotter
    if not request.rsalt:
201 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Missing request rsalt")
202 e4ccf6cd Guido Trotter
203 96e03b0b Guido Trotter
    self.ExpireRequests()
204 96e03b0b Guido Trotter
    if request.rsalt in self._requests:
205 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Duplicate request rsalt")
206 e4ccf6cd Guido Trotter
207 e4ccf6cd Guido Trotter
    if request.type not in constants.CONFD_REQS:
208 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Invalid request type")
209 e4ccf6cd Guido Trotter
210 e4ccf6cd Guido Trotter
    random.shuffle(self._peers)
211 e4ccf6cd Guido Trotter
    targets = self._peers[:coverage]
212 e4ccf6cd Guido Trotter
213 e4ccf6cd Guido Trotter
    now = time.time()
214 e4ccf6cd Guido Trotter
    payload = self._PackRequest(request, now=now)
215 e4ccf6cd Guido Trotter
216 e4ccf6cd Guido Trotter
    for target in targets:
217 e4ccf6cd Guido Trotter
      try:
218 e4ccf6cd Guido Trotter
        self._socket.enqueue_send(target, self._confd_port, payload)
219 e4ccf6cd Guido Trotter
      except errors.UdpDataSizeError:
220 e4ccf6cd Guido Trotter
        raise errors.ConfdClientError("Request too big")
221 e4ccf6cd Guido Trotter
222 96e03b0b Guido Trotter
    self._requests[request.rsalt] = (request, args)
223 e4ccf6cd Guido Trotter
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
224 96e03b0b Guido Trotter
    self._expire_requests.append((expire_time, request.rsalt))
225 e4ccf6cd Guido Trotter
226 8496d93c Guido Trotter
    if not async:
227 8496d93c Guido Trotter
      self.FlushSendQueue()
228 8496d93c Guido Trotter
229 e4ccf6cd Guido Trotter
  def HandleResponse(self, payload, ip, port):
230 e4ccf6cd Guido Trotter
    """Asynchronous handler for a confd reply
231 e4ccf6cd Guido Trotter

232 e4ccf6cd Guido Trotter
    Call the relevant callback associated to the current request.
233 e4ccf6cd Guido Trotter

234 e4ccf6cd Guido Trotter
    """
235 e4ccf6cd Guido Trotter
    try:
236 e4ccf6cd Guido Trotter
      try:
237 e4ccf6cd Guido Trotter
        answer, salt = self._UnpackReply(payload)
238 a3db74e4 Guido Trotter
      except (errors.SignatureError, errors.ConfdMagicError), err:
239 a3db74e4 Guido Trotter
        if self._logger:
240 a3db74e4 Guido Trotter
          self._logger.debug("Discarding broken package: %s" % err)
241 e4ccf6cd Guido Trotter
        return
242 e4ccf6cd Guido Trotter
243 e4ccf6cd Guido Trotter
      try:
244 96e03b0b Guido Trotter
        (request, args) = self._requests[salt]
245 e4ccf6cd Guido Trotter
      except KeyError:
246 a3db74e4 Guido Trotter
        if self._logger:
247 a3db74e4 Guido Trotter
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
248 96e03b0b Guido Trotter
        return
249 96e03b0b Guido Trotter
250 96e03b0b Guido Trotter
      client_reply = ConfdUpcallPayload(salt=salt,
251 96e03b0b Guido Trotter
                                        type=UPCALL_REPLY,
252 96e03b0b Guido Trotter
                                        server_reply=answer,
253 96e03b0b Guido Trotter
                                        orig_request=request,
254 96e03b0b Guido Trotter
                                        server_ip=ip,
255 96e03b0b Guido Trotter
                                        server_port=port,
256 5f6f260a Guido Trotter
                                        extra_args=args,
257 5f6f260a Guido Trotter
                                        client=self,
258 5f6f260a Guido Trotter
                                       )
259 96e03b0b Guido Trotter
      self._callback(client_reply)
260 e4ccf6cd Guido Trotter
261 e4ccf6cd Guido Trotter
    finally:
262 96e03b0b Guido Trotter
      self.ExpireRequests()
263 96e03b0b Guido Trotter
264 8496d93c Guido Trotter
  def FlushSendQueue(self):
265 8496d93c Guido Trotter
    """Send out all pending requests.
266 8496d93c Guido Trotter

267 8496d93c Guido Trotter
    Can be used for synchronous client use.
268 8496d93c Guido Trotter

269 8496d93c Guido Trotter
    """
270 8496d93c Guido Trotter
    while self._socket.writable():
271 8496d93c Guido Trotter
      self._socket.handle_write()
272 8496d93c Guido Trotter
273 8496d93c Guido Trotter
  def ReceiveReply(self, timeout=1):
274 8496d93c Guido Trotter
    """Receive one reply.
275 8496d93c Guido Trotter

276 8496d93c Guido Trotter
    @type timeout: float
277 8496d93c Guido Trotter
    @param timeout: how long to wait for the reply
278 8496d93c Guido Trotter
    @rtype: boolean
279 8496d93c Guido Trotter
    @return: True if some data has been handled, False otherwise
280 8496d93c Guido Trotter

281 8496d93c Guido Trotter
    """
282 8496d93c Guido Trotter
    return self._socket.process_next_packet(timeout=timeout)
283 8496d93c Guido Trotter
284 96e03b0b Guido Trotter
285 96e03b0b Guido Trotter
# UPCALL_REPLY: server reply upcall
286 96e03b0b Guido Trotter
# has all ConfdUpcallPayload fields populated
287 96e03b0b Guido Trotter
UPCALL_REPLY = 1
288 96e03b0b Guido Trotter
# UPCALL_EXPIRE: internal library request expire
289 96e03b0b Guido Trotter
# has only salt, type, orig_request and extra_args
290 96e03b0b Guido Trotter
UPCALL_EXPIRE = 2
291 96e03b0b Guido Trotter
CONFD_UPCALL_TYPES = frozenset([
292 96e03b0b Guido Trotter
  UPCALL_REPLY,
293 96e03b0b Guido Trotter
  UPCALL_EXPIRE,
294 96e03b0b Guido Trotter
  ])
295 96e03b0b Guido Trotter
296 96e03b0b Guido Trotter
297 96e03b0b Guido Trotter
class ConfdUpcallPayload(objects.ConfigObject):
298 96e03b0b Guido Trotter
  """Callback argument for confd replies
299 96e03b0b Guido Trotter

300 96e03b0b Guido Trotter
  @type salt: string
301 96e03b0b Guido Trotter
  @ivar salt: salt associated with the query
302 96e03b0b Guido Trotter
  @type type: one of confd.client.CONFD_UPCALL_TYPES
303 96e03b0b Guido Trotter
  @ivar type: upcall type (server reply, expired request, ...)
304 96e03b0b Guido Trotter
  @type orig_request: L{objects.ConfdRequest}
305 96e03b0b Guido Trotter
  @ivar orig_request: original request
306 96e03b0b Guido Trotter
  @type server_reply: L{objects.ConfdReply}
307 96e03b0b Guido Trotter
  @ivar server_reply: server reply
308 96e03b0b Guido Trotter
  @type server_ip: string
309 96e03b0b Guido Trotter
  @ivar server_ip: answering server ip address
310 96e03b0b Guido Trotter
  @type server_port: int
311 96e03b0b Guido Trotter
  @ivar server_port: answering server port
312 96e03b0b Guido Trotter
  @type extra_args: any
313 96e03b0b Guido Trotter
  @ivar extra_args: 'args' argument of the SendRequest function
314 5f6f260a Guido Trotter
  @type client: L{ConfdClient}
315 5f6f260a Guido Trotter
  @ivar client: current confd client instance
316 96e03b0b Guido Trotter

317 96e03b0b Guido Trotter
  """
318 96e03b0b Guido Trotter
  __slots__ = [
319 96e03b0b Guido Trotter
    "salt",
320 96e03b0b Guido Trotter
    "type",
321 96e03b0b Guido Trotter
    "orig_request",
322 96e03b0b Guido Trotter
    "server_reply",
323 96e03b0b Guido Trotter
    "server_ip",
324 96e03b0b Guido Trotter
    "server_port",
325 96e03b0b Guido Trotter
    "extra_args",
326 5f6f260a Guido Trotter
    "client",
327 96e03b0b Guido Trotter
    ]
328 e4ccf6cd Guido Trotter
329 e4ccf6cd Guido Trotter
330 e4ccf6cd Guido Trotter
class ConfdClientRequest(objects.ConfdRequest):
331 e4ccf6cd Guido Trotter
  """This is the client-side version of ConfdRequest.
332 e4ccf6cd Guido Trotter

333 e4ccf6cd Guido Trotter
  This version of the class helps creating requests, on the client side, by
334 e4ccf6cd Guido Trotter
  filling in some default values.
335 e4ccf6cd Guido Trotter

336 e4ccf6cd Guido Trotter
  """
337 e4ccf6cd Guido Trotter
  def __init__(self, **kwargs):
338 e4ccf6cd Guido Trotter
    objects.ConfdRequest.__init__(self, **kwargs)
339 e4ccf6cd Guido Trotter
    if not self.rsalt:
340 e4ccf6cd Guido Trotter
      self.rsalt = utils.NewUUID()
341 e4ccf6cd Guido Trotter
    if not self.protocol:
342 e4ccf6cd Guido Trotter
      self.protocol = constants.CONFD_PROTOCOL_VERSION
343 e4ccf6cd Guido Trotter
    if self.type not in constants.CONFD_REQS:
344 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Invalid request type")
345 e4ccf6cd Guido Trotter
346 392ca296 Guido Trotter
347 392ca296 Guido Trotter
class ConfdFilterCallback:
348 392ca296 Guido Trotter
  """Callback that calls another callback, but filters duplicate results.
349 392ca296 Guido Trotter

350 49b3fdac Iustin Pop
  @ivar consistent: a dictionary indexed by salt; for each salt, if
351 49b3fdac Iustin Pop
      all responses ware identical, this will be True; this is the
352 49b3fdac Iustin Pop
      expected state on a healthy cluster; on inconsistent or
353 49b3fdac Iustin Pop
      partitioned clusters, this might be False, if we see answers
354 49b3fdac Iustin Pop
      with the same serial but different contents
355 49b3fdac Iustin Pop

356 392ca296 Guido Trotter
  """
357 392ca296 Guido Trotter
  def __init__(self, callback, logger=None):
358 392ca296 Guido Trotter
    """Constructor for ConfdFilterCallback
359 392ca296 Guido Trotter

360 392ca296 Guido Trotter
    @type callback: f(L{ConfdUpcallPayload})
361 392ca296 Guido Trotter
    @param callback: function to call when getting answers
362 69b99987 Michael Hanselmann
    @type logger: logging.Logger
363 d63997b3 Guido Trotter
    @param logger: optional logger for internal conditions
364 392ca296 Guido Trotter

365 392ca296 Guido Trotter
    """
366 392ca296 Guido Trotter
    if not callable(callback):
367 392ca296 Guido Trotter
      raise errors.ProgrammerError("callback must be callable")
368 392ca296 Guido Trotter
369 392ca296 Guido Trotter
    self._callback = callback
370 392ca296 Guido Trotter
    self._logger = logger
371 392ca296 Guido Trotter
    # answers contains a dict of salt -> answer
372 392ca296 Guido Trotter
    self._answers = {}
373 49b3fdac Iustin Pop
    self.consistent = {}
374 392ca296 Guido Trotter
375 392ca296 Guido Trotter
  def _LogFilter(self, salt, new_reply, old_reply):
376 392ca296 Guido Trotter
    if not self._logger:
377 392ca296 Guido Trotter
      return
378 392ca296 Guido Trotter
379 392ca296 Guido Trotter
    if new_reply.serial > old_reply.serial:
380 392ca296 Guido Trotter
      self._logger.debug("Filtering confirming answer, with newer"
381 392ca296 Guido Trotter
                         " serial for query %s" % salt)
382 392ca296 Guido Trotter
    elif new_reply.serial == old_reply.serial:
383 392ca296 Guido Trotter
      if new_reply.answer != old_reply.answer:
384 392ca296 Guido Trotter
        self._logger.warning("Got incoherent answers for query %s"
385 392ca296 Guido Trotter
                             " (serial: %s)" % (salt, new_reply.serial))
386 392ca296 Guido Trotter
      else:
387 392ca296 Guido Trotter
        self._logger.debug("Filtering confirming answer, with same"
388 392ca296 Guido Trotter
                           " serial for query %s" % salt)
389 392ca296 Guido Trotter
    else:
390 392ca296 Guido Trotter
      self._logger.debug("Filtering outdated answer for query %s"
391 392ca296 Guido Trotter
                         " serial: (%d < %d)" % (salt, old_reply.serial,
392 392ca296 Guido Trotter
                                                 new_reply.serial))
393 392ca296 Guido Trotter
394 392ca296 Guido Trotter
  def _HandleExpire(self, up):
395 392ca296 Guido Trotter
    # if we have no answer we have received none, before the expiration.
396 a9613def Guido Trotter
    if up.salt in self._answers:
397 a9613def Guido Trotter
      del self._answers[up.salt]
398 49b3fdac Iustin Pop
    if up.salt in self.consistent:
399 49b3fdac Iustin Pop
      del self.consistent[up.salt]
400 392ca296 Guido Trotter
401 392ca296 Guido Trotter
  def _HandleReply(self, up):
402 392ca296 Guido Trotter
    """Handle a single confd reply, and decide whether to filter it.
403 392ca296 Guido Trotter

404 392ca296 Guido Trotter
    @rtype: boolean
405 392ca296 Guido Trotter
    @return: True if the reply should be filtered, False if it should be passed
406 392ca296 Guido Trotter
             on to the up-callback
407 392ca296 Guido Trotter

408 392ca296 Guido Trotter
    """
409 392ca296 Guido Trotter
    filter_upcall = False
410 392ca296 Guido Trotter
    salt = up.salt
411 49b3fdac Iustin Pop
    if salt not in self.consistent:
412 49b3fdac Iustin Pop
      self.consistent[salt] = True
413 392ca296 Guido Trotter
    if salt not in self._answers:
414 392ca296 Guido Trotter
      # first answer for a query (don't filter, and record)
415 392ca296 Guido Trotter
      self._answers[salt] = up.server_reply
416 392ca296 Guido Trotter
    elif up.server_reply.serial > self._answers[salt].serial:
417 392ca296 Guido Trotter
      # newer answer (record, and compare contents)
418 392ca296 Guido Trotter
      old_answer = self._answers[salt]
419 392ca296 Guido Trotter
      self._answers[salt] = up.server_reply
420 392ca296 Guido Trotter
      if up.server_reply.answer == old_answer.answer:
421 392ca296 Guido Trotter
        # same content (filter) (version upgrade was unrelated)
422 392ca296 Guido Trotter
        filter_upcall = True
423 392ca296 Guido Trotter
        self._LogFilter(salt, up.server_reply, old_answer)
424 392ca296 Guido Trotter
      # else: different content, pass up a second answer
425 392ca296 Guido Trotter
    else:
426 392ca296 Guido Trotter
      # older or same-version answer (duplicate or outdated, filter)
427 39292d3a Iustin Pop
      if (up.server_reply.serial == self._answers[salt].serial and
428 39292d3a Iustin Pop
          up.server_reply.answer != self._answers[salt].answer):
429 49b3fdac Iustin Pop
        self.consistent[salt] = False
430 392ca296 Guido Trotter
      filter_upcall = True
431 392ca296 Guido Trotter
      self._LogFilter(salt, up.server_reply, self._answers[salt])
432 392ca296 Guido Trotter
433 392ca296 Guido Trotter
    return filter_upcall
434 392ca296 Guido Trotter
435 392ca296 Guido Trotter
  def __call__(self, up):
436 392ca296 Guido Trotter
    """Filtering callback
437 392ca296 Guido Trotter

438 392ca296 Guido Trotter
    @type up: L{ConfdUpcallPayload}
439 392ca296 Guido Trotter
    @param up: upper callback
440 392ca296 Guido Trotter

441 392ca296 Guido Trotter
    """
442 392ca296 Guido Trotter
    filter_upcall = False
443 392ca296 Guido Trotter
    if up.type == UPCALL_REPLY:
444 392ca296 Guido Trotter
      filter_upcall = self._HandleReply(up)
445 392ca296 Guido Trotter
    elif up.type == UPCALL_EXPIRE:
446 392ca296 Guido Trotter
      self._HandleExpire(up)
447 392ca296 Guido Trotter
448 392ca296 Guido Trotter
    if not filter_upcall:
449 392ca296 Guido Trotter
      self._callback(up)
450 04cdf663 Guido Trotter
451 04cdf663 Guido Trotter
452 04cdf663 Guido Trotter
class ConfdCountingCallback:
453 04cdf663 Guido Trotter
  """Callback that calls another callback, and counts the answers
454 04cdf663 Guido Trotter

455 04cdf663 Guido Trotter
  """
456 04cdf663 Guido Trotter
  def __init__(self, callback, logger=None):
457 04cdf663 Guido Trotter
    """Constructor for ConfdCountingCallback
458 04cdf663 Guido Trotter

459 04cdf663 Guido Trotter
    @type callback: f(L{ConfdUpcallPayload})
460 04cdf663 Guido Trotter
    @param callback: function to call when getting answers
461 04cdf663 Guido Trotter
    @type logger: logging.Logger
462 04cdf663 Guido Trotter
    @param logger: optional logger for internal conditions
463 04cdf663 Guido Trotter

464 04cdf663 Guido Trotter
    """
465 04cdf663 Guido Trotter
    if not callable(callback):
466 04cdf663 Guido Trotter
      raise errors.ProgrammerError("callback must be callable")
467 04cdf663 Guido Trotter
468 04cdf663 Guido Trotter
    self._callback = callback
469 04cdf663 Guido Trotter
    self._logger = logger
470 04cdf663 Guido Trotter
    # answers contains a dict of salt -> count
471 04cdf663 Guido Trotter
    self._answers = {}
472 04cdf663 Guido Trotter
473 04cdf663 Guido Trotter
  def RegisterQuery(self, salt):
474 04cdf663 Guido Trotter
    if salt in self._answers:
475 04cdf663 Guido Trotter
      raise errors.ProgrammerError("query already registered")
476 04cdf663 Guido Trotter
    self._answers[salt] = 0
477 04cdf663 Guido Trotter
478 04cdf663 Guido Trotter
  def AllAnswered(self):
479 04cdf663 Guido Trotter
    """Have all the registered queries received at least an answer?
480 04cdf663 Guido Trotter

481 04cdf663 Guido Trotter
    """
482 04cdf663 Guido Trotter
    return utils.all(self._answers.values())
483 04cdf663 Guido Trotter
484 04cdf663 Guido Trotter
  def _HandleExpire(self, up):
485 04cdf663 Guido Trotter
    # if we have no answer we have received none, before the expiration.
486 04cdf663 Guido Trotter
    if up.salt in self._answers:
487 04cdf663 Guido Trotter
      del self._answers[up.salt]
488 04cdf663 Guido Trotter
489 04cdf663 Guido Trotter
  def _HandleReply(self, up):
490 04cdf663 Guido Trotter
    """Handle a single confd reply, and decide whether to filter it.
491 04cdf663 Guido Trotter

492 04cdf663 Guido Trotter
    @rtype: boolean
493 04cdf663 Guido Trotter
    @return: True if the reply should be filtered, False if it should be passed
494 04cdf663 Guido Trotter
             on to the up-callback
495 04cdf663 Guido Trotter

496 04cdf663 Guido Trotter
    """
497 04cdf663 Guido Trotter
    if up.salt in self._answers:
498 04cdf663 Guido Trotter
      self._answers[up.salt] += 1
499 04cdf663 Guido Trotter
500 04cdf663 Guido Trotter
  def __call__(self, up):
501 04cdf663 Guido Trotter
    """Filtering callback
502 04cdf663 Guido Trotter

503 04cdf663 Guido Trotter
    @type up: L{ConfdUpcallPayload}
504 04cdf663 Guido Trotter
    @param up: upper callback
505 04cdf663 Guido Trotter

506 04cdf663 Guido Trotter
    """
507 04cdf663 Guido Trotter
    if up.type == UPCALL_REPLY:
508 04cdf663 Guido Trotter
      self._HandleReply(up)
509 04cdf663 Guido Trotter
    elif up.type == UPCALL_EXPIRE:
510 04cdf663 Guido Trotter
      self._HandleExpire(up)
511 04cdf663 Guido Trotter
    self._callback(up)
512 5b349fd1 Iustin Pop
513 5b349fd1 Iustin Pop
def GetConfdClient(callback):
514 5b349fd1 Iustin Pop
  """Return a client configured using the given callback.
515 5b349fd1 Iustin Pop

516 5b349fd1 Iustin Pop
  This is handy to abstract the MC list and HMAC key reading.
517 5b349fd1 Iustin Pop

518 5b349fd1 Iustin Pop
  @attention: This should only be called on nodes which are part of a
519 5b349fd1 Iustin Pop
      cluster, since it depends on a valid (ganeti) data directory;
520 5b349fd1 Iustin Pop
      for code running outside of a cluster, you need to create the
521 5b349fd1 Iustin Pop
      client manually
522 5b349fd1 Iustin Pop

523 5b349fd1 Iustin Pop
  """
524 5b349fd1 Iustin Pop
  ss = ssconf.SimpleStore()
525 5b349fd1 Iustin Pop
  mc_file = ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
526 5b349fd1 Iustin Pop
  mc_list = utils.ReadFile(mc_file).splitlines()
527 5b349fd1 Iustin Pop
  hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY)
528 5b349fd1 Iustin Pop
  return ConfdClient(hmac_key, mc_list, callback)