Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ d8bcfe21

History | View | Annotate | Download (21.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2009 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti confd client
23

24
Clients can use the confd client library to send requests to a group of master
25
candidates running confd. The expected usage is through the asyncore framework,
26
by sending queries, and asynchronously receiving replies through a callback.
27

28
This way the client library doesn't ever need to "wait" on a particular answer,
29
and can proceed even if some udp packets are lost. It's up to the user to
30
reschedule queries if they haven't received responses and they need them.
31

32
Example usage::
33

34
  client = ConfdClient(...) # includes callback specification
35
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36
  client.SendRequest(req)
37
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38
  # ... wait ...
39
  # And your callback will be called by asyncore, when your query gets a
40
  # response, or when it expires.
41

42
You can use the provided ConfdFilterCallback to act as a filter, only passing
43
"newer" answer to your callback, and filtering out outdated ones, or ones
44
confirming what you already got.
45

46
"""
47

    
48
# pylint: disable-msg=E0203
49

    
50
# E0203: Access to member %r before its definition, since we use
51
# objects.py which doesn't explicitly initialise its members
52

    
53
import time
54
import random
55

    
56
from ganeti import utils
57
from ganeti import constants
58
from ganeti import objects
59
from ganeti import serializer
60
from ganeti import daemon # contains AsyncUDPSocket
61
from ganeti import errors
62
from ganeti import confd
63
from ganeti import ssconf
64
from ganeti import compat
65
from ganeti import netutils
66

    
67

    
68
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
  """Confd udp asyncore client

  This is kept separate from the main ConfdClient to make sure it's easy to
  implement a non-asyncore based client library.

  """
  def __init__(self, client, family):
    """Constructor for ConfdAsyncUDPClient

    @type client: L{ConfdClient}
    @param client: client library, to pass the datagrams to
    @param family: address family, handed unchanged to the
        L{daemon.AsyncUDPSocket} constructor

    """
    daemon.AsyncUDPSocket.__init__(self, family)
    self.client = client

  # this method is overriding a daemon.AsyncUDPSocket method
  def handle_datagram(self, payload, ip, port):
    """Hand a received datagram over to the client library.

    @param payload: the raw datagram contents
    @param ip: address the datagram was received from
    @param port: port the datagram was received from

    """
    self.client.HandleResponse(payload, ip, port)
88

    
89

    
90
class _Request(object):
  """Request status structure.

  @ivar request: the request data
  @ivar args: any extra arguments for the callback
  @ivar expiry: the expiry timestamp of the request
  @ivar sent: the set of contacted peers
  @ivar rcvd: the set of peers who replied

  """
  def __init__(self, request, args, expiry, sent):
    """Record a freshly sent request.

    @param request: the request data
    @param args: any extra arguments for the callback
    @param expiry: the expiry timestamp of the request
    @param sent: iterable of the contacted peers

    """
    self.request = request
    self.args = args
    self.expiry = expiry
    # immutable copy: duplicates in "sent" are collapsed
    self.sent = frozenset(sent)
    # starts empty; mutable so that repliers can be added afterwards
    self.rcvd = set()
106

    
107

    
108
class ConfdClient:
109
  """Send queries to confd, and get back answers.
110

111
  Since the confd model works by querying multiple master candidates, and
112
  getting back answers, this is an asynchronous library. It can either work
113
  through asyncore or with your own handling.
114

115
  @type _requests: dict
116
  @ivar _requests: dictionary indexes by salt, which contains data
117
      about the outstanding requests; the values are objects of type
118
      L{_Request}
119

120
  """
121
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
122
    """Constructor for ConfdClient
123

124
    @type hmac_key: string
125
    @param hmac_key: hmac key to talk to confd
126
    @type peers: list
127
    @param peers: list of peer nodes
128
    @type callback: f(L{ConfdUpcallPayload})
129
    @param callback: function to call when getting answers
130
    @type port: integer
131
    @param port: confd port (default: use GetDaemonPort)
132
    @type logger: logging.Logger
133
    @param logger: optional logger for internal conditions
134

135
    """
136
    if not callable(callback):
137
      raise errors.ProgrammerError("callback must be callable")
138

    
139
    self.UpdatePeerList(peers)
140
    self._SetPeersAddressFamily()
141
    self._hmac_key = hmac_key
142
    self._socket = ConfdAsyncUDPClient(self, self._family)
143
    self._callback = callback
144
    self._confd_port = port
145
    self._logger = logger
146
    self._requests = {}
147

    
148
    if self._confd_port is None:
149
      self._confd_port = netutils.GetDaemonPort(constants.CONFD)
150

    
151
  def UpdatePeerList(self, peers):
152
    """Update the list of peers
153

154
    @type peers: list
155
    @param peers: list of peer nodes
156

157
    """
158
    # we are actually called from init, so:
159
    # pylint: disable-msg=W0201
160
    if not isinstance(peers, list):
161
      raise errors.ProgrammerError("peers must be a list")
162
    # make a copy of peers, since we're going to shuffle the list, later
163
    self._peers = list(peers)
164

    
165
  def _PackRequest(self, request, now=None):
166
    """Prepare a request to be sent on the wire.
167

168
    This function puts a proper salt in a confd request, puts the proper salt,
169
    and adds the correct magic number.
170

171
    """
172
    if now is None:
173
      now = time.time()
174
    tstamp = '%d' % now
175
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
176
    return confd.PackMagic(req)
177

    
178
  def _UnpackReply(self, payload):
179
    in_payload = confd.UnpackMagic(payload)
180
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
181
    answer = objects.ConfdReply.FromDict(dict_answer)
182
    return answer, salt
183

    
184
  def ExpireRequests(self):
185
    """Delete all the expired requests.
186

187
    """
188
    now = time.time()
189
    for rsalt, rq in self._requests.items():
190
      if now >= rq.expiry:
191
        del self._requests[rsalt]
192
        client_reply = ConfdUpcallPayload(salt=rsalt,
193
                                          type=UPCALL_EXPIRE,
194
                                          orig_request=rq.request,
195
                                          extra_args=rq.args,
196
                                          client=self,
197
                                          )
198
        self._callback(client_reply)
199

    
200
  def SendRequest(self, request, args=None, coverage=0, async=True):
201
    """Send a confd request to some MCs
202

203
    @type request: L{objects.ConfdRequest}
204
    @param request: the request to send
205
    @type args: tuple
206
    @param args: additional callback arguments
207
    @type coverage: integer
208
    @param coverage: number of remote nodes to contact; if default
209
        (0), it will use a reasonable default
210
        (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is
211
        passed, it will use the maximum number of peers, otherwise the
212
        number passed in will be used
213
    @type async: boolean
214
    @param async: handle the write asynchronously
215

216
    """
217
    if coverage == 0:
218
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
219
    elif coverage == -1:
220
      coverage = len(self._peers)
221

    
222
    if coverage > len(self._peers):
223
      raise errors.ConfdClientError("Not enough MCs known to provide the"
224
                                    " desired coverage")
225

    
226
    if not request.rsalt:
227
      raise errors.ConfdClientError("Missing request rsalt")
228

    
229
    self.ExpireRequests()
230
    if request.rsalt in self._requests:
231
      raise errors.ConfdClientError("Duplicate request rsalt")
232

    
233
    if request.type not in constants.CONFD_REQS:
234
      raise errors.ConfdClientError("Invalid request type")
235

    
236
    random.shuffle(self._peers)
237
    targets = self._peers[:coverage]
238

    
239
    now = time.time()
240
    payload = self._PackRequest(request, now=now)
241

    
242
    for target in targets:
243
      try:
244
        self._socket.enqueue_send(target, self._confd_port, payload)
245
      except errors.UdpDataSizeError:
246
        raise errors.ConfdClientError("Request too big")
247

    
248
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
249
    self._requests[request.rsalt] = _Request(request, args, expire_time,
250
                                             targets)
251

    
252
    if not async:
253
      self.FlushSendQueue()
254

    
255
  def HandleResponse(self, payload, ip, port):
256
    """Asynchronous handler for a confd reply
257

258
    Call the relevant callback associated to the current request.
259

260
    """
261
    try:
262
      try:
263
        answer, salt = self._UnpackReply(payload)
264
      except (errors.SignatureError, errors.ConfdMagicError), err:
265
        if self._logger:
266
          self._logger.debug("Discarding broken package: %s" % err)
267
        return
268

    
269
      try:
270
        rq = self._requests[salt]
271
      except KeyError:
272
        if self._logger:
273
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
274
        return
275

    
276
      rq.rcvd.add(ip)
277

    
278
      client_reply = ConfdUpcallPayload(salt=salt,
279
                                        type=UPCALL_REPLY,
280
                                        server_reply=answer,
281
                                        orig_request=rq.request,
282
                                        server_ip=ip,
283
                                        server_port=port,
284
                                        extra_args=rq.args,
285
                                        client=self,
286
                                       )
287
      self._callback(client_reply)
288

    
289
    finally:
290
      self.ExpireRequests()
291

    
292
  def FlushSendQueue(self):
293
    """Send out all pending requests.
294

295
    Can be used for synchronous client use.
296

297
    """
298
    while self._socket.writable():
299
      self._socket.handle_write()
300

    
301
  def ReceiveReply(self, timeout=1):
302
    """Receive one reply.
303

304
    @type timeout: float
305
    @param timeout: how long to wait for the reply
306
    @rtype: boolean
307
    @return: True if some data has been handled, False otherwise
308

309
    """
310
    return self._socket.process_next_packet(timeout=timeout)
311

    
312
  @staticmethod
313
  def _NeededReplies(peer_cnt):
314
    """Compute the minimum safe number of replies for a query.
315

316
    The algorithm is designed to work well for both small and big
317
    number of peers:
318
        - for less than three, we require all responses
319
        - for less than five, we allow one miss
320
        - otherwise, half the number plus one
321

322
    This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
323
    4->2, 5->3, 6->3, 7->4, etc.
324

325
    @type peer_cnt: int
326
    @param peer_cnt: the number of peers contacted
327
    @rtype: int
328
    @return: the number of replies which should give a safe coverage
329

330
    """
331
    if peer_cnt < 3:
332
      return peer_cnt
333
    elif peer_cnt < 5:
334
      return peer_cnt - 1
335
    else:
336
      return int(peer_cnt/2) + 1
337

    
338
  def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
339
    """Wait for replies to a given request.
340

341
    This method will wait until either the timeout expires or a
342
    minimum number (computed using L{_NeededReplies}) of replies are
343
    received for the given salt. It is useful when doing synchronous
344
    calls to this library.
345

346
    @param salt: the salt of the request we want responses for
347
    @param timeout: the maximum timeout (should be less or equal to
348
        L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT}
349
    @rtype: tuple
350
    @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
351
        request is unknown, timed_out will be true and the counters
352
        will be zero
353

354
    """
355
    def _CheckResponse():
356
      if salt not in self._requests:
357
        # expired?
358
        if self._logger:
359
          self._logger.debug("Discarding unknown/expired request: %s" % salt)
360
        return MISSING
361
      rq = self._requests[salt]
362
      if len(rq.rcvd) >= expected:
363
        # already got all replies
364
        return (False, len(rq.sent), len(rq.rcvd))
365
      # else wait, using default timeout
366
      self.ReceiveReply()
367
      raise utils.RetryAgain()
368

    
369
    MISSING = (True, 0, 0)
370

    
371
    if salt not in self._requests:
372
      return MISSING
373
    # extend the expire time with the current timeout, so that we
374
    # don't get the request expired from under us
375
    rq = self._requests[salt]
376
    rq.expiry += timeout
377
    sent = len(rq.sent)
378
    expected = self._NeededReplies(sent)
379

    
380
    try:
381
      return utils.Retry(_CheckResponse, 0, timeout)
382
    except utils.RetryTimeout:
383
      if salt in self._requests:
384
        rq = self._requests[salt]
385
        return (True, len(rq.sent), len(rq.rcvd))
386
      else:
387
        return MISSING
388

    
389
  def _SetPeersAddressFamily(self):
390
    if not self._peers:
391
      raise errors.ConfdClientError("Peer list empty")
392
    try:
393
      peer = self._peers[0]
394
      self._family = netutils.GetAddressFamily(peer)
395
      for peer in self._peers[1:]:
396
        if netutils.GetAddressFamily(peer) != self._family:
397
          raise errors.ConfdClientError("Peers must be of same address family")
398
    except errors.GenericError:
399
      raise errors.ConfdClientError("Peer address %s invalid" % peer)
400

    
401

    
402
# UPCALL_REPLY: server reply upcall
# has all ConfdUpcallPayload fields populated
UPCALL_REPLY = 1
# UPCALL_EXPIRE: internal library request expire
# has only salt, type, orig_request, extra_args and client
UPCALL_EXPIRE = 2
CONFD_UPCALL_TYPES = frozenset([
  UPCALL_REPLY,
  UPCALL_EXPIRE,
  ])
412

    
413

    
414
class ConfdUpcallPayload(objects.ConfigObject):
  """Callback argument for confd replies

  Upcalls for expired requests carry no server_reply, server_ip and
  server_port.

  @type salt: string
  @ivar salt: salt associated with the query
  @type type: one of confd.client.CONFD_UPCALL_TYPES
  @ivar type: upcall type (server reply, expired request, ...)
  @type orig_request: L{objects.ConfdRequest}
  @ivar orig_request: original request
  @type server_reply: L{objects.ConfdReply}
  @ivar server_reply: server reply
  @type server_ip: string
  @ivar server_ip: answering server ip address
  @type server_port: int
  @ivar server_port: answering server port
  @type extra_args: any
  @ivar extra_args: 'args' argument of the SendRequest function
  @type client: L{ConfdClient}
  @ivar client: current confd client instance

  """
  __slots__ = [
    "salt",
    "type",
    "orig_request",
    "server_reply",
    "server_ip",
    "server_port",
    "extra_args",
    "client",
    ]
445

    
446

    
447
class ConfdClientRequest(objects.ConfdRequest):
  """This is the client-side version of ConfdRequest.

  This version of the class helps creating requests, on the client side, by
  filling in some default values.

  """
  def __init__(self, **kwargs):
    """Build a request, filling in the values the caller left out.

    A missing rsalt is replaced by a newly generated UUID and a missing
    protocol by L{constants.CONFD_PROTOCOL_VERSION}.

    @param kwargs: passed unchanged to L{objects.ConfdRequest}
    @raise errors.ConfdClientError: if the request type is not one of
        L{constants.CONFD_REQS}

    """
    objects.ConfdRequest.__init__(self, **kwargs)
    if not self.rsalt:
      self.rsalt = utils.NewUUID()
    if not self.protocol:
      self.protocol = constants.CONFD_PROTOCOL_VERSION
    if self.type not in constants.CONFD_REQS:
      raise errors.ConfdClientError("Invalid request type")
462

    
463

    
464
class ConfdFilterCallback:
  """Callback that calls another callback, but filters duplicate results.

  @ivar consistent: a dictionary indexed by salt; for each salt, if
      all responses were identical, this will be True; this is the
      expected state on a healthy cluster; on inconsistent or
      partitioned clusters, this might be False, if we see answers
      with the same serial but different contents

  """
  def __init__(self, callback, logger=None):
    """Constructor for ConfdFilterCallback

    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function to call when getting answers
    @type logger: logging.Logger
    @param logger: optional logger for internal conditions
    @raise errors.ProgrammerError: if the callback is not callable

    """
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    self._callback = callback
    self._logger = logger
    # answers contains a dict of salt -> answer
    self._answers = {}
    self.consistent = {}

  def _LogFilter(self, salt, new_reply, old_reply):
    """Log why a reply is being filtered; a no-op without a logger.

    @param salt: salt of the query the replies belong to
    @param new_reply: the reply which was just received
    @param old_reply: the reply it is compared against

    """
    if not self._logger:
      return

    if new_reply.serial > old_reply.serial:
      self._logger.debug("Filtering confirming answer, with newer"
                         " serial for query %s" % salt)
    elif new_reply.serial == old_reply.serial:
      if new_reply.answer != old_reply.answer:
        self._logger.warning("Got incoherent answers for query %s"
                             " (serial: %s)" % (salt, new_reply.serial))
      else:
        self._logger.debug("Filtering confirming answer, with same"
                           " serial for query %s" % salt)
    else:
      # in this branch the new serial is the smaller one, so it goes on
      # the left-hand side of the "<"
      self._logger.debug("Filtering outdated answer for query %s"
                         " serial: (%d < %d)" % (salt, new_reply.serial,
                                                 old_reply.serial))

  def _HandleExpire(self, up):
    """Forget everything recorded for an expired query.

    @type up: L{ConfdUpcallPayload}
    @param up: the expire upcall

    """
    # if we have no answer we have received none, before the expiration.
    if up.salt in self._answers:
      del self._answers[up.salt]
    if up.salt in self.consistent:
      del self.consistent[up.salt]

  def _HandleReply(self, up):
    """Handle a single confd reply, and decide whether to filter it.

    @type up: L{ConfdUpcallPayload}
    @param up: the reply upcall
    @rtype: boolean
    @return: True if the reply should be filtered, False if it should be passed
             on to the up-callback

    """
    filter_upcall = False
    salt = up.salt
    if salt not in self.consistent:
      self.consistent[salt] = True
    if salt not in self._answers:
      # first answer for a query (don't filter, and record)
      self._answers[salt] = up.server_reply
    elif up.server_reply.serial > self._answers[salt].serial:
      # newer answer (record, and compare contents)
      old_answer = self._answers[salt]
      self._answers[salt] = up.server_reply
      if up.server_reply.answer == old_answer.answer:
        # same content (filter) (version upgrade was unrelated)
        filter_upcall = True
        self._LogFilter(salt, up.server_reply, old_answer)
      # else: different content, pass up a second answer
    else:
      # older or same-version answer (duplicate or outdated, filter)
      if (up.server_reply.serial == self._answers[salt].serial and
          up.server_reply.answer != self._answers[salt].answer):
        self.consistent[salt] = False
      filter_upcall = True
      self._LogFilter(salt, up.server_reply, self._answers[salt])

    return filter_upcall

  def __call__(self, up):
    """Filtering callback

    Replies are passed on to the wrapped callback unless L{_HandleReply}
    filters them; all other upcalls are always passed on.

    @type up: L{ConfdUpcallPayload}
    @param up: the upcall to examine

    """
    filter_upcall = False
    if up.type == UPCALL_REPLY:
      filter_upcall = self._HandleReply(up)
    elif up.type == UPCALL_EXPIRE:
      self._HandleExpire(up)

    if not filter_upcall:
      self._callback(up)
567

    
568

    
569
class ConfdCountingCallback:
  """Callback that calls another callback, and counts the answers

  Only queries registered through L{RegisterQuery} are counted.

  """
  def __init__(self, callback, logger=None):
    """Constructor for ConfdCountingCallback

    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function to call when getting answers
    @type logger: logging.Logger
    @param logger: optional logger for internal conditions
    @raise errors.ProgrammerError: if the callback is not callable

    """
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    self._callback = callback
    self._logger = logger
    # answers contains a dict of salt -> count
    self._answers = {}

  def RegisterQuery(self, salt):
    """Start counting the answers for a query.

    @param salt: the salt of the query
    @raise errors.ProgrammerError: if the salt is already registered

    """
    if salt in self._answers:
      raise errors.ProgrammerError("query already registered")
    self._answers[salt] = 0

  def AllAnswered(self):
    """Have all the registered queries received at least an answer?

    """
    return compat.all(self._answers.values())

  def _HandleExpire(self, up):
    """Stop tracking an expired query.

    @type up: L{ConfdUpcallPayload}
    @param up: the expire upcall

    """
    # if we have no answer we have received none, before the expiration.
    if up.salt in self._answers:
      del self._answers[up.salt]

  def _HandleReply(self, up):
    """Count a single confd reply.

    Replies to queries which were not registered are ignored. Nothing is
    returned, as this class never filters replies.

    @type up: L{ConfdUpcallPayload}
    @param up: the reply upcall

    """
    if up.salt in self._answers:
      self._answers[up.salt] += 1

  def __call__(self, up):
    """Counting callback

    Updates the counters and then always calls the wrapped callback.

    @type up: L{ConfdUpcallPayload}
    @param up: the upcall to count and pass on

    """
    if up.type == UPCALL_REPLY:
      self._HandleReply(up)
    elif up.type == UPCALL_EXPIRE:
      self._HandleExpire(up)
    self._callback(up)
629

    
630

    
631
class StoreResultCallback:
  """Callback that simply stores the most recent answer.

  @ivar _answers: dict of salt to (have_answer, reply)

  """
  # value returned by GetResponse for salts without a stored answer
  _NO_KEY = (False, None)

  def __init__(self):
    """Constructor for StoreResultCallback

    """
    # answers contains a dict of salt -> best result
    self._answers = {}

  def GetResponse(self, salt):
    """Return the best match for a salt

    @param salt: the salt of the query
    @rtype: tuple
    @return: a tuple of (True, upcall) with the last stored reply
        upcall, or (False, None) if nothing is stored for the salt

    """
    return self._answers.get(salt, self._NO_KEY)

  def _HandleExpire(self, up):
    """Expiration handler.

    Only an entry equal to the "no answer" marker is removed; stored
    replies are kept.

    @type up: L{ConfdUpcallPayload}
    @param up: the expire upcall

    """
    if up.salt in self._answers and self._answers[up.salt] == self._NO_KEY:
      del self._answers[up.salt]

  def _HandleReply(self, up):
    """Store a single confd reply, replacing any previous one.

    @type up: L{ConfdUpcallPayload}
    @param up: the reply upcall

    """
    self._answers[up.salt] = (True, up)

  def __call__(self, up):
    """Storing callback

    Unlike the other callbacks in this module, this one doesn't pass the
    upcall on to another callback.

    @type up: L{ConfdUpcallPayload}
    @param up: the upcall to store or expire

    """
    if up.type == UPCALL_REPLY:
      self._HandleReply(up)
    elif up.type == UPCALL_EXPIRE:
      self._HandleExpire(up)
676

    
677

    
678
def GetConfdClient(callback):
  """Return a client configured using the given callback.

  This is handy to abstract the MC list and HMAC key reading.

  @attention: This should only be called on nodes which are part of a
      cluster, since it depends on a valid (ganeti) data directory;
      for code running outside of a cluster, you need to create the
      client manually

  @type callback: f(L{ConfdUpcallPayload})
  @param callback: function to call when getting answers
  @rtype: L{ConfdClient}
  @return: a client using the default port and no logger

  """
  ss = ssconf.SimpleStore()
  # one master candidate IP per line of the ssconf file
  mc_file = ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
  mc_list = utils.ReadFile(mc_file).splitlines()
  hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY)
  return ConfdClient(hmac_key, mc_list, callback)