Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ 31d3b918

History | View | Annotate | Download (21.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2009, 2010, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti confd client
23

24
Clients can use the confd client library to send requests to a group of master
25
candidates running confd. The expected usage is through the asyncore framework,
26
by sending queries, and asynchronously receiving replies through a callback.
27

28
This way the client library doesn't ever need to "wait" on a particular answer,
29
and can proceed even if some udp packets are lost. It's up to the user to
30
reschedule queries if they haven't received responses and they need them.
31

32
Example usage::
33

34
  client = ConfdClient(...) # includes callback specification
35
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36
  client.SendRequest(req)
37
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38
  # ... wait ...
39
  # And your callback will be called by asyncore, when your query gets a
40
  # response, or when it expires.
41

42
You can use the provided ConfdFilterCallback to act as a filter, only passing
43
"newer" answers to your callback, and filtering out outdated ones, or ones
44
confirming what you already got.
45

46
"""
47

    
48
# pylint: disable=E0203
49

    
50
# E0203: Access to member %r before its definition, since we use
51
# objects.py which doesn't explicitly initialise its members
52

    
53
import time
54
import random
55

    
56
from ganeti import utils
57
from ganeti import constants
58
from ganeti import objects
59
from ganeti import serializer
60
from ganeti import daemon # contains AsyncUDPSocket
61
from ganeti import errors
62
from ganeti import confd
63
from ganeti import ssconf
64
from ganeti import compat
65
from ganeti import netutils
66
from ganeti import pathutils
67

    
68

    
69
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
  """UDP socket that hands every received datagram to a confd client.

  The transport lives in its own class, apart from L{ConfdClient}, so
  that a client library not based on asyncore stays easy to implement.

  """
  def __init__(self, client, family):
    """Set up the socket and remember the owning client.

    @type client: L{ConfdClient}
    @param client: client library that will receive the datagrams
    @param family: address family, forwarded unchanged to the
        L{daemon.AsyncUDPSocket} constructor

    """
    daemon.AsyncUDPSocket.__init__(self, family)
    self.client = client

  # overrides the daemon.AsyncUDPSocket hook of the same name
  def handle_datagram(self, payload, ip, port):
    """Forward one received datagram to the client's response handler.

    """
    owner = self.client
    owner.HandleResponse(payload, ip, port)
89

    
90

    
91
class _Request(object):
  """Bookkeeping record for one outstanding request.

  @ivar request: the request data
  @ivar args: any extra arguments for the callback
  @ivar expiry: the expiry timestamp of the request
  @ivar sent: the set of contacted peers (immutable)
  @ivar rcvd: the set of peers who replied (starts out empty)

  """
  def __init__(self, request, args, expiry, sent):
    """Store the request data and start with no replies recorded.

    """
    # nobody has answered yet; repliers are added as answers arrive
    self.rcvd = set()
    # the contacted peers never change once the request went out
    self.sent = frozenset(sent)
    self.expiry = expiry
    self.args = args
    self.request = request
107

    
108

    
109
class ConfdClient:
110
  """Send queries to confd, and get back answers.
111

112
  Since the confd model works by querying multiple master candidates, and
113
  getting back answers, this is an asynchronous library. It can either work
114
  through asyncore or with your own handling.
115

116
  @type _requests: dict
117
  @ivar _requests: dictionary indexes by salt, which contains data
118
      about the outstanding requests; the values are objects of type
119
      L{_Request}
120

121
  """
122
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
123
    """Constructor for ConfdClient
124

125
    @type hmac_key: string
126
    @param hmac_key: hmac key to talk to confd
127
    @type peers: list
128
    @param peers: list of peer nodes
129
    @type callback: f(L{ConfdUpcallPayload})
130
    @param callback: function to call when getting answers
131
    @type port: integer
132
    @param port: confd port (default: use GetDaemonPort)
133
    @type logger: logging.Logger
134
    @param logger: optional logger for internal conditions
135

136
    """
137
    if not callable(callback):
138
      raise errors.ProgrammerError("callback must be callable")
139

    
140
    self.UpdatePeerList(peers)
141
    self._SetPeersAddressFamily()
142
    self._hmac_key = hmac_key
143
    self._socket = ConfdAsyncUDPClient(self, self._family)
144
    self._callback = callback
145
    self._confd_port = port
146
    self._logger = logger
147
    self._requests = {}
148

    
149
    if self._confd_port is None:
150
      self._confd_port = netutils.GetDaemonPort(constants.CONFD)
151

    
152
  def UpdatePeerList(self, peers):
153
    """Update the list of peers
154

155
    @type peers: list
156
    @param peers: list of peer nodes
157

158
    """
159
    # we are actually called from init, so:
160
    # pylint: disable=W0201
161
    if not isinstance(peers, list):
162
      raise errors.ProgrammerError("peers must be a list")
163
    # make a copy of peers, since we're going to shuffle the list, later
164
    self._peers = list(peers)
165

    
166
  def _PackRequest(self, request, now=None):
167
    """Prepare a request to be sent on the wire.
168

169
    This function puts a proper salt in a confd request, puts the proper salt,
170
    and adds the correct magic number.
171

172
    """
173
    if now is None:
174
      now = time.time()
175
    tstamp = "%d" % now
176
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
177
    return confd.PackMagic(req)
178

    
179
  def _UnpackReply(self, payload):
180
    in_payload = confd.UnpackMagic(payload)
181
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
182
    answer = objects.ConfdReply.FromDict(dict_answer)
183
    return answer, salt
184

    
185
  def ExpireRequests(self):
186
    """Delete all the expired requests.
187

188
    """
189
    now = time.time()
190
    for rsalt, rq in self._requests.items():
191
      if now >= rq.expiry:
192
        del self._requests[rsalt]
193
        client_reply = ConfdUpcallPayload(salt=rsalt,
194
                                          type=UPCALL_EXPIRE,
195
                                          orig_request=rq.request,
196
                                          extra_args=rq.args,
197
                                          client=self,
198
                                          )
199
        self._callback(client_reply)
200

    
201
  def SendRequest(self, request, args=None, coverage=0, async=True):
202
    """Send a confd request to some MCs
203

204
    @type request: L{objects.ConfdRequest}
205
    @param request: the request to send
206
    @type args: tuple
207
    @param args: additional callback arguments
208
    @type coverage: integer
209
    @param coverage: number of remote nodes to contact; if default
210
        (0), it will use a reasonable default
211
        (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is
212
        passed, it will use the maximum number of peers, otherwise the
213
        number passed in will be used
214
    @type async: boolean
215
    @param async: handle the write asynchronously
216

217
    """
218
    if coverage == 0:
219
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
220
    elif coverage == -1:
221
      coverage = len(self._peers)
222

    
223
    if coverage > len(self._peers):
224
      raise errors.ConfdClientError("Not enough MCs known to provide the"
225
                                    " desired coverage")
226

    
227
    if not request.rsalt:
228
      raise errors.ConfdClientError("Missing request rsalt")
229

    
230
    self.ExpireRequests()
231
    if request.rsalt in self._requests:
232
      raise errors.ConfdClientError("Duplicate request rsalt")
233

    
234
    if request.type not in constants.CONFD_REQS:
235
      raise errors.ConfdClientError("Invalid request type")
236

    
237
    random.shuffle(self._peers)
238
    targets = self._peers[:coverage]
239

    
240
    now = time.time()
241
    payload = self._PackRequest(request, now=now)
242

    
243
    for target in targets:
244
      try:
245
        self._socket.enqueue_send(target, self._confd_port, payload)
246
      except errors.UdpDataSizeError:
247
        raise errors.ConfdClientError("Request too big")
248

    
249
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
250
    self._requests[request.rsalt] = _Request(request, args, expire_time,
251
                                             targets)
252

    
253
    if not async:
254
      self.FlushSendQueue()
255

    
256
  def HandleResponse(self, payload, ip, port):
257
    """Asynchronous handler for a confd reply
258

259
    Call the relevant callback associated to the current request.
260

261
    """
262
    try:
263
      try:
264
        answer, salt = self._UnpackReply(payload)
265
      except (errors.SignatureError, errors.ConfdMagicError), err:
266
        if self._logger:
267
          self._logger.debug("Discarding broken package: %s" % err)
268
        return
269

    
270
      try:
271
        rq = self._requests[salt]
272
      except KeyError:
273
        if self._logger:
274
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
275
        return
276

    
277
      rq.rcvd.add(ip)
278

    
279
      client_reply = ConfdUpcallPayload(salt=salt,
280
                                        type=UPCALL_REPLY,
281
                                        server_reply=answer,
282
                                        orig_request=rq.request,
283
                                        server_ip=ip,
284
                                        server_port=port,
285
                                        extra_args=rq.args,
286
                                        client=self,
287
                                        )
288
      self._callback(client_reply)
289

    
290
    finally:
291
      self.ExpireRequests()
292

    
293
  def FlushSendQueue(self):
294
    """Send out all pending requests.
295

296
    Can be used for synchronous client use.
297

298
    """
299
    while self._socket.writable():
300
      self._socket.handle_write()
301

    
302
  def ReceiveReply(self, timeout=1):
303
    """Receive one reply.
304

305
    @type timeout: float
306
    @param timeout: how long to wait for the reply
307
    @rtype: boolean
308
    @return: True if some data has been handled, False otherwise
309

310
    """
311
    return self._socket.process_next_packet(timeout=timeout)
312

    
313
  @staticmethod
314
  def _NeededReplies(peer_cnt):
315
    """Compute the minimum safe number of replies for a query.
316

317
    The algorithm is designed to work well for both small and big
318
    number of peers:
319
        - for less than three, we require all responses
320
        - for less than five, we allow one miss
321
        - otherwise, half the number plus one
322

323
    This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
324
    4->2, 5->3, 6->3, 7->4, etc.
325

326
    @type peer_cnt: int
327
    @param peer_cnt: the number of peers contacted
328
    @rtype: int
329
    @return: the number of replies which should give a safe coverage
330

331
    """
332
    if peer_cnt < 3:
333
      return peer_cnt
334
    elif peer_cnt < 5:
335
      return peer_cnt - 1
336
    else:
337
      return int(peer_cnt / 2) + 1
338

    
339
  def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
340
    """Wait for replies to a given request.
341

342
    This method will wait until either the timeout expires or a
343
    minimum number (computed using L{_NeededReplies}) of replies are
344
    received for the given salt. It is useful when doing synchronous
345
    calls to this library.
346

347
    @param salt: the salt of the request we want responses for
348
    @param timeout: the maximum timeout (should be less or equal to
349
        L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT}
350
    @rtype: tuple
351
    @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
352
        request is unknown, timed_out will be true and the counters
353
        will be zero
354

355
    """
356
    def _CheckResponse():
357
      if salt not in self._requests:
358
        # expired?
359
        if self._logger:
360
          self._logger.debug("Discarding unknown/expired request: %s" % salt)
361
        return MISSING
362
      rq = self._requests[salt]
363
      if len(rq.rcvd) >= expected:
364
        # already got all replies
365
        return (False, len(rq.sent), len(rq.rcvd))
366
      # else wait, using default timeout
367
      self.ReceiveReply()
368
      raise utils.RetryAgain()
369

    
370
    MISSING = (True, 0, 0)
371

    
372
    if salt not in self._requests:
373
      return MISSING
374
    # extend the expire time with the current timeout, so that we
375
    # don't get the request expired from under us
376
    rq = self._requests[salt]
377
    rq.expiry += timeout
378
    sent = len(rq.sent)
379
    expected = self._NeededReplies(sent)
380

    
381
    try:
382
      return utils.Retry(_CheckResponse, 0, timeout)
383
    except utils.RetryTimeout:
384
      if salt in self._requests:
385
        rq = self._requests[salt]
386
        return (True, len(rq.sent), len(rq.rcvd))
387
      else:
388
        return MISSING
389

    
390
  def _SetPeersAddressFamily(self):
391
    if not self._peers:
392
      raise errors.ConfdClientError("Peer list empty")
393
    try:
394
      peer = self._peers[0]
395
      self._family = netutils.IPAddress.GetAddressFamily(peer)
396
      for peer in self._peers[1:]:
397
        if netutils.IPAddress.GetAddressFamily(peer) != self._family:
398
          raise errors.ConfdClientError("Peers must be of same address family")
399
    except errors.IPAddressError:
400
      raise errors.ConfdClientError("Peer address %s invalid" % peer)
401

    
402

    
403
# Kinds of upcall delivered to the user callback (ConfdUpcallPayload.type).
#
# UPCALL_REPLY: a server answered a request; every ConfdUpcallPayload
# field is populated
UPCALL_REPLY = 1
# UPCALL_EXPIRE: the library dropped a request whose expiry time passed;
# only salt, type, orig_request, extra_args and client are populated
UPCALL_EXPIRE = 2
CONFD_UPCALL_TYPES = compat.UniqueFrozenset([
  UPCALL_REPLY,
  UPCALL_EXPIRE,
  ])
413

    
414

    
415
class ConfdUpcallPayload(objects.ConfigObject):
  """Argument object passed to the user callback on every upcall.

  @type salt: string
  @ivar salt: salt associated with the query
  @type type: one of confd.client.CONFD_UPCALL_TYPES
  @ivar type: upcall type (server reply, expired request, ...)
  @type orig_request: L{objects.ConfdRequest}
  @ivar orig_request: original request
  @type server_reply: L{objects.ConfdReply}
  @ivar server_reply: server reply
  @type server_ip: string
  @ivar server_ip: answering server ip address
  @type server_port: int
  @ivar server_port: answering server port
  @type extra_args: any
  @ivar extra_args: 'args' argument of the SendRequest function
  @type client: L{ConfdClient}
  @ivar client: current confd client instance

  """
  # one slot per documented attribute
  __slots__ = [
    "salt",
    "type",
    "orig_request",
    "server_reply",
    "server_ip",
    "server_port",
    "extra_args",
    "client",
    ]
446

    
447

    
448
class ConfdClientRequest(objects.ConfdRequest):
  """Client-side flavour of ConfdRequest.

  It makes building requests on the client easier by filling in
  defaults for the fields the caller left out.

  """
  def __init__(self, **kwargs):
    """Build the request, defaulting rsalt and protocol if unset.

    @raise errors.ConfdClientError: if the request type is not one of
        L{constants.CONFD_REQS}

    """
    objects.ConfdRequest.__init__(self, **kwargs)
    # a fresh UUID serves as salt when the caller did not choose one
    if not self.rsalt:
      self.rsalt = utils.NewUUID()
    if not self.protocol:
      self.protocol = constants.CONFD_PROTOCOL_VERSION
    valid_type = self.type in constants.CONFD_REQS
    if not valid_type:
      raise errors.ConfdClientError("Invalid request type")
463

    
464

    
465
class ConfdFilterCallback:
  """Callback that calls another callback, but filters duplicate results.

  @ivar consistent: a dictionary indexed by salt; for each salt, if
      all responses were identical, this will be True; this is the
      expected state on a healthy cluster; on inconsistent or
      partitioned clusters, this might be False, if we see answers
      with the same serial but different contents

  """
  def __init__(self, callback, logger=None):
    """Constructor for ConfdFilterCallback

    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function to call when getting answers
    @type logger: logging.Logger
    @param logger: optional logger for internal conditions
    @raise errors.ProgrammerError: if the callback is not callable

    """
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    self._callback = callback
    self._logger = logger
    # answers contains a dict of salt -> answer
    self._answers = {}
    self.consistent = {}

  def _LogFilter(self, salt, new_reply, old_reply):
    """Log why a reply is being filtered out.

    Does nothing if no logger was given.

    @param salt: the salt of the query
    @param new_reply: the reply being filtered
    @param old_reply: the reply it was compared against

    """
    if not self._logger:
      return

    if new_reply.serial > old_reply.serial:
      self._logger.debug("Filtering confirming answer, with newer"
                         " serial for query %s" % salt)
    elif new_reply.serial == old_reply.serial:
      if new_reply.answer != old_reply.answer:
        self._logger.warning("Got incoherent answers for query %s"
                             " (serial: %s)" % (salt, new_reply.serial))
      else:
        self._logger.debug("Filtering confirming answer, with same"
                           " serial for query %s" % salt)
    else:
      # here the new reply carries the lower serial, so it has to be the
      # left-hand side of the "<" shown in the message
      self._logger.debug("Filtering outdated answer for query %s"
                         " serial: (%d < %d)" % (salt, new_reply.serial,
                                                 old_reply.serial))

  def _HandleExpire(self, up):
    """Forget everything recorded for an expired query.

    """
    # if we have no answer we have received none, before the expiration.
    if up.salt in self._answers:
      del self._answers[up.salt]
    if up.salt in self.consistent:
      del self.consistent[up.salt]

  def _HandleReply(self, up):
    """Handle a single confd reply, and decide whether to filter it.

    @rtype: boolean
    @return: True if the reply should be filtered, False if it should be passed
             on to the up-callback

    """
    filter_upcall = False
    salt = up.salt
    if salt not in self.consistent:
      self.consistent[salt] = True
    if salt not in self._answers:
      # first answer for a query (don't filter, and record)
      self._answers[salt] = up.server_reply
    elif up.server_reply.serial > self._answers[salt].serial:
      # newer answer (record, and compare contents)
      old_answer = self._answers[salt]
      self._answers[salt] = up.server_reply
      if up.server_reply.answer == old_answer.answer:
        # same content (filter) (version upgrade was unrelated)
        filter_upcall = True
        self._LogFilter(salt, up.server_reply, old_answer)
      # else: different content, pass up a second answer
    else:
      # older or same-version answer (duplicate or outdated, filter)
      if (up.server_reply.serial == self._answers[salt].serial and
          up.server_reply.answer != self._answers[salt].answer):
        self.consistent[salt] = False
      filter_upcall = True
      self._LogFilter(salt, up.server_reply, self._answers[salt])

    return filter_upcall

  def __call__(self, up):
    """Filtering callback

    Replies are passed on unless L{_HandleReply} filters them; every
    other upcall (expirations included) is always passed on.

    @type up: L{ConfdUpcallPayload}
    @param up: upper callback

    """
    filter_upcall = False
    if up.type == UPCALL_REPLY:
      filter_upcall = self._HandleReply(up)
    elif up.type == UPCALL_EXPIRE:
      self._HandleExpire(up)

    if not filter_upcall:
      self._callback(up)
568

    
569

    
570
class ConfdCountingCallback:
  """Callback wrapper that counts replies per registered query.

  Every upcall is forwarded to the wrapped callback after the counters
  have been updated.

  """
  def __init__(self, callback, logger=None):
    """Constructor for ConfdCountingCallback

    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function to call when getting answers
    @type logger: logging.Logger
    @param logger: optional logger for internal conditions

    """
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    # maps salt -> number of replies seen so far
    self._answers = {}
    self._logger = logger
    self._callback = callback

  def RegisterQuery(self, salt):
    """Start counting replies for a query, beginning at zero.

    A salt may only be registered once.

    """
    already_known = salt in self._answers
    if already_known:
      raise errors.ProgrammerError("query already registered")
    self._answers[salt] = 0

  def AllAnswered(self):
    """Have all the registered queries received at least an answer?

    """
    counters = self._answers.values()
    return compat.all(counters)

  def _HandleExpire(self, up):
    """Stop tracking a query whose request expired.

    """
    # unknown salts are silently ignored
    self._answers.pop(up.salt, None)

  def _HandleReply(self, up):
    """Count one reply, if it belongs to a registered query.

    Replies for salts which were never registered are ignored.

    """
    salt = up.salt
    seen = self._answers.get(salt)
    if seen is not None:
      self._answers[salt] = seen + 1

  def __call__(self, up):
    """Counting callback

    @type up: L{ConfdUpcallPayload}
    @param up: upper callback

    """
    kind = up.type
    if kind == UPCALL_REPLY:
      self._HandleReply(up)
    elif kind == UPCALL_EXPIRE:
      self._HandleExpire(up)
    # the upcall is never filtered, only counted
    self._callback(up)
630

    
631

    
632
class StoreResultCallback:
  """Callback that simply stores the most recent answer.

  @ivar _answers: dict of salt to (have_answer, reply)

  """
  # returned by GetResponse for salts without a stored answer
  _NO_KEY = (False, None)

  def __init__(self):
    """Constructor for StoreResultCallback

    """
    # maps salt -> (True, latest reply upcall)
    self._answers = {}

  def GetResponse(self, salt):
    """Return what is stored for a salt.

    @return: the stored (have_answer, reply) tuple, or C{(False, None)}
        if nothing is stored for the salt

    """
    try:
      return self._answers[salt]
    except KeyError:
      return self._NO_KEY

  def _HandleExpire(self, up):
    """Expiration handler.

    Only an entry equal to the "no answer" marker is removed; a stored
    reply survives the expiration of its request.

    """
    salt = up.salt
    if salt not in self._answers:
      return
    if self._answers[salt] == self._NO_KEY:
      del self._answers[salt]

  def _HandleReply(self, up):
    """Remember a reply, replacing any earlier one for the same salt.

    """
    self._answers[up.salt] = (True, up)

  def __call__(self, up):
    """Storing callback

    @type up: L{ConfdUpcallPayload}
    @param up: upper callback

    """
    kind = up.type
    if kind == UPCALL_REPLY:
      self._HandleReply(up)
    elif kind == UPCALL_EXPIRE:
      self._HandleExpire(up)
677

    
678

    
679
def GetConfdClient(callback):
  """Return a client configured using the given callback.

  The master candidate addresses (one per line of the ssconf file for
  L{constants.SS_MASTER_CANDIDATES_IPS}) and the HMAC key (the file at
  L{pathutils.CONFD_HMAC_KEY}) are read from disk here, so callers do
  not have to deal with them.

  @attention: This should only be called on nodes which are part of a
      cluster, since it depends on a valid (ganeti) data directory;
      for code running outside of a cluster, you need to create the
      client manually

  @param callback: passed on to the L{ConfdClient} constructor
  @rtype: L{ConfdClient}

  """
  store = ssconf.SimpleStore()
  candidates_path = store.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
  candidate_ips = utils.ReadFile(candidates_path).splitlines()
  key = utils.ReadFile(pathutils.CONFD_HMAC_KEY)
  return ConfdClient(key, candidate_ips, callback)