@staticmethod
def _NeededReplies(peer_cnt):
  """Compute the minimum safe number of replies for a query.

  The algorithm is designed to work well for both small and big
  numbers of peers:
    - for less than three, we require all responses
    - for less than five, we allow one miss
    - otherwise, half the number (rounded down) plus one

  This guarantees that the result never decreases as the number of
  peers grows: 1->1, 2->2, 3->2, 4->3, 5->3, 6->4, 7->4, etc.

  @type peer_cnt: int
  @param peer_cnt: the number of peers contacted
  @rtype: int
  @return: the number of replies which should give a safe coverage

  """
  if peer_cnt < 3:
    return peer_cnt
  if peer_cnt < 5:
    return peer_cnt - 1
  # Floor division keeps the computation in exact integer arithmetic,
  # independently of how "/" behaves for ints in the running interpreter
  return peer_cnt // 2 + 1
+
def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
  """Wait for replies to a given request.

  This method will wait until either the timeout expires or a
  minimum number (computed using L{_NeededReplies}) of replies are
  received for the given salt. It is useful when doing synchronous
  calls to this library.

  As a side effect, the expiry time of the request is pushed back by
  C{timeout}, so that the request does not expire while we wait.

  @param salt: the salt of the request we want responses for
  @param timeout: the maximum timeout (should be less or equal to
      L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT})
  @rtype: tuple
  @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
      request is unknown, timed_out will be true and the counters
      will be zero

  """
  # Result used whenever the request is not (or no longer) known
  MISSING = (True, 0, 0)

  if salt not in self._requests:
    return MISSING

  # extend the expire time with the current timeout, so that we
  # don't get the request expired from under us
  rq = self._requests[salt]
  rq.expiry += timeout
  expected = self._NeededReplies(len(rq.sent))

  def _CheckResponse():
    """Return the final result if available, else poll and ask to retry.

    """
    if salt not in self._requests:
      # expired?
      if self._logger:
        # Arguments are passed separately so that the message is only
        # formatted when it is actually emitted
        self._logger.debug("Discarding unknown/expired request: %s", salt)
      return MISSING
    pending = self._requests[salt]
    if len(pending.rcvd) >= expected:
      # already got all the replies we need
      return (False, len(pending.sent), len(pending.rcvd))
    # else wait, using default timeout
    self.ReceiveReply()
    raise utils.RetryAgain()

  try:
    return utils.Retry(_CheckResponse, 0, timeout)
  except utils.RetryTimeout:
    # Report whatever was collected so far, if the request still exists
    if salt in self._requests:
      rq = self._requests[salt]
      return (True, len(rq.sent), len(rq.rcvd))
    return MISSING
+
def _SetPeersAddressFamily(self):
  """Record the address family shared by all configured peers.

  The family of the first peer is stored in C{self._family}; every
  remaining peer is then required to have that same family.

  @raise errors.ConfdClientError: if the peer list is empty, if
      looking up a peer's family raises L{errors.IPAddressError}, or
      if the peers do not all have the same address family

  """
  if not self._peers:
    raise errors.ConfdClientError("Peer list empty")

  family_of = netutils.IPAddress.GetAddressFamily
  # Always names the address being examined, so that a lookup failure
  # can report exactly which peer was rejected
  candidate = self._peers[0]
  try:
    self._family = family_of(candidate)
    for candidate in self._peers[1:]:
      if family_of(candidate) != self._family:
        raise errors.ConfdClientError("Peers must be of same address family")
  except errors.IPAddressError:
    raise errors.ConfdClientError("Peer address %s invalid" % candidate)
+