Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ 04cdf663

History | View | Annotate | Download (15.3 kB)

1 e4ccf6cd Guido Trotter
#
2 e4ccf6cd Guido Trotter
#
3 e4ccf6cd Guido Trotter
4 e4ccf6cd Guido Trotter
# Copyright (C) 2009 Google Inc.
5 e4ccf6cd Guido Trotter
#
6 e4ccf6cd Guido Trotter
# This program is free software; you can redistribute it and/or modify
7 e4ccf6cd Guido Trotter
# it under the terms of the GNU General Public License as published by
8 e4ccf6cd Guido Trotter
# the Free Software Foundation; either version 2 of the License, or
9 e4ccf6cd Guido Trotter
# (at your option) any later version.
10 e4ccf6cd Guido Trotter
#
11 e4ccf6cd Guido Trotter
# This program is distributed in the hope that it will be useful, but
12 e4ccf6cd Guido Trotter
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 e4ccf6cd Guido Trotter
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 e4ccf6cd Guido Trotter
# General Public License for more details.
15 e4ccf6cd Guido Trotter
#
16 e4ccf6cd Guido Trotter
# You should have received a copy of the GNU General Public License
17 e4ccf6cd Guido Trotter
# along with this program; if not, write to the Free Software
18 e4ccf6cd Guido Trotter
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 e4ccf6cd Guido Trotter
# 02110-1301, USA.
20 e4ccf6cd Guido Trotter
21 e4ccf6cd Guido Trotter
22 e4ccf6cd Guido Trotter
"""Ganeti confd client
23 e4ccf6cd Guido Trotter

24 cf7b0cc4 Guido Trotter
Clients can use the confd client library to send requests to a group of master
25 cf7b0cc4 Guido Trotter
candidates running confd. The expected usage is through the asyncore framework,
26 cf7b0cc4 Guido Trotter
by sending queries, and asynchronously receiving replies through a callback.
27 cf7b0cc4 Guido Trotter

28 cf7b0cc4 Guido Trotter
This way the client library doesn't ever need to "wait" on a particular answer,
29 cf7b0cc4 Guido Trotter
and can proceed even if some udp packets are lost. It's up to the user to
30 cf7b0cc4 Guido Trotter
reschedule queries if they haven't received responses and they need them.
31 cf7b0cc4 Guido Trotter

32 69b99987 Michael Hanselmann
Example usage::
33 69b99987 Michael Hanselmann

34 cf7b0cc4 Guido Trotter
  client = ConfdClient(...) # includes callback specification
35 cf7b0cc4 Guido Trotter
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36 cf7b0cc4 Guido Trotter
  client.SendRequest(req)
37 cf7b0cc4 Guido Trotter
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38 cf7b0cc4 Guido Trotter
  # ... wait ...
39 cf7b0cc4 Guido Trotter
  # And your callback will be called by asyncore, when your query gets a
40 cf7b0cc4 Guido Trotter
  # response, or when it expires.
41 cf7b0cc4 Guido Trotter

42 392ca296 Guido Trotter
You can use the provided ConfdFilterCallback to act as a filter, only passing
43 392ca296 Guido Trotter
"newer" answer to your callback, and filtering out outdated ones, or ones
44 392ca296 Guido Trotter
confirming what you already got.
45 392ca296 Guido Trotter

46 e4ccf6cd Guido Trotter
"""
47 69b99987 Michael Hanselmann
48 6c881c52 Iustin Pop
# pylint: disable-msg=E0203
49 6c881c52 Iustin Pop
50 6c881c52 Iustin Pop
# E0203: Access to member %r before its definition, since we use
51 6c881c52 Iustin Pop
# objects.py which doesn't explicitely initialise its members
52 6c881c52 Iustin Pop
53 e4ccf6cd Guido Trotter
import time
54 e4ccf6cd Guido Trotter
import random
55 e4ccf6cd Guido Trotter
56 e4ccf6cd Guido Trotter
from ganeti import utils
57 e4ccf6cd Guido Trotter
from ganeti import constants
58 e4ccf6cd Guido Trotter
from ganeti import objects
59 e4ccf6cd Guido Trotter
from ganeti import serializer
60 e4ccf6cd Guido Trotter
from ganeti import daemon # contains AsyncUDPSocket
61 e4ccf6cd Guido Trotter
from ganeti import errors
62 e4ccf6cd Guido Trotter
from ganeti import confd
63 e4ccf6cd Guido Trotter
64 e4ccf6cd Guido Trotter
65 e4ccf6cd Guido Trotter
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
66 e4ccf6cd Guido Trotter
  """Confd udp asyncore client
67 e4ccf6cd Guido Trotter

68 e4ccf6cd Guido Trotter
  This is kept separate from the main ConfdClient to make sure it's easy to
69 e4ccf6cd Guido Trotter
  implement a non-asyncore based client library.
70 e4ccf6cd Guido Trotter

71 e4ccf6cd Guido Trotter
  """
