Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ aa2efc52

History | View | Annotate | Download (20.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2009 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti confd client
23

24
Clients can use the confd client library to send requests to a group of master
25
candidates running confd. The expected usage is through the asyncore framework,
26
by sending queries, and asynchronously receiving replies through a callback.
27

28
This way the client library doesn't ever need to "wait" on a particular answer,
29
and can proceed even if some udp packets are lost. It's up to the user to
30
reschedule queries if they haven't received responses and they need them.
31

32
Example usage::
33

34
  client = ConfdClient(...) # includes callback specification
35
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36
  client.SendRequest(req)
37
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38
  # ... wait ...
39
  # And your callback will be called by asyncore, when your query gets a
40
  # response, or when it expires.
41

42
You can use the provided ConfdFilterCallback to act as a filter, only passing
"newer" answers to your callback, and filtering out outdated ones, or ones
confirming what you already got.
45

46
"""
47

    
48
# pylint: disable-msg=E0203
49

    
50
# E0203: Access to member %r before its definition, since we use
51
# objects.py which doesn't explicitly initialise its members
52

    
53
import time
54
import random
55

    
56
from ganeti import utils
57
from ganeti import constants
58
from ganeti import objects
59
from ganeti import serializer
60
from ganeti import daemon # contains AsyncUDPSocket
61
from ganeti import errors
62
from ganeti import confd
63
from ganeti import ssconf
64

    
65

    
66
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
  """Asyncore UDP endpoint used by the confd client.

  The socket handling lives in its own class, apart from L{ConfdClient},
  so that a client library not based on asyncore stays easy to write.

  """
  def __init__(self, client):
    """Create the socket wrapper and remember who owns it.

    @type client: L{ConfdClient}
    @param client: client library instance that will be handed every
        datagram received on this socket

    """
    daemon.AsyncUDPSocket.__init__(self)
    self.client = client

  def handle_datagram(self, payload, ip, port):
    """Forward a received datagram to the owning client.

    Overrides the daemon.AsyncUDPSocket method of the same name.

    """
    self.client.HandleResponse(payload, ip, port)
86

    
87

    
88
class _Request(object):
  """Bookkeeping record for one outstanding request.

  @ivar request: the request data
  @ivar args: any extra arguments for the callback
  @ivar expiry: the expiry timestamp of the request
  @ivar sent: the set of contacted peers (immutable)
  @ivar rcvd: the set of peers who replied (starts out empty)

  """
  def __init__(self, request, args, expiry, sent):
    """Store the request data and start with an empty reply set.

    """
    # the contacted peers never change once the request went out, the
    # peers which replied are collected over time
    self.rcvd = set()
    self.sent = frozenset(sent)
    self.expiry = expiry
    self.args = args
    self.request = request
104

    
105

    
106
class ConfdClient:
107
  """Send queries to confd, and get back answers.
108

109
  Since the confd model works by querying multiple master candidates, and
110
  getting back answers, this is an asynchronous library. It can either work
111
  through asyncore or with your own handling.
112

113
  @type _requests: dict
114
  @ivar _requests: dictionary indexes by salt, which contains data
115
      about the outstanding requests; the values are objects of type
116
      L{_Request}
117

118
  """
119
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
120
    """Constructor for ConfdClient
121

122
    @type hmac_key: string
123
    @param hmac_key: hmac key to talk to confd
124
    @type peers: list
125
    @param peers: list of peer nodes
126
    @type callback: f(L{ConfdUpcallPayload})
127
    @param callback: function to call when getting answers
128
    @type port: integer
129
    @param port: confd port (default: use GetDaemonPort)
130
    @type logger: logging.Logger
131
    @param logger: optional logger for internal conditions
132

133
    """
134
    if not callable(callback):
135
      raise errors.ProgrammerError("callback must be callable")
136

    
137
    self.UpdatePeerList(peers)
138
    self._hmac_key = hmac_key
139
    self._socket = ConfdAsyncUDPClient(self)
140
    self._callback = callback
141
    self._confd_port = port
142
    self._logger = logger
143
    self._requests = {}
144

    
145
    if self._confd_port is None:
146
      self._confd_port = utils.GetDaemonPort(constants.CONFD)
147

    
148
  def UpdatePeerList(self, peers):
149
    """Update the list of peers
150

151
    @type peers: list
152
    @param peers: list of peer nodes
153

154
    """
155
    # we are actually called from init, so:
156
    # pylint: disable-msg=W0201
157
    if not isinstance(peers, list):
158
      raise errors.ProgrammerError("peers must be a list")
159
    # make a copy of peers, since we're going to shuffle the list, later
160
    self._peers = list(peers)
161

    
162
  def _PackRequest(self, request, now=None):
163
    """Prepare a request to be sent on the wire.
164

165
    This function puts a proper salt in a confd request, puts the proper salt,
166
    and adds the correct magic number.
167

168
    """
169
    if now is None:
170
      now = time.time()
171
    tstamp = '%d' % now
172
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
173
    return confd.PackMagic(req)
174

    
175
  def _UnpackReply(self, payload):
176
    in_payload = confd.UnpackMagic(payload)
177
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
178
    answer = objects.ConfdReply.FromDict(dict_answer)
179
    return answer, salt
180

    
181
  def ExpireRequests(self):
182
    """Delete all the expired requests.
183

184
    """
185
    now = time.time()
186
    for rsalt, rq in self._requests.items():
187
      if now >= rq.expiry:
188
        del self._requests[rsalt]
189
        client_reply = ConfdUpcallPayload(salt=rsalt,
190
                                          type=UPCALL_EXPIRE,
191
                                          orig_request=rq.request,
192
                                          extra_args=rq.args,
193
                                          client=self,
194
                                          )
195
        self._callback(client_reply)
196

    
197
  def SendRequest(self, request, args=None, coverage=None, async=True):
198
    """Send a confd request to some MCs
199

200
    @type request: L{objects.ConfdRequest}
201
    @param request: the request to send
202
    @type args: tuple
203
    @param args: additional callback arguments
204
    @type coverage: integer
205
    @param coverage: number of remote nodes to contact
206
    @type async: boolean
207
    @param async: handle the write asynchronously
208

209
    """
210
    if coverage is None:
211
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
212

    
213
    if coverage > len(self._peers):
214
      raise errors.ConfdClientError("Not enough MCs known to provide the"
215
                                    " desired coverage")
216

    
217
    if not request.rsalt:
218
      raise errors.ConfdClientError("Missing request rsalt")
219

    
220
    self.ExpireRequests()
221
    if request.rsalt in self._requests:
222
      raise errors.ConfdClientError("Duplicate request rsalt")
223

    
224
    if request.type not in constants.CONFD_REQS:
225
      raise errors.ConfdClientError("Invalid request type")
226

    
227
    random.shuffle(self._peers)
228
    targets = self._peers[:coverage]
229

    
230
    now = time.time()
231
    payload = self._PackRequest(request, now=now)
232

    
233
    for target in targets:
234
      try:
235
        self._socket.enqueue_send(target, self._confd_port, payload)
236
      except errors.UdpDataSizeError:
237
        raise errors.ConfdClientError("Request too big")
238

    
239
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
240
    self._requests[request.rsalt] = _Request(request, args, expire_time,
241
                                             targets)
242

    
243
    if not async:
244
      self.FlushSendQueue()
245

    
246
  def HandleResponse(self, payload, ip, port):
247
    """Asynchronous handler for a confd reply
248

249
    Call the relevant callback associated to the current request.
250

251
    """
252
    try:
253
      try:
254
        answer, salt = self._UnpackReply(payload)
255
      except (errors.SignatureError, errors.ConfdMagicError), err:
256
        if self._logger:
257
          self._logger.debug("Discarding broken package: %s" % err)
258
        return
259

    
260
      try:
261
        rq = self._requests[salt]
262
      except KeyError:
263
        if self._logger:
264
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
265
        return
266

    
267
      rq.rcvd.add(ip)
268

    
269
      client_reply = ConfdUpcallPayload(salt=salt,
270
                                        type=UPCALL_REPLY,
271
                                        server_reply=answer,
272
                                        orig_request=rq.request,
273
                                        server_ip=ip,
274
                                        server_port=port,
275
                                        extra_args=rq.args,
276
                                        client=self,
277
                                       )
278
      self._callback(client_reply)
279

    
280
    finally:
281
      self.ExpireRequests()
282

    
283
  def FlushSendQueue(self):
284
    """Send out all pending requests.
285

286
    Can be used for synchronous client use.
287

288
    """
289
    while self._socket.writable():
290
      self._socket.handle_write()
291

    
292
  def ReceiveReply(self, timeout=1):
293
    """Receive one reply.
294

295
    @type timeout: float
296
    @param timeout: how long to wait for the reply
297
    @rtype: boolean
298
    @return: True if some data has been handled, False otherwise
299

300
    """
301
    return self._socket.process_next_packet(timeout=timeout)
302

    
303
  @staticmethod
304
  def _NeededReplies(peer_cnt):
305
    """Compute the minimum safe number of replies for a query.
306

307
    The algorithm is designed to work well for both small and big
308
    number of peers:
309
        - for less than three, we require all responses
310
        - for less than five, we allow one miss
311
        - otherwise, half the number plus one
312

313
    This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
314
    4->2, 5->3, 6->3, 7->4, etc.
315

316
    @type peer_cnt: int
317
    @param peer_cnt: the number of peers contacted
318
    @rtype: int
319
    @return: the number of replies which should give a safe coverage
320

321
    """
322
    if peer_cnt < 3:
323
      return peer_cnt
324
    elif peer_cnt < 5:
325
      return peer_cnt - 1
326
    else:
327
      return int(peer_cnt/2) + 1
328

    
329
  def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
330
    """Wait for replies to a given request.
331

332
    This method will wait until either the timeout expires or a
333
    minimum number (computed using L{_NeededReplies}) of replies are
334
    received for the given salt. It is useful when doing synchronous
335
    calls to this library.
336

337
    @param salt: the salt of the request we want responses for
338
    @param timeout: the maximum timeout (should be less or equal to
339
        L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT}
340
    @rtype: tuple
341
    @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
342
        request is unknown, timed_out will be true and the counters
343
        will be zero
344

345
    """
346
    def _CheckResponse():
347
      if salt not in self._requests:
348
        # expired?
349
        if self._logger:
350
          self._logger.debug("Discarding unknown/expired request: %s" % salt)
351
        return MISSING
352
      rq = self._requests[salt]
353
      if len(rq.rcvd) >= expected:
354
        # already got all replies
355
        return (False, len(rq.sent), len(rq.rcvd))
356
      # else wait, using default timeout
357
      self.ReceiveReply()
358
      raise utils.RetryAgain()
359

    
360
    MISSING = (True, 0, 0)
361

    
362
    if salt not in self._requests:
363
      return MISSING
364
    # extend the expire time with the current timeout, so that we
365
    # don't get the request expired from under us
366
    rq = self._requests[salt]
367
    rq.expiry += timeout
368
    sent = len(rq.sent)
369
    expected = self._NeededReplies(sent)
370

    
371
    try:
372
      return utils.Retry(_CheckResponse, 0, timeout)
373
    except utils.RetryTimeout:
374
      if salt in self._requests:
375
        rq = self._requests[salt]
376
        return (True, len(rq.sent), len(rq.rcvd))
377
      else:
378
        return MISSING
379

    
380

    
381
# Kinds of upcall delivered to the client callback.
#
# UPCALL_REPLY: a server answered one of our requests; every field of
# ConfdUpcallPayload is populated.
UPCALL_REPLY = 1
# UPCALL_EXPIRE: the library itself expired an outstanding request; only
# salt, type, orig_request and extra_args are populated.
UPCALL_EXPIRE = 2
CONFD_UPCALL_TYPES = frozenset((UPCALL_REPLY, UPCALL_EXPIRE))
391

    
392

    
393
class ConfdUpcallPayload(objects.ConfigObject):
  """Argument passed to the client callback on every upcall

  @type salt: string
  @ivar salt: salt associated with the query
  @type type: one of confd.client.CONFD_UPCALL_TYPES
  @ivar type: upcall type (server reply, expired request, ...)
  @type orig_request: L{objects.ConfdRequest}
  @ivar orig_request: original request
  @type server_reply: L{objects.ConfdReply}
  @ivar server_reply: server reply
  @type server_ip: string
  @ivar server_ip: answering server ip address
  @type server_port: int
  @ivar server_port: answering server port
  @type extra_args: any
  @ivar extra_args: 'args' argument of the SendRequest function
  @type client: L{ConfdClient}
  @ivar client: current confd client instance

  """
  __slots__ = [
    # identification of the query and of the upcall
    "salt", "type", "orig_request",
    # data about the answering server
    "server_reply", "server_ip", "server_port",
    # caller-provided context
    "extra_args", "client",
    ]
424

    
425

    
426
class ConfdClientRequest(objects.ConfdRequest):
  """Client-side flavour of ConfdRequest.

  It makes building requests on the client side easier, by providing
  defaults for the fields which were not passed in.

  """
  def __init__(self, **kwargs):
    """Build the request, filling in the missing rsalt and protocol.

    @raise errors.ConfdClientError: if the request type is not one of
        L{constants.CONFD_REQS}

    """
    objects.ConfdRequest.__init__(self, **kwargs)
    # a request without salt gets a fresh UUID
    if not self.rsalt:
      self.rsalt = utils.NewUUID()
    # and one without protocol the current version
    if not self.protocol:
      self.protocol = constants.CONFD_PROTOCOL_VERSION
    type_is_known = self.type in constants.CONFD_REQS
    if not type_is_known:
      raise errors.ConfdClientError("Invalid request type")
441

    
442

    
443
class ConfdFilterCallback:
  """Callback that calls another callback, but filters duplicate results.

  @ivar consistent: a dictionary indexed by salt; for each salt, if
      all responses were identical, this will be True; this is the
      expected state on a healthy cluster; on inconsistent or
      partitioned clusters, this might be False, if we see answers
      with the same serial but different contents

  """
  def __init__(self, callback, logger=None):
    """Constructor for ConfdFilterCallback

    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function to call when getting answers
    @type logger: logging.Logger
    @param logger: optional logger for internal conditions
    @raise errors.ProgrammerError: if the callback is not callable

    """
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    self._callback = callback
    self._logger = logger
    # answers contains a dict of salt -> answer
    self._answers = {}
    self.consistent = {}

  def _LogFilter(self, salt, new_reply, old_reply):
    """Log why a reply is being filtered out.

    Does nothing if no logger was passed to the constructor.

    @param salt: the salt of the query the replies belong to
    @param new_reply: the reply which is being filtered
    @param old_reply: the reply it was compared against

    """
    if not self._logger:
      return

    if new_reply.serial > old_reply.serial:
      self._logger.debug("Filtering confirming answer, with newer"
                         " serial for query %s" % salt)
    elif new_reply.serial == old_reply.serial:
      if new_reply.answer != old_reply.answer:
        self._logger.warning("Got incoherent answers for query %s"
                             " (serial: %s)" % (salt, new_reply.serial))
      else:
        self._logger.debug("Filtering confirming answer, with same"
                           " serial for query %s" % salt)
    else:
      # the new reply carries the lower serial here, so it has to go on
      # the left side of the "<" for the message to be true
      self._logger.debug("Filtering outdated answer for query %s"
                         " serial: (%d < %d)" % (salt, new_reply.serial,
                                                 old_reply.serial))

  def _HandleExpire(self, up):
    """Forget everything recorded for an expired query.

    @type up: L{ConfdUpcallPayload}
    @param up: the expiration upcall

    """
    # if we have no answer we have received none, before the expiration.
    if up.salt in self._answers:
      del self._answers[up.salt]
    if up.salt in self.consistent:
      del self.consistent[up.salt]

  def _HandleReply(self, up):
    """Handle a single confd reply, and decide whether to filter it.

    @type up: L{ConfdUpcallPayload}
    @param up: the reply upcall
    @rtype: boolean
    @return: True if the reply should be filtered, False if it should be passed
             on to the up-callback

    """
    filter_upcall = False
    salt = up.salt
    if salt not in self.consistent:
      self.consistent[salt] = True
    if salt not in self._answers:
      # first answer for a query (don't filter, and record)
      self._answers[salt] = up.server_reply
    elif up.server_reply.serial > self._answers[salt].serial:
      # newer answer (record, and compare contents)
      old_answer = self._answers[salt]
      self._answers[salt] = up.server_reply
      if up.server_reply.answer == old_answer.answer:
        # same content (filter) (version upgrade was unrelated)
        filter_upcall = True
        self._LogFilter(salt, up.server_reply, old_answer)
      # else: different content, pass up a second answer
    else:
      # older or same-version answer (duplicate or outdated, filter)
      if (up.server_reply.serial == self._answers[salt].serial and
          up.server_reply.answer != self._answers[salt].answer):
        self.consistent[salt] = False
      filter_upcall = True
      self._LogFilter(salt, up.server_reply, self._answers[salt])

    return filter_upcall

  def __call__(self, up):
    """Filtering callback

    Replies are passed to the wrapped callback only if L{_HandleReply}
    doesn't filter them; all other upcalls are always passed on.

    @type up: L{ConfdUpcallPayload}
    @param up: upper callback

    """
    filter_upcall = False
    if up.type == UPCALL_REPLY:
      filter_upcall = self._HandleReply(up)
    elif up.type == UPCALL_EXPIRE:
      self._HandleExpire(up)

    if not filter_upcall:
      self._callback(up)
546

    
547

    
548
class ConfdCountingCallback:
  """Callback wrapper which counts the replies seen for each query

  Every upcall is always forwarded to the wrapped callback.

  """
  def __init__(self, callback, logger=None):
    """Constructor for ConfdCountingCallback

    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function to call when getting answers
    @type logger: logging.Logger
    @param logger: optional logger for internal conditions

    """
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    # maps each registered salt to the number of replies seen so far
    self._answers = {}
    self._logger = logger
    self._callback = callback

  def RegisterQuery(self, salt):
    """Start counting the replies for a salt.

    Registering the same salt twice is a programming error.

    """
    already_known = salt in self._answers
    if already_known:
      raise errors.ProgrammerError("query already registered")
    self._answers[salt] = 0

  def AllAnswered(self):
    """Tell whether every registered query got at least one answer.

    """
    counters = self._answers.values()
    return utils.all(counters)

  def _HandleExpire(self, up):
    """Stop tracking a query which has expired.

    """
    # if we have no answer we have received none, before the expiration.
    salt = up.salt
    if salt in self._answers:
      del self._answers[salt]

  def _HandleReply(self, up):
    """Count one more reply for the query, if it was registered.

    Replies for salts which were never registered are ignored. Nothing
    is returned.

    """
    salt = up.salt
    if salt in self._answers:
      self._answers[salt] = self._answers[salt] + 1

  def __call__(self, up):
    """Counting callback

    @type up: L{ConfdUpcallPayload}
    @param up: upper callback

    """
    kind = up.type
    if kind == UPCALL_REPLY:
      self._HandleReply(up)
    elif kind == UPCALL_EXPIRE:
      self._HandleExpire(up)
    self._callback(up)
608

    
609

    
610
class StoreResultCallback:
  """Callback which just remembers the latest reply for each salt.

  @ivar _answers: dict of salt to (have_answer, reply)

  """
  # what GetResponse returns for a salt without any stored reply
  _NO_KEY = (False, None)

  def __init__(self):
    """Constructor for StoreResultCallback

    """
    # maps each salt to a (True, upcall) tuple
    self._answers = {}

  def GetResponse(self, salt):
    """Return what was stored for a salt.

    @return: the (have_answer, reply) tuple for the salt, or
        (False, None) if nothing was stored

    """
    try:
      return self._answers[salt]
    except KeyError:
      return self._NO_KEY

  def _HandleExpire(self, up):
    """Expiration handler.

    The entry is dropped only if it holds the "no answer" marker.

    """
    if self._answers.get(up.salt) == self._NO_KEY:
      del self._answers[up.salt]

  def _HandleReply(self, up):
    """Record the upcall as the current answer for its salt.

    """
    entry = (True, up)
    self._answers[up.salt] = entry

  def __call__(self, up):
    """Storing callback

    @type up: L{ConfdUpcallPayload}
    @param up: upper callback

    """
    kind = up.type
    if kind == UPCALL_REPLY:
      self._HandleReply(up)
    elif kind == UPCALL_EXPIRE:
      self._HandleExpire(up)
655

    
656

    
657
def GetConfdClient(callback):
  """Build a confd client around the given callback.

  The master candidates list and the HMAC key are read from the local
  node, so that callers don't have to deal with them.

  @attention: This should only be called on nodes which are part of a
      cluster, since it depends on a valid (ganeti) data directory;
      for code running outside of a cluster, you need to create the
      client manually

  """
  # one master candidate IP per line in the ssconf file
  candidates_path = ssconf.SimpleStore().KeyToFilename(
    constants.SS_MASTER_CANDIDATES_IPS)
  candidate_ips = utils.ReadFile(candidates_path).splitlines()
  key = utils.ReadFile(constants.CONFD_HMAC_KEY)
  return ConfdClient(key, candidate_ips, callback)