+
+class ConfdCountingCallback:
+ """Callback that calls another callback, and counts the answers
+
+ """
+ def __init__(self, callback, logger=None):
+ """Constructor for ConfdCountingCallback
+
+ @type callback: f(L{ConfdUpcallPayload})
+ @param callback: function to call when getting answers
+ @type logger: logging.Logger
+ @param logger: optional logger for internal conditions
+
+ """
+ if not callable(callback):
+ raise errors.ProgrammerError("callback must be callable")
+
+ self._callback = callback
+ self._logger = logger
+ # answers contains a dict of salt -> count
+ self._answers = {}
+
+ def RegisterQuery(self, salt):
+ if salt in self._answers:
+ raise errors.ProgrammerError("query already registered")
+ self._answers[salt] = 0
+
+ def AllAnswered(self):
+ """Have all the registered queries received at least an answer?
+
+ """
+ return compat.all(self._answers.values())
+
+ def _HandleExpire(self, up):
+ # if we have no answer we have received none, before the expiration.
+ if up.salt in self._answers:
+ del self._answers[up.salt]
+
+ def _HandleReply(self, up):
+ """Handle a single confd reply, and decide whether to filter it.
+
+ @rtype: boolean
+ @return: True if the reply should be filtered, False if it should be passed
+ on to the up-callback
+
+ """
+ if up.salt in self._answers:
+ self._answers[up.salt] += 1
+
+ def __call__(self, up):
+ """Filtering callback
+
+ @type up: L{ConfdUpcallPayload}
+ @param up: upper callback
+
+ """
+ if up.type == UPCALL_REPLY:
+ self._HandleReply(up)
+ elif up.type == UPCALL_EXPIRE:
+ self._HandleExpire(up)
+ self._callback(up)
+
+
+class StoreResultCallback:
+ """Callback that simply stores the most recent answer.
+
+ @ivar _answers: dict of salt to (have_answer, reply)
+
+ """
+ _NO_KEY = (False, None)
+
+ def __init__(self):
+ """Constructor for StoreResultCallback
+
+ """
+ # answers contains a dict of salt -> best result
+ self._answers = {}
+
+ def GetResponse(self, salt):
+ """Return the best match for a salt
+
+ """
+ return self._answers.get(salt, self._NO_KEY)
+
+ def _HandleExpire(self, up):
+ """Expiration handler.
+
+ """
+ if up.salt in self._answers and self._answers[up.salt] == self._NO_KEY:
+ del self._answers[up.salt]
+
+ def _HandleReply(self, up):
+ """Handle a single confd reply, and decide whether to filter it.
+
+ """
+ self._answers[up.salt] = (True, up)
+
+ def __call__(self, up):
+ """Filtering callback
+
+ @type up: L{ConfdUpcallPayload}
+ @param up: upper callback
+
+ """
+ if up.type == UPCALL_REPLY:
+ self._HandleReply(up)
+ elif up.type == UPCALL_EXPIRE:
+ self._HandleExpire(up)
+
+
+def GetConfdClient(callback):
+ """Return a client configured using the given callback.
+
+ This is handy to abstract the MC list and HMAC key reading.
+
+ @attention: This should only be called on nodes which are part of a
+ cluster, since it depends on a valid (ganeti) data directory;
+ for code running outside of a cluster, you need to create the
+ client manually
+
+ """
+ ss = ssconf.SimpleStore()
+ mc_file = ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
+ mc_list = utils.ReadFile(mc_file).splitlines()
+ hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY)
+ return ConfdClient(hmac_key, mc_list, callback)