72 e4ccf6cd Guido Trotter
  def __init__(self, client):
73 e4ccf6cd Guido Trotter
    """Constructor for ConfdAsyncUDPClient
74 e4ccf6cd Guido Trotter

75 e4ccf6cd Guido Trotter
    @type client: L{ConfdClient}
76 e4ccf6cd Guido Trotter
    @param client: client library, to pass the datagrams to
77 e4ccf6cd Guido Trotter

78 e4ccf6cd Guido Trotter
    """
79 e4ccf6cd Guido Trotter
    daemon.AsyncUDPSocket.__init__(self)
80 e4ccf6cd Guido Trotter
    self.client = client
81 e4ccf6cd Guido Trotter
82 e4ccf6cd Guido Trotter
  # this method is overriding a daemon.AsyncUDPSocket method
83 e4ccf6cd Guido Trotter
  def handle_datagram(self, payload, ip, port):
84 e4ccf6cd Guido Trotter
    self.client.HandleResponse(payload, ip, port)
85 e4ccf6cd Guido Trotter
86 e4ccf6cd Guido Trotter
87 e4ccf6cd Guido Trotter
class ConfdClient:
88 e4ccf6cd Guido Trotter
  """Send queries to confd, and get back answers.
89 e4ccf6cd Guido Trotter

90 e4ccf6cd Guido Trotter
  Since the confd model works by querying multiple master candidates, and
91 e4ccf6cd Guido Trotter
  getting back answers, this is an asynchronous library. It can either work
92 e4ccf6cd Guido Trotter
  through asyncore or with your own handling.
93 e4ccf6cd Guido Trotter

94 e4ccf6cd Guido Trotter
  """
95 a3db74e4 Guido Trotter
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
96 e4ccf6cd Guido Trotter
    """Constructor for ConfdClient
97 e4ccf6cd Guido Trotter

98 e4ccf6cd Guido Trotter
    @type hmac_key: string
99 e4ccf6cd Guido Trotter
    @param hmac_key: hmac key to talk to confd
100 e4ccf6cd Guido Trotter
    @type peers: list
101 e4ccf6cd Guido Trotter
    @param peers: list of peer nodes
102 96e03b0b Guido Trotter
    @type callback: f(L{ConfdUpcallPayload})
103 96e03b0b Guido Trotter
    @param callback: function to call when getting answers
104 7d20c647 Guido Trotter
    @type port: integer
105 d63997b3 Guido Trotter
    @param port: confd port (default: use GetDaemonPort)
106 69b99987 Michael Hanselmann
    @type logger: logging.Logger
107 d63997b3 Guido Trotter
    @param logger: optional logger for internal conditions
108 e4ccf6cd Guido Trotter

109 e4ccf6cd Guido Trotter
    """
110 96e03b0b Guido Trotter
    if not callable(callback):
111 96e03b0b Guido Trotter
      raise errors.ProgrammerError("callback must be callable")
112 e4ccf6cd Guido Trotter
113 a5229439 Guido Trotter
    self.UpdatePeerList(peers)
114 e4ccf6cd Guido Trotter
    self._hmac_key = hmac_key
115 e4ccf6cd Guido Trotter
    self._socket = ConfdAsyncUDPClient(self)
116 96e03b0b Guido Trotter
    self._callback = callback
117 7d20c647 Guido Trotter
    self._confd_port = port
118 a3db74e4 Guido Trotter
    self._logger = logger
119 96e03b0b Guido Trotter
    self._requests = {}
120 96e03b0b Guido Trotter
    self._expire_requests = []
