+
class ConfdFilterCallback:
  """Callback that calls another callback, but filters duplicate results.

  @ivar consistent: a dictionary indexed by salt; for each salt, if
      all responses were identical, this will be True; this is the
      expected state on a healthy cluster; on inconsistent or
      partitioned clusters, this might be False, if we see answers
      with the same serial but different contents

  """
  def __init__(self, callback, logger=None):
    """Constructor for ConfdFilterCallback

    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function to call when getting answers
    @type logger: logging.Logger
    @param logger: optional logger for internal conditions
    @raise errors.ProgrammerError: if C{callback} is not callable

    """
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    self._callback = callback
    self._logger = logger
    # answers contains a dict of salt -> answer
    self._answers = {}
    self.consistent = {}

  def _LogFilter(self, salt, new_reply, old_reply):
    """Log why a reply is being filtered; does nothing without a logger.

    @param salt: salt of the query the replies belong to
    @param new_reply: the reply that is being filtered
    @param old_reply: the reply it was compared against

    """
    if not self._logger:
      return

    if new_reply.serial > old_reply.serial:
      self._logger.debug("Filtering confirming answer, with newer"
                         " serial for query %s" % salt)
    elif new_reply.serial == old_reply.serial:
      if new_reply.answer != old_reply.answer:
        self._logger.warning("Got incoherent answers for query %s"
                             " (serial: %s)" % (salt, new_reply.serial))
      else:
        self._logger.debug("Filtering confirming answer, with same"
                           " serial for query %s" % salt)
    else:
      # Here the new reply carries the lower serial, so it must be printed
      # on the left-hand side of the "<" for the message to be truthful.
      self._logger.debug("Filtering outdated answer for query %s"
                         " serial: (%d < %d)" % (salt, new_reply.serial,
                                                 old_reply.serial))

  def _HandleExpire(self, up):
    """Forget everything recorded for the salt of an expired query.

    @type up: L{ConfdUpcallPayload}
    @param up: the expiration upcall

    """
    # if we have no answer we have received none, before the expiration.
    if up.salt in self._answers:
      del self._answers[up.salt]
    if up.salt in self.consistent:
      del self.consistent[up.salt]

  def _HandleReply(self, up):
    """Handle a single confd reply, and decide whether to filter it.

    @type up: L{ConfdUpcallPayload}
    @param up: the reply upcall
    @rtype: boolean
    @return: True if the reply should be filtered, False if it should be passed
        on to the up-callback

    """
    salt = up.salt
    if salt not in self.consistent:
      self.consistent[salt] = True

    if salt not in self._answers:
      # first answer for a query (don't filter, and record)
      self._answers[salt] = up.server_reply
      return False

    reply = up.server_reply
    known = self._answers[salt]

    if reply.serial > known.serial:
      # newer answer (record, and compare contents)
      self._answers[salt] = reply
      if reply.answer == known.answer:
        # same content (filter) (version upgrade was unrelated)
        self._LogFilter(salt, reply, known)
        return True
      # different content, pass up a second answer
      return False

    # older or same-version answer (duplicate or outdated, filter)
    if reply.serial == known.serial and reply.answer != known.answer:
      # same serial but different contents: the responders disagree
      self.consistent[salt] = False
    self._LogFilter(salt, reply, known)
    return True

  def __call__(self, up):
    """Filtering callback

    Replies are passed to the wrapped callback only when L{_HandleReply}
    does not ask for them to be filtered; every other upcall type is always
    passed on.

    @type up: L{ConfdUpcallPayload}
    @param up: upper callback

    """
    filter_upcall = False
    if up.type == UPCALL_REPLY:
      filter_upcall = self._HandleReply(up)
    elif up.type == UPCALL_EXPIRE:
      self._HandleExpire(up)

    if not filter_upcall:
      self._callback(up)
