Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ cc6484c4

History | View | Annotate | Download (20.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2009 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti confd client
23

24
Clients can use the confd client library to send requests to a group of master
25
candidates running confd. The expected usage is through the asyncore framework,
26
by sending queries, and asynchronously receiving replies through a callback.
27

28
This way the client library doesn't ever need to "wait" on a particular answer,
29
and can proceed even if some udp packets are lost. It's up to the user to
30
reschedule queries if they haven't received responses and they need them.
31

32
Example usage::
33

34
  client = ConfdClient(...) # includes callback specification
35
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36
  client.SendRequest(req)
37
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38
  # ... wait ...
39
  # And your callback will be called by asyncore, when your query gets a
40
  # response, or when it expires.
41

42
You can use the provided ConfdFilterCallback to act as a filter, only passing
43
"newer" answers to your callback, and filtering out outdated ones, or ones
44
confirming what you already got.
45

46
"""
47

    
48
# pylint: disable-msg=E0203
49

    
50
# E0203: Access to member %r before its definition, since we use
51
# objects.py which doesn't explicitly initialise its members
52

    
53
import time
54
import random
55

    
56
from ganeti import utils
57
from ganeti import constants
58
from ganeti import objects
59
from ganeti import serializer
60
from ganeti import daemon # contains AsyncUDPSocket
61
from ganeti import errors
62
from ganeti import confd
63
from ganeti import ssconf
64

    
65

    
66
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
  """Asyncore-driven UDP endpoint used by the confd client.

  The socket handling lives in its own class, separate from
  L{ConfdClient}, so that a client library not based on asyncore stays
  easy to implement.

  """
  def __init__(self, client):
    """Set up the socket and remember who consumes the datagrams.

    @type client: L{ConfdClient}
    @param client: client library, which receives every incoming datagram

    """
    daemon.AsyncUDPSocket.__init__(self)
    self.client = client

  # overrides the daemon.AsyncUDPSocket hook of the same name
  def handle_datagram(self, payload, ip, port):
    """Forward a received datagram, untouched, to the owning client.

    """
    self.client.HandleResponse(payload, ip, port)
86

    
87

    
88
class _Request(object):
  """Bookkeeping record for a single outstanding request.

  @ivar request: the request data
  @ivar args: any extra arguments for the callback
  @ivar expiry: the expiry timestamp of the request
  @ivar sent: the (immutable) set of contacted peers
  @ivar rcvd: the set of peers who replied; starts out empty

  """
  def __init__(self, request, args, expiry, sent):
    """Store the request data and start with no replies recorded.

    """
    # nobody has answered yet when the record is created
    self.rcvd = set()
    # the contacted peers are frozen into an immutable set
    self.sent = frozenset(sent)
    self.expiry = expiry
    self.args = args
    self.request = request
104

    
105

    
106
class ConfdClient:
107
  """Send queries to confd, and get back answers.
108

109
  Since the confd model works by querying multiple master candidates, and
110
  getting back answers, this is an asynchronous library. It can either work
111
  through asyncore or with your own handling.
112

113
  @type _requests: dict
114
  @ivar _requests: dictionary indexes by salt, which contains data
115
      about the outstanding requests; the values are objects of type
116
      L{_Request}
117

118
  """
119
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
120
    """Constructor for ConfdClient
121

122
    @type hmac_key: string
123
    @param hmac_key: hmac key to talk to confd
124
    @type peers: list
125
    @param peers: list of peer nodes
126
    @type callback: f(L{ConfdUpcallPayload})
127
    @param callback: function to call when getting answers
128
    @type port: integer
129
    @param port: confd port (default: use GetDaemonPort)
130
    @type logger: logging.Logger
131
    @param logger: optional logger for internal conditions
132

133
    """
134
    if not callable(callback):
135
      raise errors.ProgrammerError("callback must be callable")
136

    
137
    self.UpdatePeerList(peers)
138
    self._hmac_key = hmac_key
139
    self._socket = ConfdAsyncUDPClient(self)
140
    self._callback = callback
141
    self._confd_port = port
142
    self._logger = logger
143
    self._requests = {}
144

    
145
    if self._confd_port is None:
146
      self._confd_port = utils.GetDaemonPort(constants.CONFD)
147

    
148
  def UpdatePeerList(self, peers):
149
    """Update the list of peers
150

151
    @type peers: list
152
    @param peers: list of peer nodes
153

154
    """
155
    # we are actually called from init, so:
156
    # pylint: disable-msg=W0201
157
    if not isinstance(peers, list):
158
      raise errors.ProgrammerError("peers must be a list")
159
    # make a copy of peers, since we're going to shuffle the list, later
160
    self._peers = list(peers)
161

    
162
  def _PackRequest(self, request, now=None):
163
    """Prepare a request to be sent on the wire.
164

165
    This function puts a proper salt in a confd request, puts the proper salt,
166
    and adds the correct magic number.
167

168
    """
169
    if now is None:
170
      now = time.time()
171
    tstamp = '%d' % now
172
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
173
    return confd.PackMagic(req)
174

    
175
  def _UnpackReply(self, payload):
176
    in_payload = confd.UnpackMagic(payload)
177
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
178
    answer = objects.ConfdReply.FromDict(dict_answer)
179
    return answer, salt
180

    
181
  def ExpireRequests(self):
182
    """Delete all the expired requests.
183

184
    """
185
    now = time.time()
186
    for rsalt, rq in self._requests.items():
187
      if now >= rq.expiry:
188
        del self._requests[rsalt]
189
        client_reply = ConfdUpcallPayload(salt=rsalt,
190
                                          type=UPCALL_EXPIRE,
191
                                          orig_request=rq.request,
192
                                          extra_args=rq.args,
193
                                          client=self,
194
                                          )
195
        self._callback(client_reply)
196

    
197
  def SendRequest(self, request, args=None, coverage=0, async=True):
198
    """Send a confd request to some MCs
199

200
    @type request: L{objects.ConfdRequest}
201
    @param request: the request to send
202
    @type args: tuple
203
    @param args: additional callback arguments
204
    @type coverage: integer
205
    @param coverage: number of remote nodes to contact; if default
206
        (0), it will use a reasonable default
207
        (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is
208
        passed, it will use the maximum number of peers, otherwise the
209
        number passed in will be used
210
    @type async: boolean
211
    @param async: handle the write asynchronously
212

213
    """
214
    if coverage == 0:
215
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
216
    elif coverage == -1:
217
      coverage = len(self._peers)
218

    
219
    if coverage > len(self._peers):
220
      raise errors.ConfdClientError("Not enough MCs known to provide the"
221
                                    " desired coverage")
222

    
223
    if not request.rsalt:
224
      raise errors.ConfdClientError("Missing request rsalt")
225

    
226
    self.ExpireRequests()
227
    if request.rsalt in self._requests:
228
      raise errors.ConfdClientError("Duplicate request rsalt")
229

    
230
    if request.type not in constants.CONFD_REQS:
231
      raise errors.ConfdClientError("Invalid request type")
232

    
233
    random.shuffle(self._peers)
234
    targets = self._peers[:coverage]
235

    
236
    now = time.time()
237
    payload = self._PackRequest(request, now=now)
238

    
239
    for target in targets:
240
      try:
241
        self._socket.enqueue_send(target, self._confd_port, payload)
242
      except errors.UdpDataSizeError:
243
        raise errors.ConfdClientError("Request too big")
244

    
245
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
246
    self._requests[request.rsalt] = _Request(request, args, expire_time,
247
                                             targets)
248

    
249
    if not async:
250
      self.FlushSendQueue()
251

    
252
  def HandleResponse(self, payload, ip, port):
253
    """Asynchronous handler for a confd reply
254

255
    Call the relevant callback associated to the current request.
256

257
    """
258
    try:
259
      try:
260
        answer, salt = self._UnpackReply(payload)
261
      except (errors.SignatureError, errors.ConfdMagicError), err:
262
        if self._logger:
263
          self._logger.debug("Discarding broken package: %s" % err)
264
        return
265

    
266
      try:
267
        rq = self._requests[salt]
268
      except KeyError:
269
        if self._logger:
270
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
271
        return
272

    
273
      rq.rcvd.add(ip)
274

    
275
      client_reply = ConfdUpcallPayload(salt=salt,
276
                                        type=UPCALL_REPLY,
277
                                        server_reply=answer,
278
                                        orig_request=rq.request,
279
                                        server_ip=ip,
280
                                        server_port=port,
281
                                        extra_args=rq.args,
282
                                        client=self,
283
                                       )
284
      self._callback(client_reply)
285

    
286
    finally:
287
      self.ExpireRequests()
288

    
289
  def FlushSendQueue(self):
290
    """Send out all pending requests.
291

292
    Can be used for synchronous client use.
293

294
    """
295
    while self._socket.writable():
296
      self._socket.handle_write()
297

    
298
  def ReceiveReply(self, timeout=1):
299
    """Receive one reply.
300

301
    @type timeout: float
302
    @param timeout: how long to wait for the reply
303
    @rtype: boolean
304
    @return: True if some data has been handled, False otherwise
305

306
    """
307
    return self._socket.process_next_packet(timeout=timeout)
308

    
309
  @staticmethod
310
  def _NeededReplies(peer_cnt):
311
    """Compute the minimum safe number of replies for a query.
312

313
    The algorithm is designed to work well for both small and big
314
    number of peers:
315
        - for less than three, we require all responses
316
        - for less than five, we allow one miss
317
        - otherwise, half the number plus one
318

319
    This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
320
    4->2, 5->3, 6->3, 7->4, etc.
321

322
    @type peer_cnt: int
323
    @param peer_cnt: the number of peers contacted
324
    @rtype: int
325
    @return: the number of replies which should give a safe coverage
326

327
    """
328
    if peer_cnt < 3:
329
      return peer_cnt
330
    elif peer_cnt < 5:
331
      return peer_cnt - 1
332
    else:
333
      return int(peer_cnt/2) + 1
334

    
335
  def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
336
    """Wait for replies to a given request.
337

338
    This method will wait until either the timeout expires or a
339
    minimum number (computed using L{_NeededReplies}) of replies are
340
    received for the given salt. It is useful when doing synchronous
341
    calls to this library.
342

343
    @param salt: the salt of the request we want responses for
344
    @param timeout: the maximum timeout (should be less or equal to
345
        L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT}
346
    @rtype: tuple
347
    @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
348
        request is unknown, timed_out will be true and the counters
349
        will be zero
350

351
    """
352
    def _CheckResponse():
353
      if salt not in self._requests:
354
        # expired?
355
        if self._logger:
356
          self._logger.debug("Discarding unknown/expired request: %s" % salt)
357
        return MISSING
358
      rq = self._requests[salt]
359
      if len(rq.rcvd) >= expected:
360
        # already got all replies
361
        return (False, len(rq.sent), len(rq.rcvd))
362
      # else wait, using default timeout
363
      self.ReceiveReply()
364
      raise utils.RetryAgain()
365

    
366
    MISSING = (True, 0, 0)
367

    
368
    if salt not in self._requests:
369
      return MISSING
370
    # extend the expire time with the current timeout, so that we
371
    # don't get the request expired from under us
372
    rq = self._requests[salt]
373
    rq.expiry += timeout
374
    sent = len(rq.sent)
375
    expected = self._NeededReplies(sent)
376

    
377
    try:
378
      return utils.Retry(_CheckResponse, 0, timeout)
379
    except utils.RetryTimeout:
380
      if salt in self._requests:
381
        rq = self._requests[salt]
382
        return (True, len(rq.sent), len(rq.rcvd))
383
      else:
384
        return MISSING
385

    
386

    
387
# Upcall types, as found in the "type" field of a ConfdUpcallPayload.
#
# UPCALL_REPLY: a server replied to a request; all the
# ConfdUpcallPayload fields are populated
UPCALL_REPLY = 1
# UPCALL_EXPIRE: the library expired a request internally; only salt,
# type, orig_request and extra_args are populated
UPCALL_EXPIRE = 2
CONFD_UPCALL_TYPES = frozenset([UPCALL_REPLY, UPCALL_EXPIRE])
397

    
398

    
399
class ConfdUpcallPayload(objects.ConfigObject):
  """Argument handed to the user callback on every upcall.

  @type salt: string
  @ivar salt: salt associated with the query
  @type type: one of confd.client.CONFD_UPCALL_TYPES
  @ivar type: upcall type (server reply, expired request, ...)
  @type orig_request: L{objects.ConfdRequest}
  @ivar orig_request: original request
  @type server_reply: L{objects.ConfdReply}
  @ivar server_reply: server reply
  @type server_ip: string
  @ivar server_ip: answering server ip address
  @type server_port: int
  @ivar server_port: answering server port
  @type extra_args: any
  @ivar extra_args: 'args' argument of the SendRequest function
  @type client: L{ConfdClient}
  @ivar client: current confd client instance

  """
  __slots__ = [
    "salt",
    "type",
    "orig_request",
    "server_reply",
    "server_ip",
    "server_port",
    "extra_args",
    "client",
    ]
430

    
431

    
432
class ConfdClientRequest(objects.ConfdRequest):
  """Client-side flavour of ConfdRequest.

  Compared to the base class, this one makes building requests on the
  client side easier, by filling in defaults for values left unset.

  """
  def __init__(self, **kwargs):
    """Initialise the request, then fill in the defaults and validate it.

    @raise errors.ConfdClientError: if the request type is not one of
        L{constants.CONFD_REQS}

    """
    objects.ConfdRequest.__init__(self, **kwargs)

    # a missing salt gets replaced by a newly generated UUID
    if not self.rsalt:
      self.rsalt = utils.NewUUID()

    # a missing protocol defaults to the protocol version constant
    if not self.protocol:
      self.protocol = constants.CONFD_PROTOCOL_VERSION

    if self.type not in constants.CONFD_REQS:
      raise errors.ConfdClientError("Invalid request type")
447

    
448

    
449
class ConfdFilterCallback:
  """Callback that calls another callback, but filters duplicate results.

  @ivar consistent: a dictionary indexed by salt; for each salt, if
      all responses were identical, this will be True; this is the
      expected state on a healthy cluster; on inconsistent or
      partitioned clusters, this might be False, if we see answers
      with the same serial but different contents

  """
  def __init__(self, callback, logger=None):
    """Constructor for ConfdFilterCallback

    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function to call when getting answers
    @type logger: logging.Logger
    @param logger: optional logger for internal conditions
    @raise errors.ProgrammerError: if the callback is not callable

    """
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    self._callback = callback
    self._logger = logger
    # answers contains a dict of salt -> answer
    self._answers = {}
    self.consistent = {}

  def _LogFilter(self, salt, new_reply, old_reply):
    """Log the reason why a reply is being filtered.

    Nothing is done if no logger was passed to the constructor.

    @param salt: the salt of the query
    @param new_reply: the reply which is being filtered
    @param old_reply: the reply it has been compared against

    """
    if not self._logger:
      return

    if new_reply.serial > old_reply.serial:
      self._logger.debug("Filtering confirming answer, with newer"
                         " serial for query %s" % salt)
    elif new_reply.serial == old_reply.serial:
      if new_reply.answer != old_reply.answer:
        self._logger.warning("Got incoherent answers for query %s"
                             " (serial: %s)" % (salt, new_reply.serial))
      else:
        self._logger.debug("Filtering confirming answer, with same"
                           " serial for query %s" % salt)
    else:
      # here the new serial is the smaller one, so it has to be printed
      # on the left side of the "<" sign
      self._logger.debug("Filtering outdated answer for query %s"
                         " serial: (%d < %d)" % (salt, new_reply.serial,
                                                 old_reply.serial))

  def _HandleExpire(self, up):
    """Forget everything recorded for an expired query.

    @type up: L{ConfdUpcallPayload}
    @param up: the expiration upcall

    """
    # if we have no answer we have received none, before the expiration.
    self._answers.pop(up.salt, None)
    self.consistent.pop(up.salt, None)

  def _HandleReply(self, up):
    """Handle a single confd reply, and decide whether to filter it.

    @type up: L{ConfdUpcallPayload}
    @param up: the reply upcall
    @rtype: boolean
    @return: True if the reply should be filtered, False if it should be passed
             on to the up-callback

    """
    filter_upcall = False
    salt = up.salt
    if salt not in self.consistent:
      self.consistent[salt] = True
    if salt not in self._answers:
      # first answer for a query (don't filter, and record)
      self._answers[salt] = up.server_reply
    elif up.server_reply.serial > self._answers[salt].serial:
      # newer answer (record, and compare contents)
      old_answer = self._answers[salt]
      self._answers[salt] = up.server_reply
      if up.server_reply.answer == old_answer.answer:
        # same content (filter) (version upgrade was unrelated)
        filter_upcall = True
        self._LogFilter(salt, up.server_reply, old_answer)
      # else: different content, pass up a second answer
    else:
      # older or same-version answer (duplicate or outdated, filter)
      if (up.server_reply.serial == self._answers[salt].serial and
          up.server_reply.answer != self._answers[salt].answer):
        self.consistent[salt] = False
      filter_upcall = True
      self._LogFilter(salt, up.server_reply, self._answers[salt])

    return filter_upcall

  def __call__(self, up):
    """Filtering callback

    Replies are passed on to the wrapped callback only if not filtered;
    upcalls of any other type are always passed on.

    @type up: L{ConfdUpcallPayload}
    @param up: upper callback

    """
    filter_upcall = False
    if up.type == UPCALL_REPLY:
      filter_upcall = self._HandleReply(up)
    elif up.type == UPCALL_EXPIRE:
      self._HandleExpire(up)

    if not filter_upcall:
      self._callback(up)
536

    
537
  def __call__(self, up):
538
    """Filtering callback
539

540
    @type up: L{ConfdUpcallPayload}
541
    @param up: upper callback
542

543
    """
544
    filter_upcall = False
545
    if up.type == UPCALL_REPLY:
546
      filter_upcall = self._HandleReply(up)
547
    elif up.type == UPCALL_EXPIRE:
548
      self._HandleExpire(up)
549

    
550
    if not filter_upcall:
551
      self._callback(up)
552

    
553

    
554
class ConfdCountingCallback:
  """Callback wrapper which counts the replies received for each query.

  Every upcall is also handed on to the wrapped callback.

  """
  def __init__(self, callback, logger=None):
    """Constructor for ConfdCountingCallback

    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function to call when getting answers
    @type logger: logging.Logger
    @param logger: optional logger for internal conditions

    """
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    # maps each registered salt to the number of replies seen for it
    self._answers = {}
    self._logger = logger
    self._callback = callback

  def RegisterQuery(self, salt):
    """Start counting the replies for a salt.

    The same salt can't be registered twice (this raises
    L{errors.ProgrammerError}).

    """
    if salt in self._answers:
      raise errors.ProgrammerError("query already registered")
    self._answers[salt] = 0

  def AllAnswered(self):
    """Tell whether every registered query got at least one reply.

    """
    counters = self._answers.values()
    return utils.all(counters)

  def _HandleExpire(self, up):
    """Stop tracking a query once it has expired.

    """
    self._answers.pop(up.salt, None)

  def _HandleReply(self, up):
    """Count a reply, if it belongs to a registered query.

    Replies for salts which were never registered are not counted.

    """
    salt = up.salt
    if salt in self._answers:
      self._answers[salt] += 1

  def __call__(self, up):
    """Counting callback

    Updates the counters according to the upcall type, then always
    invokes the wrapped callback.

    @type up: L{ConfdUpcallPayload}
    @param up: upper callback

    """
    if up.type == UPCALL_REPLY:
      self._HandleReply(up)
    elif up.type == UPCALL_EXPIRE:
      self._HandleExpire(up)

    self._callback(up)
614

    
615

    
616
class StoreResultCallback:
  """Callback which just remembers the latest reply for each salt.

  @ivar _answers: dict of salt to (have_answer, reply)

  """
  # what GetResponse hands out for a salt without any stored reply
  _NO_KEY = (False, None)

  def __init__(self):
    """Constructor for StoreResultCallback

    """
    # salt -> (True, upcall payload of the last reply received)
    self._answers = {}

  def GetResponse(self, salt):
    """Return what is stored for a salt.

    @return: the stored (have_answer, reply) tuple, or (False, None) if
        nothing is stored for the salt

    """
    try:
      return self._answers[salt]
    except KeyError:
      return self._NO_KEY

  def _HandleExpire(self, up):
    """Expiration handler.

    The entry is removed only if it holds the "no answer" marker;
    stored replies survive the expiration of their request.

    """
    salt = up.salt
    if salt not in self._answers:
      return
    if self._answers[salt] == self._NO_KEY:
      del self._answers[salt]

  def _HandleReply(self, up):
    """Record a reply, replacing any previous one for the same salt.

    """
    self._answers[up.salt] = (True, up)

  def __call__(self, up):
    """Storing callback

    @type up: L{ConfdUpcallPayload}
    @param up: upper callback

    """
    if up.type == UPCALL_REPLY:
      self._HandleReply(up)
    elif up.type == UPCALL_EXPIRE:
      self._HandleExpire(up)
661

    
662

    
663
def GetConfdClient(callback):
  """Build a L{ConfdClient} for the local cluster, using the given callback.

  This saves the caller from reading the master candidates list and
  the HMAC key by hand.

  @attention: This should only be called on nodes which are part of a
      cluster, since it depends on a valid (ganeti) data directory;
      for code running outside of a cluster, you need to create the
      client manually

  @type callback: f(L{ConfdUpcallPayload})
  @param callback: function to call when getting answers
  @rtype: L{ConfdClient}
  @return: the newly created client

  """
  # the peers come from the ssconf file of master candidate IPs, one per line
  candidates_path = \
    ssconf.SimpleStore().KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
  peers = utils.ReadFile(candidates_path).splitlines()

  key = utils.ReadFile(constants.CONFD_HMAC_KEY)

  return ConfdClient(key, peers, callback)