121 7d20c647 Guido Trotter
122 7d20c647 Guido Trotter
    if self._confd_port is None:
123 7d20c647 Guido Trotter
      self._confd_port = utils.GetDaemonPort(constants.CONFD)
124 e4ccf6cd Guido Trotter
125 a5229439 Guido Trotter
  def UpdatePeerList(self, peers):
126 a5229439 Guido Trotter
    """Update the list of peers
127 a5229439 Guido Trotter

128 a5229439 Guido Trotter
    @type peers: list
129 a5229439 Guido Trotter
    @param peers: list of peer nodes
130 a5229439 Guido Trotter

131 a5229439 Guido Trotter
    """
132 e11ddf13 Iustin Pop
    # we are actually called from init, so:
133 e11ddf13 Iustin Pop
    # pylint: disable-msg=W0201
134 a5229439 Guido Trotter
    if not isinstance(peers, list):
135 a5229439 Guido Trotter
      raise errors.ProgrammerError("peers must be a list")
136 db169865 Guido Trotter
    # make a copy of peers, since we're going to shuffle the list, later
137 db169865 Guido Trotter
    self._peers = list(peers)
138 a5229439 Guido Trotter
139 e4ccf6cd Guido Trotter
  def _PackRequest(self, request, now=None):
140 e4ccf6cd Guido Trotter
    """Prepare a request to be sent on the wire.
141 e4ccf6cd Guido Trotter

142 e4ccf6cd Guido Trotter
    This function puts a proper salt in a confd request, puts the proper salt,
143 e4ccf6cd Guido Trotter
    and adds the correct magic number.
144 e4ccf6cd Guido Trotter

145 e4ccf6cd Guido Trotter
    """
146 e4ccf6cd Guido Trotter
    if now is None:
147 e4ccf6cd Guido Trotter
      now = time.time()
148 e4ccf6cd Guido Trotter
    tstamp = '%d' % now
149 e4ccf6cd Guido Trotter
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
150 e4ccf6cd Guido Trotter
    return confd.PackMagic(req)
151 e4ccf6cd Guido Trotter
152 e4ccf6cd Guido Trotter
  def _UnpackReply(self, payload):
153 e4ccf6cd Guido Trotter
    in_payload = confd.UnpackMagic(payload)
154 c103d7ae Guido Trotter
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
155 c103d7ae Guido Trotter
    answer = objects.ConfdReply.FromDict(dict_answer)
156 e4ccf6cd Guido Trotter
    return answer, salt
157 e4ccf6cd Guido Trotter
158 96e03b0b Guido Trotter
  def ExpireRequests(self):
159 96e03b0b Guido Trotter
    """Delete all the expired requests.
160 e4ccf6cd Guido Trotter

161 e4ccf6cd Guido Trotter
    """
162 e4ccf6cd Guido Trotter
    now = time.time()
163 96e03b0b Guido Trotter
    while self._expire_requests:
164 96e03b0b Guido Trotter
      expire_time, rsalt = self._expire_requests[0]
165 e4ccf6cd Guido Trotter
      if now >= expire_time:
166 96e03b0b Guido Trotter
        self._expire_requests.pop(0)
167 96e03b0b Guido Trotter
        (request, args) = self._requests[rsalt]
168 96e03b0b Guido Trotter
        del self._requests[rsalt]
169 96e03b0b Guido Trotter
        client_reply = ConfdUpcallPayload(salt=rsalt,
170 96e03b0b Guido Trotter
                                          type=UPCALL_EXPIRE,
171 96e03b0b Guido Trotter
                                          orig_request=request,
172 5f6f260a Guido Trotter
                                          extra_args=args,
173 5f6f260a Guido Trotter
                                          client=self,
174 5f6f260a Guido Trotter
                                          )
175 96e03b0b Guido Trotter
        self._callback(client_reply)
176 e4ccf6cd Guido Trotter
      else:
177 e4ccf6cd Guido Trotter
        break
178 e4ccf6cd Guido Trotter
179 d63997b3 Guido Trotter
  def SendRequest(self, request, args=None, coverage=None, async=True):