+
+
class ConfdCountingCallback:
  """Callback wrapper that counts replies per registered query.

  Every upcall is forwarded to the wrapped callback; nothing is filtered.

  """
  def __init__(self, callback, logger=None):
    """Constructor for ConfdCountingCallback

    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function every upcall is forwarded to
    @type logger: logging.Logger
    @param logger: optional logger for internal conditions
    @raise errors.ProgrammerError: if C{callback} is not callable

    """
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    # per-query reply counters: salt -> number of replies seen so far
    self._answers = {}
    self._logger = logger
    self._callback = callback

  def RegisterQuery(self, salt):
    """Start counting replies for a query, with its counter at zero.

    @param salt: salt identifying the query
    @raise errors.ProgrammerError: if the salt was already registered

    """
    counters = self._answers
    if salt in counters:
      raise errors.ProgrammerError("query already registered")
    counters[salt] = 0

  def AllAnswered(self):
    """Have all the registered queries received at least an answer?

    """
    reply_counts = self._answers.values()
    return compat.all(reply_counts)

  def _HandleExpire(self, up):
    """Stop tracking the query whose expiration is being reported.

    @type up: L{ConfdUpcallPayload}
    @param up: the expiration upcall

    """
    # unknown salts are silently ignored
    self._answers.pop(up.salt, None)

  def _HandleReply(self, up):
    """Count one more reply for the query, if it was registered.

    @type up: L{ConfdUpcallPayload}
    @param up: the reply upcall

    """
    try:
      self._answers[up.salt] += 1
    except KeyError:
      # reply for a salt that is not (or no longer) registered: not counted
      pass

  def __call__(self, up):
    """Counting callback

    Updates the counters according to the upcall type, then always passes
    the upcall on to the wrapped callback.

    @type up: L{ConfdUpcallPayload}
    @param up: upper callback

    """
    if up.type == UPCALL_REPLY:
      self._HandleReply(up)
    elif up.type == UPCALL_EXPIRE:
      self._HandleExpire(up)

    self._callback(up)
+
+
class StoreResultCallback:
  """Callback that simply stores the most recent answer.

  @ivar _answers: dict of salt to (have_answer, reply)

  """
  # value returned by GetResponse for a salt with nothing stored
  _NO_KEY = (False, None)

  def __init__(self):
    """Constructor for StoreResultCallback

    """
    # salt -> (True, upcall) for the last reply received for that salt
    self._answers = {}

  def GetResponse(self, salt):
    """Return the stored entry for a salt.

    @param salt: salt identifying the query
    @return: the stored (have_answer, reply) tuple, or C{(False, None)} if
        nothing is stored for the salt

    """
    try:
      return self._answers[salt]
    except KeyError:
      return self._NO_KEY

  def _HandleExpire(self, up):
    """Expiration handler.

    Drops the entry for the expired salt only if it holds the "no answer"
    placeholder; entries holding a reply are kept.

    """
    expired = up.salt
    if self._answers.get(expired) == self._NO_KEY:
      del self._answers[expired]

  def _HandleReply(self, up):
    """Record the upcall as the latest answer for its salt.

    """
    entry = (True, up)
    self._answers[up.salt] = entry

  def __call__(self, up):
    """Storing callback

    Dispatches the upcall to the reply or expiration handler according to
    its type; other types are ignored.

    @type up: L{ConfdUpcallPayload}
    @param up: upper callback

    """
    if up.type == UPCALL_REPLY:
      self._HandleReply(up)
    elif up.type == UPCALL_EXPIRE:
      self._HandleExpire(up)
+
+
def GetConfdClient(callback):
  """Build a L{ConfdClient} for the local cluster around the given callback.

  The master candidate list and the HMAC key are read here, so callers do
  not have to deal with them.

  @attention: This should only be called on nodes which are part of a
      cluster, since it depends on a valid (ganeti) data directory;
      for code running outside of a cluster, you need to create the
      client manually

  @param callback: callback handed over to the client
  @return: the newly created client

  """
  store = ssconf.SimpleStore()
  candidates_path = store.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
  # one list entry per line of the master candidates file
  candidates = utils.ReadFile(candidates_path).splitlines()
  key = utils.ReadFile(constants.CONFD_HMAC_KEY)
  return ConfdClient(key, candidates, callback)