Revision 392ca296 lib/confd/client.py
b/lib/confd/client.py | ||
---|---|---|
38 | 38 |
# And your callback will be called by asyncore, when your query gets a |
39 | 39 |
# response, or when it expires. |
40 | 40 |
|
41 |
You can use the provided ConfdFilterCallback to act as a filter, only passing |
|
42 |
"newer" answer to your callback, and filtering out outdated ones, or ones |
|
43 |
confirming what you already got. |
|
44 |
|
|
41 | 45 |
""" |
42 | 46 |
import socket |
43 | 47 |
import time |
... | ... | |
288 | 292 |
if self.type not in constants.CONFD_REQS: |
289 | 293 |
raise errors.ConfdClientError("Invalid request type") |
290 | 294 |
|
295 |
|
|
296 |
class ConfdFilterCallback: |
|
297 |
"""Callback that calls another callback, but filters duplicate results. |
|
298 |
|
|
299 |
""" |
|
300 |
def __init__(self, callback, logger=None): |
|
301 |
"""Constructor for ConfdFilterCallback |
|
302 |
|
|
303 |
@type callback: f(L{ConfdUpcallPayload}) |
|
304 |
@param callback: function to call when getting answers |
|
305 |
@type logger: L{logging.Logger} |
|
306 |
@keyword logger: optional logger for internal conditions |
|
307 |
|
|
308 |
""" |
|
309 |
if not callable(callback): |
|
310 |
raise errors.ProgrammerError("callback must be callable") |
|
311 |
|
|
312 |
self._callback = callback |
|
313 |
self._logger = logger |
|
314 |
# answers contains a dict of salt -> answer |
|
315 |
self._answers = {} |
|
316 |
|
|
317 |
def _LogFilter(self, salt, new_reply, old_reply): |
|
318 |
if not self._logger: |
|
319 |
return |
|
320 |
|
|
321 |
if new_reply.serial > old_reply.serial: |
|
322 |
self._logger.debug("Filtering confirming answer, with newer" |
|
323 |
" serial for query %s" % salt) |
|
324 |
elif new_reply.serial == old_reply.serial: |
|
325 |
if new_reply.answer != old_reply.answer: |
|
326 |
self._logger.warning("Got incoherent answers for query %s" |
|
327 |
" (serial: %s)" % (salt, new_reply.serial)) |
|
328 |
else: |
|
329 |
self._logger.debug("Filtering confirming answer, with same" |
|
330 |
" serial for query %s" % salt) |
|
331 |
else: |
|
332 |
self._logger.debug("Filtering outdated answer for query %s" |
|
333 |
" serial: (%d < %d)" % (salt, old_reply.serial, |
|
334 |
new_reply.serial)) |
|
335 |
|
|
336 |
def _HandleExpire(self, up): |
|
337 |
# if we have no answer we have received none, before the expiration. |
|
338 |
if salt in self._answers: |
|
339 |
del self._answers[salt] |
|
340 |
|
|
341 |
def _HandleReply(self, up): |
|
342 |
"""Handle a single confd reply, and decide whether to filter it. |
|
343 |
|
|
344 |
@rtype: boolean |
|
345 |
@return: True if the reply should be filtered, False if it should be passed |
|
346 |
on to the up-callback |
|
347 |
|
|
348 |
""" |
|
349 |
filter_upcall = False |
|
350 |
salt = up.salt |
|
351 |
if salt not in self._answers: |
|
352 |
# first answer for a query (don't filter, and record) |
|
353 |
self._answers[salt] = up.server_reply |
|
354 |
elif up.server_reply.serial > self._answers[salt].serial: |
|
355 |
# newer answer (record, and compare contents) |
|
356 |
old_answer = self._answers[salt] |
|
357 |
self._answers[salt] = up.server_reply |
|
358 |
if up.server_reply.answer == old_answer.answer: |
|
359 |
# same content (filter) (version upgrade was unrelated) |
|
360 |
filter_upcall = True |
|
361 |
self._LogFilter(salt, up.server_reply, old_answer) |
|
362 |
# else: different content, pass up a second answer |
|
363 |
else: |
|
364 |
# older or same-version answer (duplicate or outdated, filter) |
|
365 |
filter_upcall = True |
|
366 |
self._LogFilter(salt, up.server_reply, self._answers[salt]) |
|
367 |
|
|
368 |
return filter_upcall |
|
369 |
|
|
370 |
def __call__(self, up): |
|
371 |
"""Filtering callback |
|
372 |
|
|
373 |
@type up: L{ConfdUpcallPayload} |
|
374 |
@param up: upper callback |
|
375 |
|
|
376 |
""" |
|
377 |
filter_upcall = False |
|
378 |
if up.type == UPCALL_REPLY: |
|
379 |
filter_upcall = self._HandleReply(up) |
|
380 |
elif up.type == UPCALL_EXPIRE: |
|
381 |
self._HandleExpire(up) |
|
382 |
|
|
383 |
if not filter_upcall: |
|
384 |
self._callback(up) |
|
385 |
|
Also available in: Unified diff