180 e4ccf6cd Guido Trotter
    """Send a confd request to some MCs
181 e4ccf6cd Guido Trotter

182 e4ccf6cd Guido Trotter
    @type request: L{objects.ConfdRequest}
183 e4ccf6cd Guido Trotter
    @param request: the request to send
184 e4ccf6cd Guido Trotter
    @type args: tuple
185 d63997b3 Guido Trotter
    @param args: additional callback arguments
186 e4ccf6cd Guido Trotter
    @type coverage: integer
187 d63997b3 Guido Trotter
    @param coverage: number of remote nodes to contact
188 8496d93c Guido Trotter
    @type async: boolean
189 8496d93c Guido Trotter
    @param async: handle the write asynchronously
190 e4ccf6cd Guido Trotter

191 e4ccf6cd Guido Trotter
    """
192 e4ccf6cd Guido Trotter
    if coverage is None:
193 e4ccf6cd Guido Trotter
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
194 e4ccf6cd Guido Trotter
195 e4ccf6cd Guido Trotter
    if coverage > len(self._peers):
196 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Not enough MCs known to provide the"
197 e4ccf6cd Guido Trotter
                                    " desired coverage")
198 e4ccf6cd Guido Trotter
199 e4ccf6cd Guido Trotter
    if not request.rsalt:
200 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Missing request rsalt")
201 e4ccf6cd Guido Trotter
202 96e03b0b Guido Trotter
    self.ExpireRequests()
203 96e03b0b Guido Trotter
    if request.rsalt in self._requests:
204 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Duplicate request rsalt")
205 e4ccf6cd Guido Trotter
206 e4ccf6cd Guido Trotter
    if request.type not in constants.CONFD_REQS:
207 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Invalid request type")
208 e4ccf6cd Guido Trotter
209 e4ccf6cd Guido Trotter
    random.shuffle(self._peers)
210 e4ccf6cd Guido Trotter
    targets = self._peers[:coverage]
211 e4ccf6cd Guido Trotter
212 e4ccf6cd Guido Trotter
    now = time.time()
213 e4ccf6cd Guido Trotter
    payload = self._PackRequest(request, now=now)
214 e4ccf6cd Guido Trotter
215 e4ccf6cd Guido Trotter
    for target in targets:
216 e4ccf6cd Guido Trotter
      try:
217 e4ccf6cd Guido Trotter
        self._socket.enqueue_send(target, self._confd_port, payload)
218 e4ccf6cd Guido Trotter
      except errors.UdpDataSizeError:
219 e4ccf6cd Guido Trotter
        raise errors.ConfdClientError("Request too big")
220 e4ccf6cd Guido Trotter
221 96e03b0b Guido Trotter
    self._requests[request.rsalt] = (request, args)
222 e4ccf6cd Guido Trotter
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
223 96e03b0b Guido Trotter
    self._expire_requests.append((expire_time, request.rsalt))
224 e4ccf6cd Guido Trotter
225 8496d93c Guido Trotter
    if not async:
226 8496d93c Guido Trotter
      self.FlushSendQueue()
227 8496d93c Guido Trotter
228 e4ccf6cd Guido Trotter
  def HandleResponse(self, payload, ip, port):
229 e4ccf6cd Guido Trotter
    """Asynchronous handler for a confd reply
230 e4ccf6cd Guido Trotter

231 e4ccf6cd Guido Trotter
    Call the relevant callback associated to the current request.
232 e4ccf6cd Guido Trotter

233 e4ccf6cd Guido Trotter
    """
234 e4ccf6cd Guido Trotter
    try:
235 e4ccf6cd Guido Trotter
      try:
236 e4ccf6cd Guido Trotter
        answer, salt = self._UnpackReply(payload)
237 a3db74e4 Guido Trotter
      except (errors.SignatureError, errors.ConfdMagicError), err:
238 a3db74e4 Guido Trotter
        if self._logger:
239 a3db74e4 Guido Trotter
          self._logger.debug("Discarding broken package: %s" % err)
240 e4ccf6cd Guido Trotter
        return
241 e4ccf6cd Guido Trotter
242 e4ccf6cd Guido Trotter
      try:
243 96e03b0b Guido Trotter
        (request, args) = self._requests[salt]
244 e4ccf6cd Guido Trotter
      except KeyError:
245 a3db74e4 Guido Trotter
        if self._logger:
