Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ cea881e5

History | View | Annotate | Download (20.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2009 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti confd client
23

24
Clients can use the confd client library to send requests to a group of master
25
candidates running confd. The expected usage is through the asyncore framework,
26
by sending queries, and asynchronously receiving replies through a callback.
27

28
This way the client library doesn't ever need to "wait" on a particular answer,
29
and can proceed even if some udp packets are lost. It's up to the user to
30
reschedule queries if they haven't received responses and they need them.
31

32
Example usage::
33

34
  client = ConfdClient(...) # includes callback specification
35
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36
  client.SendRequest(req)
37
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38
  # ... wait ...
39
  # And your callback will be called by asyncore, when your query gets a
40
  # response, or when it expires.
41

42
You can use the provided ConfdFilterCallback to act as a filter, only passing
43
"newer" answer to your callback, and filtering out outdated ones, or ones
44
confirming what you already got.
45

46
"""
47

    
48
# pylint: disable-msg=E0203
49

    
50
# E0203: Access to member %r before its definition, since we use
51
# objects.py which doesn't explicitly initialise its members
52

    
53
import time
54
import random
55

    
56
from ganeti import utils
57
from ganeti import constants
58
from ganeti import objects
59
from ganeti import serializer
60
from ganeti import daemon # contains AsyncUDPSocket
61
from ganeti import errors
62
from ganeti import confd
63
from ganeti import ssconf
64
from ganeti import compat
65

    
66

    
67
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
  """Asyncore-driven UDP endpoint used by the confd client.

  The socket handling lives in its own class, apart from L{ConfdClient}, so
  that a client library not based on asyncore remains easy to implement.
  Every datagram received here is simply forwarded to the owning client.

  """
  def __init__(self, client):
    """Initialise the UDP socket and remember the owning client.

    @type client: L{ConfdClient}
    @param client: client library object; its C{HandleResponse} method
        is called for each incoming datagram

    """
    daemon.AsyncUDPSocket.__init__(self)
    self.client = client

  # overrides the daemon.AsyncUDPSocket method of the same name
  def handle_datagram(self, payload, ip, port):
    """Forward a received datagram to the owning client.

    """
    forward = self.client.HandleResponse
    forward(payload, ip, port)
87

    
88

    
89
class _Request(object):
90
  """Request status structure.
91

92
  @ivar request: the request data
93
  @ivar args: any extra arguments for the callback
94
  @ivar expiry: the expiry timestamp of the request
95
  @ivar sent: the set of contacted peers
96
  @ivar rcvd: the set of peers who replied
97

98
  """
99
  def __init__(self, request, args, expiry, sent):
100
    self.request = request
101
    self.args = args
102
    self.expiry = expiry
103
    self.sent = frozenset(sent)
104
    self.rcvd = set()
105

    
106

    
107
class ConfdClient:
108
  """Send queries to confd, and get back answers.
109

110
  Since the confd model works by querying multiple master candidates, and
111
  getting back answers, this is an asynchronous library. It can either work
112
  through asyncore or with your own handling.
113

114
  @type _requests: dict
115
  @ivar _requests: dictionary indexes by salt, which contains data
116
      about the outstanding requests; the values are objects of type
117
      L{_Request}
118

119
  """
120
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
121
    """Constructor for ConfdClient
122

123
    @type hmac_key: string
124
    @param hmac_key: hmac key to talk to confd
125
    @type peers: list
126
    @param peers: list of peer nodes
127
    @type callback: f(L{ConfdUpcallPayload})
128
    @param callback: function to call when getting answers
129
    @type port: integer
130
    @param port: confd port (default: use GetDaemonPort)
131
    @type logger: logging.Logger
132
    @param logger: optional logger for internal conditions
133

134
    """
135
    if not callable(callback):
136
      raise errors.ProgrammerError("callback must be callable")
137

    
138
    self.UpdatePeerList(peers)
139
    self._hmac_key = hmac_key
140
    self._socket = ConfdAsyncUDPClient(self)
141
    self._callback = callback
142
    self._confd_port = port
143
    self._logger = logger
144
    self._requests = {}
145

    
146
    if self._confd_port is None:
147
      self._confd_port = utils.GetDaemonPort(constants.CONFD)
148

    
149
  def UpdatePeerList(self, peers):
150
    """Update the list of peers
151

152
    @type peers: list
153
    @param peers: list of peer nodes
154

155
    """
156
    # we are actually called from init, so:
157
    # pylint: disable-msg=W0201
158
    if not isinstance(peers, list):
159
      raise errors.ProgrammerError("peers must be a list")
160
    # make a copy of peers, since we're going to shuffle the list, later
161
    self._peers = list(peers)
162

    
163
  def _PackRequest(self, request, now=None):
164
    """Prepare a request to be sent on the wire.
165

166
    This function puts a proper salt in a confd request, puts the proper salt,
167
    and adds the correct magic number.
168

169
    """
170
    if now is None:
171
      now = time.time()
172
    tstamp = '%d' % now
173
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
174
    return confd.PackMagic(req)
175

    
176
  def _UnpackReply(self, payload):
177
    in_payload = confd.UnpackMagic(payload)
178
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
179
    answer = objects.ConfdReply.FromDict(dict_answer)
180
    return answer, salt
181

    
182
  def ExpireRequests(self):
183
    """Delete all the expired requests.
184

185
    """
186
    now = time.time()
187
    for rsalt, rq in self._requests.items():
188
      if now >= rq.expiry:
189
        del self._requests[rsalt]
190
        client_reply = ConfdUpcallPayload(salt=rsalt,
191
                                          type=UPCALL_EXPIRE,
192
                                          orig_request=rq.request,
193
                                          extra_args=rq.args,
194
                                          client=self,
195
                                          )
196
        self._callback(client_reply)
197

    
198
  def SendRequest(self, request, args=None, coverage=0, async=True):
199
    """Send a confd request to some MCs
200

201
    @type request: L{objects.ConfdRequest}
202
    @param request: the request to send
203
    @type args: tuple
204
    @param args: additional callback arguments
205
    @type coverage: integer
206
    @param coverage: number of remote nodes to contact; if default
207
        (0), it will use a reasonable default
208
        (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is
209
        passed, it will use the maximum number of peers, otherwise the
210
        number passed in will be used
211
    @type async: boolean
212
    @param async: handle the write asynchronously
213

214
    """
215
    if coverage == 0:
216
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
217
    elif coverage == -1:
218
      coverage = len(self._peers)
219

    
220
    if coverage > len(self._peers):
221
      raise errors.ConfdClientError("Not enough MCs known to provide the"
222
                                    " desired coverage")
223

    
224
    if not request.rsalt:
225
      raise errors.ConfdClientError("Missing request rsalt")
226

    
227
    self.ExpireRequests()
228
    if request.rsalt in self._requests:
229
      raise errors.ConfdClientError("Duplicate request rsalt")
230

    
231
    if request.type not in constants.CONFD_REQS:
232
      raise errors.ConfdClientError("Invalid request type")
233

    
234
    random.shuffle(self._peers)
235
    targets = self._peers[:coverage]
236

    
237
    now = time.time()
238
    payload = self._PackRequest(request, now=now)
239

    
240
    for target in targets:
241
      try:
242
        self._socket.enqueue_send(target, self._confd_port, payload)
243
      except errors.UdpDataSizeError:
244
        raise errors.ConfdClientError("Request too big")
245

    
246
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
247
    self._requests[request.rsalt] = _Request(request, args, expire_time,
248
                                             targets)
249

    
250
    if not async:
251
      self.FlushSendQueue()
252

    
253
  def HandleResponse(self, payload, ip, port):
254
    """Asynchronous handler for a confd reply
255

256
    Call the relevant callback associated to the current request.
257

258
    """
259
    try:
260
      try:
261
        answer, salt = self._UnpackReply(payload)
262
      except (errors.SignatureError, errors.ConfdMagicError), err:
263
        if self._logger:
264
          self._logger.debug("Discarding broken package: %s" % err)
265
        return
266

    
267
      try:
268
        rq = self._requests[salt]
269
      except KeyError:
270
        if self._logger:
271
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
272
        return
273

    
274
      rq.rcvd.add(ip)
275

    
276
      client_reply = ConfdUpcallPayload(salt=salt,
277
                                        type=UPCALL_REPLY,
278
                                        server_reply=answer,
279
                                        orig_request=rq.request,
280
                                        server_ip=ip,
281
                                        server_port=port,
282
                                        extra_args=rq.args,
283
                                        client=self,
284
                                       )
285
      self._callback(client_reply)
286

    
287
    finally:
288
      self.ExpireRequests()
289

    
290
  def FlushSendQueue(self):
291
    """Send out all pending requests.
292

293
    Can be used for synchronous client use.
294

295
    """
296
    while self._socket.writable():
297
      self._socket.handle_write()
298

    
299
  def ReceiveReply(self, timeout=1):
300
    """Receive one reply.
301

302
    @type timeout: float
303
    @param timeout: how long to wait for the reply
304
    @rtype: boolean
305
    @return: True if some data has been handled, False otherwise
306

307
    """
308
    return self._socket.process_next_packet(timeout=timeout)
309

    
310
  @staticmethod
311
  def _NeededReplies(peer_cnt):
312
    """Compute the minimum safe number of replies for a query.
313

314
    The algorithm is designed to work well for both small and big
315
    number of peers:
316
        - for less than three, we require all responses
317
        - for less than five, we allow one miss
318
        - otherwise, half the number plus one
319

320
    This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
321
    4->2, 5->3, 6->3, 7->4, etc.
322

323
    @type peer_cnt: int
324
    @param peer_cnt: the number of peers contacted
325
    @rtype: int
326
    @return: the number of replies which should give a safe coverage
327

328
    """
329
    if peer_cnt < 3:
330
      return peer_cnt
331
    elif peer_cnt < 5:
332
      return peer_cnt - 1
333
    else:
334
      return int(peer_cnt/2) + 1
335

    
336
  def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
337
    """Wait for replies to a given request.
338

339
    This method will wait until either the timeout expires or a
340
    minimum number (computed using L{_NeededReplies}) of replies are
341
    received for the given salt. It is useful when doing synchronous
342
    calls to this library.
343

344
    @param salt: the salt of the request we want responses for
345
    @param timeout: the maximum timeout (should be less or equal to
346
        L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT}
347
    @rtype: tuple
348
    @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
349
        request is unknown, timed_out will be true and the counters
350
        will be zero
351

352
    """
353
    def _CheckResponse():
354
      if salt not in self._requests:
355
        # expired?
356
        if self._logger:
357
          self._logger.debug("Discarding unknown/expired request: %s" % salt)
358
        return MISSING
359
      rq = self._requests[salt]
360
      if len(rq.rcvd) >= expected:
361
        # already got all replies
362
        return (False, len(rq.sent), len(rq.rcvd))
363
      # else wait, using default timeout
364
      self.ReceiveReply()
365
      raise utils.RetryAgain()
366

    
367
    MISSING = (True, 0, 0)
368

    
369
    if salt not in self._requests:
370
      return MISSING
371
    # extend the expire time with the current timeout, so that we
372
    # don't get the request expired from under us
373
    rq = self._requests[salt]
374
    rq.expiry += timeout
375
    sent = len(rq.sent)
376
    expected = self._NeededReplies(sent)
377

    
378
    try:
379
      return utils.Retry(_CheckResponse, 0, timeout)
380
    except utils.RetryTimeout:
381
      if salt in self._requests:
382
        rq = self._requests[salt]
383
        return (True, len(rq.sent), len(rq.rcvd))
384
      else:
385
        return MISSING
386

    
387

    
388
# UPCALL_REPLY: upcall carrying a server reply; all the
# ConfdUpcallPayload fields are populated
UPCALL_REPLY = 1
# UPCALL_EXPIRE: upcall generated by the library itself when a request
# expires; no server-related field (reply, ip, port) is populated
UPCALL_EXPIRE = 2
# all the known upcall types
CONFD_UPCALL_TYPES = frozenset([UPCALL_REPLY, UPCALL_EXPIRE])
398

    
399

    
400
class ConfdUpcallPayload(objects.ConfigObject):
  """Object handed to the user callback on every upcall.

  @type salt: string
  @ivar salt: salt associated with the query
  @type type: one of confd.client.CONFD_UPCALL_TYPES
  @ivar type: upcall type (server reply, expired request, ...)
  @type orig_request: L{objects.ConfdRequest}
  @ivar orig_request: original request
  @type server_reply: L{objects.ConfdReply}
  @ivar server_reply: server reply
  @type server_ip: string
  @ivar server_ip: answering server ip address
  @type server_port: int
  @ivar server_port: answering server port
  @type extra_args: any
  @ivar extra_args: 'args' argument of the SendRequest function
  @type client: L{ConfdClient}
  @ivar client: current confd client instance

  """
  __slots__ = [
    # identification of the query
    "salt", "type", "orig_request",
    # data about the answering server
    "server_reply", "server_ip", "server_port",
    # caller-provided data and originating client
    "extra_args", "client",
    ]
431

    
432

    
433
class ConfdClientRequest(objects.ConfdRequest):
  """Client-side flavour of ConfdRequest.

  It makes building requests on the client easier: a missing rsalt and a
  missing protocol version are filled in with defaults, and the request
  type is validated.

  """
  def __init__(self, **kwargs):
    """Build the request, fill in the defaults and check its type.

    @raise errors.ConfdClientError: if the request type is not one of
        L{constants.CONFD_REQS}

    """
    objects.ConfdRequest.__init__(self, **kwargs)

    # default salt: a freshly generated UUID
    if not self.rsalt:
      self.rsalt = utils.NewUUID()

    # default protocol: the version this library speaks
    if not self.protocol:
      self.protocol = constants.CONFD_PROTOCOL_VERSION

    if self.type in constants.CONFD_REQS:
      return
    raise errors.ConfdClientError("Invalid request type")
448

    
449

    
450
class ConfdFilterCallback:
  """Callback that calls another callback, but filters duplicate results.

  @ivar consistent: a dictionary indexed by salt; for each salt, if
      all responses were identical, this will be True; this is the
      expected state on a healthy cluster; on inconsistent or
      partitioned clusters, this might be False, if we see answers
      with the same serial but different contents

  """
  def __init__(self, callback, logger=None):
    """Constructor for ConfdFilterCallback

    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function to call when getting answers
    @type logger: logging.Logger
    @param logger: optional logger for internal conditions
    @raise errors.ProgrammerError: if the callback is not callable

    """
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    self._callback = callback
    self._logger = logger
    # answers contains a dict of salt -> answer
    self._answers = {}
    self.consistent = {}

  def _LogFilter(self, salt, new_reply, old_reply):
    """Log why a reply is being filtered out.

    Does nothing if no logger was given.

    @param salt: salt of the query the replies belong to
    @param new_reply: the reply which has just been received
    @param old_reply: the reply it is compared against

    """
    if not self._logger:
      return

    if new_reply.serial > old_reply.serial:
      self._logger.debug("Filtering confirming answer, with newer"
                         " serial for query %s" % salt)
    elif new_reply.serial == old_reply.serial:
      if new_reply.answer != old_reply.answer:
        self._logger.warning("Got incoherent answers for query %s"
                             " (serial: %s)" % (salt, new_reply.serial))
      else:
        self._logger.debug("Filtering confirming answer, with same"
                           " serial for query %s" % salt)
    else:
      # here the new reply is the older one: report "new < old" so that
      # the logged inequality actually holds
      self._logger.debug("Filtering outdated answer for query %s"
                         " serial: (%d < %d)" % (salt, new_reply.serial,
                                                 old_reply.serial))

  def _HandleExpire(self, up):
    """Forget everything recorded for an expired query.

    """
    # if we have no answer we have received none, before the expiration.
    if up.salt in self._answers:
      del self._answers[up.salt]
    if up.salt in self.consistent:
      del self.consistent[up.salt]

  def _HandleReply(self, up):
    """Handle a single confd reply, and decide whether to filter it.

    @rtype: boolean
    @return: True if the reply should be filtered, False if it should be passed
             on to the up-callback

    """
    filter_upcall = False
    salt = up.salt
    if salt not in self.consistent:
      self.consistent[salt] = True
    if salt not in self._answers:
      # first answer for a query (don't filter, and record)
      self._answers[salt] = up.server_reply
    elif up.server_reply.serial > self._answers[salt].serial:
      # newer answer (record, and compare contents)
      old_answer = self._answers[salt]
      self._answers[salt] = up.server_reply
      if up.server_reply.answer == old_answer.answer:
        # same content (filter) (version upgrade was unrelated)
        filter_upcall = True
        self._LogFilter(salt, up.server_reply, old_answer)
      # else: different content, pass up a second answer
    else:
      # older or same-version answer (duplicate or outdated, filter)
      if (up.server_reply.serial == self._answers[salt].serial and
          up.server_reply.answer != self._answers[salt].answer):
        self.consistent[salt] = False
      filter_upcall = True
      self._LogFilter(salt, up.server_reply, self._answers[salt])

    return filter_upcall

  def __call__(self, up):
    """Filtering callback

    Replies are checked by L{_HandleReply} and only passed on when not
    filtered; expirations clean up the recorded state and are always
    passed on.

    @type up: L{ConfdUpcallPayload}
    @param up: upper callback

    """
    filter_upcall = False
    if up.type == UPCALL_REPLY:
      filter_upcall = self._HandleReply(up)
    elif up.type == UPCALL_EXPIRE:
      self._HandleExpire(up)

    if not filter_upcall:
      self._callback(up)
553

    
554

    
555
class ConfdCountingCallback:
  """Wrapper callback which counts the replies before passing them on.

  Only queries registered through L{RegisterQuery} are counted; every
  upcall is forwarded to the wrapped callback.

  """
  def __init__(self, callback, logger=None):
    """Constructor for ConfdCountingCallback

    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function to call when getting answers
    @type logger: logging.Logger
    @param logger: optional logger for internal conditions

    """
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    # per-query reply counters: salt -> count
    self._answers = {}
    self._logger = logger
    self._callback = callback

  def RegisterQuery(self, salt):
    """Start counting the replies for the given salt.

    Registering the same salt twice is a programming error.

    """
    already_known = salt in self._answers
    if already_known:
      raise errors.ProgrammerError("query already registered")
    self._answers[salt] = 0

  def AllAnswered(self):
    """Have all the registered queries received at least an answer?

    """
    counters = self._answers.values()
    return compat.all(counters)

  def _HandleExpire(self, up):
    """Stop tracking a query once it has expired.

    """
    # if we have no answer we have received none, before the expiration.
    self._answers.pop(up.salt, None)

  def _HandleReply(self, up):
    """Count one more reply, if it belongs to a registered query.

    Replies to unregistered queries are not counted.

    """
    salt = up.salt
    if salt not in self._answers:
      return
    self._answers[salt] += 1

  def __call__(self, up):
    """Counting callback

    @type up: L{ConfdUpcallPayload}
    @param up: upper callback

    """
    if up.type == UPCALL_REPLY:
      self._HandleReply(up)
    elif up.type == UPCALL_EXPIRE:
      self._HandleExpire(up)
    # the upcall is always passed on, nothing is filtered here
    self._callback(up)
615

    
616

    
617
class StoreResultCallback:
  """Callback which just remembers the latest reply for each salt.

  @ivar _answers: dict of salt to (have_answer, reply)

  """
  # value reported for salts we have no reply for
  _NO_KEY = (False, None)

  def __init__(self):
    """Constructor for StoreResultCallback

    """
    # salt -> (True, most recent upcall payload)
    self._answers = {}

  def GetResponse(self, salt):
    """Return the stored entry for a salt.

    @return: the (have_answer, reply) tuple recorded for the salt, or
        C{(False, None)} if nothing is stored for it

    """
    try:
      return self._answers[salt]
    except KeyError:
      return self._NO_KEY

  def _HandleExpire(self, up):
    """Expiration handler.

    Drops the entry of the expired salt, but only if it holds the "no
    answer" marker.

    """
    salt = up.salt
    if salt not in self._answers:
      return
    if self._answers[salt] == self._NO_KEY:
      del self._answers[salt]

  def _HandleReply(self, up):
    """Record the upcall as the current answer for its salt.

    """
    entry = (True, up)
    self._answers[up.salt] = entry

  def __call__(self, up):
    """Storing callback

    @type up: L{ConfdUpcallPayload}
    @param up: upper callback

    """
    if up.type == UPCALL_REPLY:
      self._HandleReply(up)
    elif up.type == UPCALL_EXPIRE:
      self._HandleExpire(up)
662

    
663

    
664
def GetConfdClient(callback):
  """Build a L{ConfdClient} for the local cluster, using the given callback.

  The list of master candidate IPs is read from ssconf and the HMAC key
  from its file, so that callers don't have to deal with either.

  @attention: This should only be called on nodes which are part of a
      cluster, since it depends on a valid (ganeti) data directory;
      for code running outside of a cluster, you need to create the
      client manually

  @param callback: function to call when getting answers
  @rtype: L{ConfdClient}
  @return: a client talking to the master candidates of this cluster

  """
  store = ssconf.SimpleStore()
  candidates_path = store.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
  # one master candidate IP per line
  candidates = utils.ReadFile(candidates_path).splitlines()
  key = utils.ReadFile(constants.CONFD_HMAC_KEY)
  return ConfdClient(key, candidates, callback)