Revision bfbbc223 lib/confd/client.py
b/lib/confd/client.py | ||
---|---|---|
91 | 91 |
@ivar request: the request data |
92 | 92 |
@ivar args: any extra arguments for the callback |
93 | 93 |
@ivar expiry: the expiry timestamp of the request |
94 |
@ivar sent: the set of contacted peers |
|
95 |
@ivar rcvd: the set of peers who replied |
|
94 | 96 |
|
95 | 97 |
""" |
96 |
def __init__(self, request, args, expiry): |
|
98 |
def __init__(self, request, args, expiry, sent):
|
|
97 | 99 |
self.request = request |
98 | 100 |
self.args = args |
99 | 101 |
self.expiry = expiry |
102 |
self.sent = frozenset(sent) |
|
103 |
self.rcvd = set() |
|
100 | 104 |
|
101 | 105 |
|
102 | 106 |
class ConfdClient: |
... | ... | |
233 | 237 |
raise errors.ConfdClientError("Request too big") |
234 | 238 |
|
235 | 239 |
expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT |
236 |
self._requests[request.rsalt] = _Request(request, args, expire_time) |
|
240 |
self._requests[request.rsalt] = _Request(request, args, expire_time, |
|
241 |
targets) |
|
237 | 242 |
|
238 | 243 |
if not async: |
239 | 244 |
self.FlushSendQueue() |
... | ... | |
259 | 264 |
self._logger.debug("Discarding unknown (expired?) reply: %s" % err) |
260 | 265 |
return |
261 | 266 |
|
267 |
rq.rcvd.add(ip) |
|
268 |
|
|
262 | 269 |
client_reply = ConfdUpcallPayload(salt=salt, |
263 | 270 |
type=UPCALL_REPLY, |
264 | 271 |
server_reply=answer, |
... | ... | |
293 | 300 |
""" |
294 | 301 |
return self._socket.process_next_packet(timeout=timeout) |
295 | 302 |
|
303 |
@staticmethod |
|
304 |
def _NeededReplies(peer_cnt): |
|
305 |
"""Compute the minimum safe number of replies for a query. |
|
306 |
|
|
307 |
The algorithm is designed to work well for both small and big |
|
308 |
number of peers: |
|
309 |
- for less than three, we require all responses |
|
310 |
- for less than five, we allow one miss |
|
311 |
- otherwise, half the number plus one |
|
312 |
|
|
313 |
This guarantees that we progress monotonically: 1->1, 2->2, 3->2, |
|
314 |
4->2, 5->3, 6->3, 7->4, etc. |
|
315 |
|
|
316 |
@type peer_cnt: int |
|
317 |
@param peer_cnt: the number of peers contacted |
|
318 |
@rtype: int |
|
319 |
@return: the number of replies which should give a safe coverage |
|
320 |
|
|
321 |
""" |
|
322 |
if peer_cnt < 3: |
|
323 |
return peer_cnt |
|
324 |
elif peer_cnt < 5: |
|
325 |
return peer_cnt - 1 |
|
326 |
else: |
|
327 |
return int(peer_cnt/2) + 1 |
|
328 |
|
|
329 |
def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT): |
|
330 |
"""Wait for replies to a given request. |
|
331 |
|
|
332 |
This method will wait until either the timeout expires or a |
|
333 |
minimum number (computed using L{_NeededReplies}) of replies are |
|
334 |
received for the given salt. It is useful when doing synchronous |
|
335 |
calls to this library. |
|
336 |
|
|
337 |
@param salt: the salt of the request we want responses for |
|
338 |
@param timeout: the maximum timeout (should be less or equal to |
|
339 |
L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT} |
|
340 |
@rtype: tuple |
|
341 |
@return: a tuple of (timed_out, sent_cnt, recv_cnt); if the |
|
342 |
request is unknown, timed_out will be true and the counters |
|
343 |
will be zero |
|
344 |
|
|
345 |
""" |
|
346 |
def _CheckResponse(): |
|
347 |
if salt not in self._requests: |
|
348 |
# expired? |
|
349 |
if self._logger: |
|
350 |
self._logger.debug("Discarding unknown/expired request: %s" % salt) |
|
351 |
return MISSING |
|
352 |
rq = self._requests[salt] |
|
353 |
if len(rq.rcvd) >= expected: |
|
354 |
# already got all replies |
|
355 |
return (False, len(rq.sent), len(rq.rcvd)) |
|
356 |
# else wait, using default timeout |
|
357 |
self.ReceiveReply() |
|
358 |
raise utils.RetryAgain() |
|
359 |
|
|
360 |
MISSING = (True, 0, 0) |
|
361 |
|
|
362 |
if salt not in self._requests: |
|
363 |
return MISSING |
|
364 |
# extend the expire time with the current timeout, so that we |
|
365 |
# don't get the request expired from under us |
|
366 |
rq = self._requests[salt] |
|
367 |
rq.expiry += timeout |
|
368 |
sent = len(rq.sent) |
|
369 |
expected = self._NeededReplies(sent) |
|
370 |
|
|
371 |
try: |
|
372 |
return utils.Retry(_CheckResponse, 0, timeout) |
|
373 |
except utils.RetryTimeout: |
|
374 |
if salt in self._requests: |
|
375 |
rq = self._requests[salt] |
|
376 |
return (True, len(rq.sent), len(rq.rcvd)) |
|
377 |
else: |
|
378 |
return MISSING |
|
379 |
|
|
296 | 380 |
|
297 | 381 |
# UPCALL_REPLY: server reply upcall |
298 | 382 |
# has all ConfdUpcallPayload fields populated |
Also available in: Unified diff