246 a3db74e4 Guido Trotter
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
247 96e03b0b Guido Trotter
        return
248 96e03b0b Guido Trotter
249 96e03b0b Guido Trotter
      client_reply = ConfdUpcallPayload(salt=salt,
250 96e03b0b Guido Trotter
                                        type=UPCALL_REPLY,
251 96e03b0b Guido Trotter
                                        server_reply=answer,
252 96e03b0b Guido Trotter
                                        orig_request=request,
253 96e03b0b Guido Trotter
                                        server_ip=ip,
254 96e03b0b Guido Trotter
                                        server_port=port,
255 5f6f260a Guido Trotter
                                        extra_args=args,
256 5f6f260a Guido Trotter
                                        client=self,
257 5f6f260a Guido Trotter
                                       )
258 96e03b0b Guido Trotter
      self._callback(client_reply)
259 e4ccf6cd Guido Trotter
260 e4ccf6cd Guido Trotter
    finally:
261 96e03b0b Guido Trotter
      self.ExpireRequests()
262 96e03b0b Guido Trotter
263 8496d93c Guido Trotter
  def FlushSendQueue(self):
264 8496d93c Guido Trotter
    """Send out all pending requests.
265 8496d93c Guido Trotter

266 8496d93c Guido Trotter
    Can be used for synchronous client use.
267 8496d93c Guido Trotter

268 8496d93c Guido Trotter
    """
269 8496d93c Guido Trotter
    while self._socket.writable():
270 8496d93c Guido Trotter
      self._socket.handle_write()
271 8496d93c Guido Trotter
272 8496d93c Guido Trotter
  def ReceiveReply(self, timeout=1):
273 8496d93c Guido Trotter
    """Receive one reply.
274 8496d93c Guido Trotter

275 8496d93c Guido Trotter
    @type timeout: float
276 8496d93c Guido Trotter
    @param timeout: how long to wait for the reply
277 8496d93c Guido Trotter
    @rtype: boolean
278 8496d93c Guido Trotter
    @return: True if some data has been handled, False otherwise
279 8496d93c Guido Trotter

280 8496d93c Guido Trotter
    """
281 8496d93c Guido Trotter
    return self._socket.process_next_packet(timeout=timeout)
282 8496d93c Guido Trotter
283 96e03b0b Guido Trotter
284 96e03b0b Guido Trotter
# UPCALL_REPLY: server reply upcall
285 96e03b0b Guido Trotter
# has all ConfdUpcallPayload fields populated
286 96e03b0b Guido Trotter
UPCALL_REPLY = 1
287 96e03b0b Guido Trotter
# UPCALL_EXPIRE: internal library request expire
288 96e03b0b Guido Trotter
# has only salt, type, orig_request and extra_args
289 96e03b0b Guido Trotter
UPCALL_EXPIRE = 2
290 96e03b0b Guido Trotter
CONFD_UPCALL_TYPES = frozenset([
291 96e03b0b Guido Trotter
  UPCALL_REPLY,
292 96e03b0b Guido Trotter
  UPCALL_EXPIRE,
293 96e03b0b Guido Trotter
  ])
294 96e03b0b Guido Trotter
295 96e03b0b Guido Trotter
296 96e03b0b Guido Trotter
class ConfdUpcallPayload(objects.ConfigObject):
297 96e03b0b Guido Trotter
  """Callback argument for confd replies
298 96e03b0b Guido Trotter

299 96e03b0b Guido Trotter
  @type salt: string
300 96e03b0b Guido Trotter
  @ivar salt: salt associated with the query
301 96e03b0b Guido Trotter
  @type type: one of confd.client.CONFD_UPCALL_TYPES
302 96e03b0b Guido Trotter
  @ivar type: upcall type (server reply, expired request, ...)
303 96e03b0b Guido Trotter
  @type orig_request: L{objects.ConfdRequest}
304 96e03b0b Guido Trotter
  @ivar orig_request: original request
305 96e03b0b Guido Trotter
  @type server_reply: L{objects.ConfdReply}
306 96e03b0b Guido Trotter
  @ivar server_reply: server reply
307 96e03b0b Guido Trotter
  @type server_ip: string
308 96e03b0b Guido Trotter
  @ivar server_ip: answering server ip address
309 96e03b0b Guido Trotter
  @type server_port: int
310 96e03b0b Guido Trotter
  @ivar server_port: answering server port
311 96e03b0b Guido Trotter
  @type extra_args: any
312 96e03b0b Guido Trotter
  @ivar extra_args: 'args' argument of the SendRequest function
313 5f6f260a Guido Trotter
  @type client: L{ConfdClient}
314 5f6f260a Guido Trotter
  @ivar client: current confd client instance
315 96e03b0b Guido Trotter

316 96e03b0b Guido Trotter
  """
