Revision bfbbc223

b/lib/confd/client.py
91 91
  @ivar request: the request data
92 92
  @ivar args: any extra arguments for the callback
93 93
  @ivar expiry: the expiry timestamp of the request
94
  @ivar sent: the set of contacted peers
95
  @ivar rcvd: the set of peers who replied
94 96

  
95 97
  """
96
  def __init__(self, request, args, expiry):
98
  def __init__(self, request, args, expiry, sent):
97 99
    self.request = request
98 100
    self.args = args
99 101
    self.expiry = expiry
102
    self.sent = frozenset(sent)
103
    self.rcvd = set()
100 104

  
101 105

  
102 106
class ConfdClient:
......
233 237
        raise errors.ConfdClientError("Request too big")
234 238

  
235 239
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
236
    self._requests[request.rsalt] = _Request(request, args, expire_time)
240
    self._requests[request.rsalt] = _Request(request, args, expire_time,
241
                                             targets)
237 242

  
238 243
    if not async:
239 244
      self.FlushSendQueue()
......
259 264
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
260 265
        return
261 266

  
267
      rq.rcvd.add(ip)
268

  
262 269
      client_reply = ConfdUpcallPayload(salt=salt,
263 270
                                        type=UPCALL_REPLY,
264 271
                                        server_reply=answer,
......
293 300
    """
294 301
    return self._socket.process_next_packet(timeout=timeout)
295 302

  
303
  @staticmethod
304
  def _NeededReplies(peer_cnt):
305
    """Compute the minimum safe number of replies for a query.
306

  
307
    The algorithm is designed to work well for both small and big
308
    number of peers:
309
        - for less than three, we require all responses
310
        - for less than five, we allow one miss
311
        - otherwise, half the number plus one
312

  
313
    This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
314
    4->2, 5->3, 6->3, 7->4, etc.
315

  
316
    @type peer_cnt: int
317
    @param peer_cnt: the number of peers contacted
318
    @rtype: int
319
    @return: the number of replies which should give a safe coverage
320

  
321
    """
322
    if peer_cnt < 3:
323
      return peer_cnt
324
    elif peer_cnt < 5:
325
      return peer_cnt - 1
326
    else:
327
      return int(peer_cnt/2) + 1
328

  
329
  def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
330
    """Wait for replies to a given request.
331

  
332
    This method will wait until either the timeout expires or a
333
    minimum number (computed using L{_NeededReplies}) of replies are
334
    received for the given salt. It is useful when doing synchronous
335
    calls to this library.
336

  
337
    @param salt: the salt of the request we want responses for
338
    @param timeout: the maximum timeout (should be less or equal to
339
        L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT}
340
    @rtype: tuple
341
    @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
342
        request is unknown, timed_out will be true and the counters
343
        will be zero
344

  
345
    """
346
    def _CheckResponse():
347
      if salt not in self._requests:
348
        # expired?
349
        if self._logger:
350
          self._logger.debug("Discarding unknown/expired request: %s" % salt)
351
        return MISSING
352
      rq = self._requests[salt]
353
      if len(rq.rcvd) >= expected:
354
        # already got all replies
355
        return (False, len(rq.sent), len(rq.rcvd))
356
      # else wait, using default timeout
357
      self.ReceiveReply()
358
      raise utils.RetryAgain()
359

  
360
    MISSING = (True, 0, 0)
361

  
362
    if salt not in self._requests:
363
      return MISSING
364
    # extend the expire time with the current timeout, so that we
365
    # don't get the request expired from under us
366
    rq = self._requests[salt]
367
    rq.expiry += timeout
368
    sent = len(rq.sent)
369
    expected = self._NeededReplies(sent)
370

  
371
    try:
372
      return utils.Retry(_CheckResponse, 0, timeout)
373
    except utils.RetryTimeout:
374
      if salt in self._requests:
375
        rq = self._requests[salt]
376
        return (True, len(rq.sent), len(rq.rcvd))
377
      else:
378
        return MISSING
379

  
296 380

  
297 381
# UPCALL_REPLY: server reply upcall
298 382
# has all ConfdUpcallPayload fields populated

Also available in: Unified diff