317 96e03b0b Guido Trotter
  __slots__ = [
318 96e03b0b Guido Trotter
    "salt",
319 96e03b0b Guido Trotter
    "type",
320 96e03b0b Guido Trotter
    "orig_request",
321 96e03b0b Guido Trotter
    "server_reply",
322 96e03b0b Guido Trotter
    "server_ip",
323 96e03b0b Guido Trotter
    "server_port",
324 96e03b0b Guido Trotter
    "extra_args",
325 5f6f260a Guido Trotter
    "client",
326 96e03b0b Guido Trotter
    ]
327 e4ccf6cd Guido Trotter
328 e4ccf6cd Guido Trotter
329 e4ccf6cd Guido Trotter
class ConfdClientRequest(objects.ConfdRequest):
330 e4ccf6cd Guido Trotter
  """This is the client-side version of ConfdRequest.
331 e4ccf6cd Guido Trotter

332 e4ccf6cd Guido Trotter
  This version of the class helps creating requests, on the client side, by
333 e4ccf6cd Guido Trotter
  filling in some default values.
334 e4ccf6cd Guido Trotter

335 e4ccf6cd Guido Trotter
  """
336 e4ccf6cd Guido Trotter
  def __init__(self, **kwargs):
337 e4ccf6cd Guido Trotter
    objects.ConfdRequest.__init__(self, **kwargs)
338 e4ccf6cd Guido Trotter
    if not self.rsalt:
339 e4ccf6cd Guido Trotter
      self.rsalt = utils.NewUUID()
340 e4ccf6cd Guido Trotter
    if not self.protocol:
341 e4ccf6cd Guido Trotter
      self.protocol = constants.CONFD_PROTOCOL_VERSION
342 e4ccf6cd Guido Trotter
    if self.type not in constants.CONFD_REQS:
343 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Invalid request type")
344 e4ccf6cd Guido Trotter
345 392ca296 Guido Trotter
346 392ca296 Guido Trotter
class ConfdFilterCallback:
347 392ca296 Guido Trotter
  """Callback that calls another callback, but filters duplicate results.
348 392ca296 Guido Trotter

349 392ca296 Guido Trotter
  """
350 392ca296 Guido Trotter
  def __init__(self, callback, logger=None):
351 392ca296 Guido Trotter
    """Constructor for ConfdFilterCallback
352 392ca296 Guido Trotter

353 392ca296 Guido Trotter
    @type callback: f(L{ConfdUpcallPayload})
354 392ca296 Guido Trotter
    @param callback: function to call when getting answers
355 69b99987 Michael Hanselmann
    @type logger: logging.Logger
356 d63997b3 Guido Trotter
    @param logger: optional logger for internal conditions
357 392ca296 Guido Trotter

358 392ca296 Guido Trotter
    """
359 392ca296 Guido Trotter
    if not callable(callback):
360 392ca296 Guido Trotter
      raise errors.ProgrammerError("callback must be callable")
361 392ca296 Guido Trotter
362 392ca296 Guido Trotter
    self._callback = callback
363 392ca296 Guido Trotter
    self._logger = logger
364 392ca296 Guido Trotter
    # answers contains a dict of salt -> answer
365 392ca296 Guido Trotter
    self._answers = {}
366 392ca296 Guido Trotter
367 392ca296 Guido Trotter
  def _LogFilter(self, salt, new_reply, old_reply):
368 392ca296 Guido Trotter
    if not self._logger:
369 392ca296 Guido Trotter
      return
370 392ca296 Guido Trotter
371 392ca296 Guido Trotter
    if new_reply.serial > old_reply.serial:
372 392ca296 Guido Trotter
      self._logger.debug("Filtering confirming answer, with newer"
373 392ca296 Guido Trotter
                         " serial for query %s" % salt)
374 392ca296 Guido Trotter
    elif new_reply.serial == old_reply.serial:
375 392ca296 Guido Trotter
      if new_reply.answer != old_reply.answer:
376 392ca296 Guido Trotter
        self._logger.warning("Got incoherent answers for query %s"
377 392ca296 Guido Trotter
                             " (serial: %s)" % (salt, new_reply.serial))
378 392ca296 Guido Trotter
      else:
379 392ca296 Guido Trotter
        self._logger.debug("Filtering confirming answer, with same"
380 392ca296 Guido Trotter
                           " serial for query %s" % salt)
381 392ca296 Guido Trotter
    else:
382 392ca296 Guido Trotter
      self._logger.debug("Filtering outdated answer for query %s"
383 392ca296 Guido Trotter
                         " serial: (%d < %d)" % (salt, old_reply.serial,
384 392ca296 Guido Trotter
                                                 new_reply.serial))
385 392ca296 Guido Trotter
386 392ca296 Guido Trotter
  def _HandleExpire(self, up):
387 392ca296 Guido Trotter
    # if we have no answer we have received none, before the expiration.
388 a9613def Guido Trotter
    if up.salt in self._answers:
389 a9613def Guido Trotter
      del self._answers[up.salt]
390 392ca296 Guido Trotter
391 392ca296 Guido Trotter
  def _HandleReply(self, up):
392 392ca296 Guido Trotter
    """Handle a single confd reply, and decide whether to filter it.
393 392ca296 Guido Trotter

394 392ca296 Guido Trotter
    @rtype: boolean
395 392ca296 Guido Trotter
    @return: True if the reply should be filtered, False if it should be passed
396 392ca296 Guido Trotter
             on to the up-callback
397 392ca296 Guido Trotter

398 392ca296 Guido Trotter
    """
399 392ca296 Guido Trotter
    filter_upcall = False
400 392ca296 Guido Trotter
    salt = up.salt
401 392ca296 Guido Trotter
    if salt not in self._answers:
402 392ca296 Guido Trotter
      # first answer for a query (don't filter, and record)
403 392ca296 Guido Trotter
      self._answers[salt] = up.server_reply
404 392ca296 Guido Trotter
    elif up.server_reply.serial > self._answers[salt].serial:
405 392ca296 Guido Trotter
      # newer answer (record, and compare contents)
406 392ca296 Guido Trotter
      old_answer = self._answers[salt]
407 392ca296 Guido Trotter
      self._answers[salt] = up.server_reply
408 392ca296 Guido Trotter
      if up.server_reply.answer == old_answer.answer:
409 392ca296 Guido Trotter
        # same content (filter) (version upgrade was unrelated)
410 392ca296 Guido Trotter
        filter_upcall = True
411 392ca296 Guido Trotter
        self._LogFilter(salt, up.server_reply, old_answer)
412 392ca296 Guido Trotter
      # else: different content, pass up a second answer
413 392ca296 Guido Trotter
    else:
414 392ca296 Guido Trotter
      # older or same-version answer (duplicate or outdated, filter)
415 392ca296 Guido Trotter
      filter_upcall = True
416 392ca296 Guido Trotter
      self._LogFilter(salt, up.server_reply, self._answers[salt])
417 392ca296 Guido Trotter
418 392ca296 Guido Trotter
    return filter_upcall
419 392ca296 Guido Trotter
420 392ca296 Guido Trotter
  def __call__(self, up):
421 392ca296 Guido Trotter
    """Filtering callback
422 392ca296 Guido Trotter

423 392ca296 Guido Trotter
    @type up: L{ConfdUpcallPayload}
424 392ca296 Guido Trotter
    @param up: upper callback
425 392ca296 Guido Trotter

426 392ca296 Guido Trotter
    """
427 392ca296 Guido Trotter
    filter_upcall = False
428 392ca296 Guido Trotter
    if up.type == UPCALL_REPLY:
429 392ca296 Guido Trotter
      filter_upcall = self._HandleReply(up)
430 392ca296 Guido Trotter
    elif up.type == UPCALL_EXPIRE:
431 392ca296 Guido Trotter
      self._HandleExpire(up)
432 392ca296 Guido Trotter
433 392ca296 Guido Trotter
    if not filter_upcall:
434 392ca296 Guido Trotter
      self._callback(up)
435 04cdf663 Guido Trotter
436 04cdf663 Guido Trotter
437 04cdf663 Guido Trotter
class ConfdCountingCallback:
438 04cdf663 Guido Trotter
  """Callback that calls another callback, and counts the answers
439 04cdf663 Guido Trotter

440 04cdf663 Guido Trotter
  """
441 04cdf663 Guido Trotter
  def __init__(self, callback, logger=None):
442 04cdf663 Guido Trotter
    """Constructor for ConfdCountingCallback
443 04cdf663 Guido Trotter

444 04cdf663 Guido Trotter
    @type callback: f(L{ConfdUpcallPayload})
445 04cdf663 Guido Trotter
    @param callback: function to call when getting answers
446 04cdf663 Guido Trotter
    @type logger: logging.Logger
447 04cdf663 Guido Trotter
    @param logger: optional logger for internal conditions
448 04cdf663 Guido Trotter

449 04cdf663 Guido Trotter
    """
450 04cdf663 Guido Trotter
    if not callable(callback):
451 04cdf663 Guido Trotter
      raise errors.ProgrammerError("callback must be callable")
452 04cdf663 Guido Trotter
453 04cdf663 Guido Trotter
    self._callback = callback
454 04cdf663 Guido Trotter
    self._logger = logger
455 04cdf663 Guido Trotter
    # answers contains a dict of salt -> count
456 04cdf663 Guido Trotter
    self._answers = {}
457 04cdf663 Guido Trotter
458 04cdf663 Guido Trotter
  def RegisterQuery(self, salt):
459 04cdf663 Guido Trotter
    if salt in self._answers:
460 04cdf663 Guido Trotter
      raise errors.ProgrammerError("query already registered")
461 04cdf663 Guido Trotter
    self._answers[salt] = 0
462 04cdf663 Guido Trotter
463 04cdf663 Guido Trotter
  def AllAnswered(self):
464 04cdf663 Guido Trotter
    """Have all the registered queries received at least an answer?
465 04cdf663 Guido Trotter

466 04cdf663 Guido Trotter
    """
467 04cdf663 Guido Trotter
    return utils.all(self._answers.values())
468 04cdf663 Guido Trotter
469 04cdf663 Guido Trotter
  def _HandleExpire(self, up):
470 04cdf663 Guido Trotter
    # if we have no answer we have received none, before the expiration.
471 04cdf663 Guido Trotter
    if up.salt in self._answers:
472 04cdf663 Guido Trotter
      del self._answers[up.salt]
473 04cdf663 Guido Trotter
474 04cdf663 Guido Trotter
  def _HandleReply(self, up):
475 04cdf663 Guido Trotter
    """Handle a single confd reply, and decide whether to filter it.
476 04cdf663 Guido Trotter

477 04cdf663 Guido Trotter
    @rtype: boolean
478 04cdf663 Guido Trotter
    @return: True if the reply should be filtered, False if it should be passed
479 04cdf663 Guido Trotter
             on to the up-callback
480 04cdf663 Guido Trotter

481 04cdf663 Guido Trotter
    """
482 04cdf663 Guido Trotter
    if up.salt in self._answers:
483 04cdf663 Guido Trotter
      self._answers[up.salt] += 1
484 04cdf663 Guido Trotter
485 04cdf663 Guido Trotter
  def __call__(self, up):
486 04cdf663 Guido Trotter
    """Filtering callback
487 04cdf663 Guido Trotter

488 04cdf663 Guido Trotter
    @type up: L{ConfdUpcallPayload}
489 04cdf663 Guido Trotter
    @param up: upper callback
490 04cdf663 Guido Trotter

491 04cdf663 Guido Trotter
    """
492 04cdf663 Guido Trotter
    if up.type == UPCALL_REPLY:
493 04cdf663 Guido Trotter
      self._HandleReply(up)
494 04cdf663 Guido Trotter
    elif up.type == UPCALL_EXPIRE:
495 04cdf663 Guido Trotter
      self._HandleExpire(up)
496 04cdf663 Guido Trotter
    self._callback(up)