Confd IPv6 support
[ganeti-local] / lib / confd / client.py
1 #
2 #
3
4 # Copyright (C) 2009 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti confd client
23
24 Clients can use the confd client library to send requests to a group of master
25 candidates running confd. The expected usage is through the asyncore framework,
26 by sending queries, and asynchronously receiving replies through a callback.
27
28 This way the client library doesn't ever need to "wait" on a particular answer,
29 and can proceed even if some udp packets are lost. It's up to the user to
30 reschedule queries if they haven't received responses and they need them.
31
32 Example usage::
33
34   client = ConfdClient(...) # includes callback specification
35   req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36   client.SendRequest(req)
37   # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38   # ... wait ...
39   # And your callback will be called by asyncore, when your query gets a
40   # response, or when it expires.
41
42 You can use the provided ConfdFilterCallback to act as a filter, only passing
43 "newer" answer to your callback, and filtering out outdated ones, or ones
44 confirming what you already got.
45
46 """
47
48 # pylint: disable-msg=E0203
49
50 # E0203: Access to member %r before its definition, since we use
51 # objects.py which doesn't explicitely initialise its members
52
53 import time
54 import random
55
56 from ganeti import utils
57 from ganeti import constants
58 from ganeti import objects
59 from ganeti import serializer
60 from ganeti import daemon # contains AsyncUDPSocket
61 from ganeti import errors
62 from ganeti import confd
63 from ganeti import ssconf
64 from ganeti import compat
65 from ganeti import netutils
66
67
68 class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
69   """Confd udp asyncore client
70
71   This is kept separate from the main ConfdClient to make sure it's easy to
72   implement a non-asyncore based client library.
73
74   """
75   def __init__(self, client, family):
76     """Constructor for ConfdAsyncUDPClient
77
78     @type client: L{ConfdClient}
79     @param client: client library, to pass the datagrams to
80
81     """
82     daemon.AsyncUDPSocket.__init__(self, family)
83     self.client = client
84
85   # this method is overriding a daemon.AsyncUDPSocket method
86   def handle_datagram(self, payload, ip, port):
87     self.client.HandleResponse(payload, ip, port)
88
89
90 class _Request(object):
91   """Request status structure.
92
93   @ivar request: the request data
94   @ivar args: any extra arguments for the callback
95   @ivar expiry: the expiry timestamp of the request
96   @ivar sent: the set of contacted peers
97   @ivar rcvd: the set of peers who replied
98
99   """
100   def __init__(self, request, args, expiry, sent):
101     self.request = request
102     self.args = args
103     self.expiry = expiry
104     self.sent = frozenset(sent)
105     self.rcvd = set()
106
107
108 class ConfdClient:
109   """Send queries to confd, and get back answers.
110
111   Since the confd model works by querying multiple master candidates, and
112   getting back answers, this is an asynchronous library. It can either work
113   through asyncore or with your own handling.
114
115   @type _requests: dict
116   @ivar _requests: dictionary indexes by salt, which contains data
117       about the outstanding requests; the values are objects of type
118       L{_Request}
119
120   """
121   def __init__(self, hmac_key, peers, callback, port=None, logger=None):
122     """Constructor for ConfdClient
123
124     @type hmac_key: string
125     @param hmac_key: hmac key to talk to confd
126     @type peers: list
127     @param peers: list of peer nodes
128     @type callback: f(L{ConfdUpcallPayload})
129     @param callback: function to call when getting answers
130     @type port: integer
131     @param port: confd port (default: use GetDaemonPort)
132     @type logger: logging.Logger
133     @param logger: optional logger for internal conditions
134
135     """
136     if not callable(callback):
137       raise errors.ProgrammerError("callback must be callable")
138
139     self.UpdatePeerList(peers)
140     self._SetPeersAddressFamily()
141     self._hmac_key = hmac_key
142     self._socket = ConfdAsyncUDPClient(self, self._family)
143     self._callback = callback
144     self._confd_port = port
145     self._logger = logger
146     self._requests = {}
147
148     if self._confd_port is None:
149       self._confd_port = netutils.GetDaemonPort(constants.CONFD)
150
151   def UpdatePeerList(self, peers):
152     """Update the list of peers
153
154     @type peers: list
155     @param peers: list of peer nodes
156
157     """
158     # we are actually called from init, so:
159     # pylint: disable-msg=W0201
160     if not isinstance(peers, list):
161       raise errors.ProgrammerError("peers must be a list")
162     # make a copy of peers, since we're going to shuffle the list, later
163     self._peers = list(peers)
164
165   def _PackRequest(self, request, now=None):
166     """Prepare a request to be sent on the wire.
167
168     This function puts a proper salt in a confd request, puts the proper salt,
169     and adds the correct magic number.
170
171     """
172     if now is None:
173       now = time.time()
174     tstamp = '%d' % now
175     req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
176     return confd.PackMagic(req)
177
178   def _UnpackReply(self, payload):
179     in_payload = confd.UnpackMagic(payload)
180     (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
181     answer = objects.ConfdReply.FromDict(dict_answer)
182     return answer, salt
183
184   def ExpireRequests(self):
185     """Delete all the expired requests.
186
187     """
188     now = time.time()
189     for rsalt, rq in self._requests.items():
190       if now >= rq.expiry:
191         del self._requests[rsalt]
192         client_reply = ConfdUpcallPayload(salt=rsalt,
193                                           type=UPCALL_EXPIRE,
194                                           orig_request=rq.request,
195                                           extra_args=rq.args,
196                                           client=self,
197                                           )
198         self._callback(client_reply)
199
200   def SendRequest(self, request, args=None, coverage=0, async=True):
201     """Send a confd request to some MCs
202
203     @type request: L{objects.ConfdRequest}
204     @param request: the request to send
205     @type args: tuple
206     @param args: additional callback arguments
207     @type coverage: integer
208     @param coverage: number of remote nodes to contact; if default
209         (0), it will use a reasonable default
210         (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is
211         passed, it will use the maximum number of peers, otherwise the
212         number passed in will be used
213     @type async: boolean
214     @param async: handle the write asynchronously
215
216     """
217     if coverage == 0:
218       coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
219     elif coverage == -1:
220       coverage = len(self._peers)
221
222     if coverage > len(self._peers):
223       raise errors.ConfdClientError("Not enough MCs known to provide the"
224                                     " desired coverage")
225
226     if not request.rsalt:
227       raise errors.ConfdClientError("Missing request rsalt")
228
229     self.ExpireRequests()
230     if request.rsalt in self._requests:
231       raise errors.ConfdClientError("Duplicate request rsalt")
232
233     if request.type not in constants.CONFD_REQS:
234       raise errors.ConfdClientError("Invalid request type")
235
236     random.shuffle(self._peers)
237     targets = self._peers[:coverage]
238
239     now = time.time()
240     payload = self._PackRequest(request, now=now)
241
242     for target in targets:
243       try:
244         self._socket.enqueue_send(target, self._confd_port, payload)
245       except errors.UdpDataSizeError:
246         raise errors.ConfdClientError("Request too big")
247
248     expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
249     self._requests[request.rsalt] = _Request(request, args, expire_time,
250                                              targets)
251
252     if not async:
253       self.FlushSendQueue()
254
255   def HandleResponse(self, payload, ip, port):
256     """Asynchronous handler for a confd reply
257
258     Call the relevant callback associated to the current request.
259
260     """
261     try:
262       try:
263         answer, salt = self._UnpackReply(payload)
264       except (errors.SignatureError, errors.ConfdMagicError), err:
265         if self._logger:
266           self._logger.debug("Discarding broken package: %s" % err)
267         return
268
269       try:
270         rq = self._requests[salt]
271       except KeyError:
272         if self._logger:
273           self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
274         return
275
276       rq.rcvd.add(ip)
277
278       client_reply = ConfdUpcallPayload(salt=salt,
279                                         type=UPCALL_REPLY,
280                                         server_reply=answer,
281                                         orig_request=rq.request,
282                                         server_ip=ip,
283                                         server_port=port,
284                                         extra_args=rq.args,
285                                         client=self,
286                                        )
287       self._callback(client_reply)
288
289     finally:
290       self.ExpireRequests()
291
292   def FlushSendQueue(self):
293     """Send out all pending requests.
294
295     Can be used for synchronous client use.
296
297     """
298     while self._socket.writable():
299       self._socket.handle_write()
300
301   def ReceiveReply(self, timeout=1):
302     """Receive one reply.
303
304     @type timeout: float
305     @param timeout: how long to wait for the reply
306     @rtype: boolean
307     @return: True if some data has been handled, False otherwise
308
309     """
310     return self._socket.process_next_packet(timeout=timeout)
311
312   @staticmethod
313   def _NeededReplies(peer_cnt):
314     """Compute the minimum safe number of replies for a query.
315
316     The algorithm is designed to work well for both small and big
317     number of peers:
318         - for less than three, we require all responses
319         - for less than five, we allow one miss
320         - otherwise, half the number plus one
321
322     This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
323     4->2, 5->3, 6->3, 7->4, etc.
324
325     @type peer_cnt: int
326     @param peer_cnt: the number of peers contacted
327     @rtype: int
328     @return: the number of replies which should give a safe coverage
329
330     """
331     if peer_cnt < 3:
332       return peer_cnt
333     elif peer_cnt < 5:
334       return peer_cnt - 1
335     else:
336       return int(peer_cnt/2) + 1
337
338   def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
339     """Wait for replies to a given request.
340
341     This method will wait until either the timeout expires or a
342     minimum number (computed using L{_NeededReplies}) of replies are
343     received for the given salt. It is useful when doing synchronous
344     calls to this library.
345
346     @param salt: the salt of the request we want responses for
347     @param timeout: the maximum timeout (should be less or equal to
348         L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT}
349     @rtype: tuple
350     @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
351         request is unknown, timed_out will be true and the counters
352         will be zero
353
354     """
355     def _CheckResponse():
356       if salt not in self._requests:
357         # expired?
358         if self._logger:
359           self._logger.debug("Discarding unknown/expired request: %s" % salt)
360         return MISSING
361       rq = self._requests[salt]
362       if len(rq.rcvd) >= expected:
363         # already got all replies
364         return (False, len(rq.sent), len(rq.rcvd))
365       # else wait, using default timeout
366       self.ReceiveReply()
367       raise utils.RetryAgain()
368
369     MISSING = (True, 0, 0)
370
371     if salt not in self._requests:
372       return MISSING
373     # extend the expire time with the current timeout, so that we
374     # don't get the request expired from under us
375     rq = self._requests[salt]
376     rq.expiry += timeout
377     sent = len(rq.sent)
378     expected = self._NeededReplies(sent)
379
380     try:
381       return utils.Retry(_CheckResponse, 0, timeout)
382     except utils.RetryTimeout:
383       if salt in self._requests:
384         rq = self._requests[salt]
385         return (True, len(rq.sent), len(rq.rcvd))
386       else:
387         return MISSING
388
389   def _SetPeersAddressFamily(self):
390     if not self._peers:
391       raise errors.ConfdClientError("Peer list empty")
392     try:
393       peer = self._peers[0]
394       self._family = netutils.GetAddressFamily(peer)
395       for peer in self._peers[1:]:
396         if netutils.GetAddressFamily(peer) != self._family:
397           raise errors.ConfdClientError("Peers must be of same address family")
398     except errors.GenericError:
399       raise errors.ConfdClientError("Peer address %s invalid" % peer)
400
401
402 # UPCALL_REPLY: server reply upcall
403 # has all ConfdUpcallPayload fields populated
404 UPCALL_REPLY = 1
405 # UPCALL_EXPIRE: internal library request expire
406 # has only salt, type, orig_request and extra_args
407 UPCALL_EXPIRE = 2
408 CONFD_UPCALL_TYPES = frozenset([
409   UPCALL_REPLY,
410   UPCALL_EXPIRE,
411   ])
412
413
414 class ConfdUpcallPayload(objects.ConfigObject):
415   """Callback argument for confd replies
416
417   @type salt: string
418   @ivar salt: salt associated with the query
419   @type type: one of confd.client.CONFD_UPCALL_TYPES
420   @ivar type: upcall type (server reply, expired request, ...)
421   @type orig_request: L{objects.ConfdRequest}
422   @ivar orig_request: original request
423   @type server_reply: L{objects.ConfdReply}
424   @ivar server_reply: server reply
425   @type server_ip: string
426   @ivar server_ip: answering server ip address
427   @type server_port: int
428   @ivar server_port: answering server port
429   @type extra_args: any
430   @ivar extra_args: 'args' argument of the SendRequest function
431   @type client: L{ConfdClient}
432   @ivar client: current confd client instance
433
434   """
435   __slots__ = [
436     "salt",
437     "type",
438     "orig_request",
439     "server_reply",
440     "server_ip",
441     "server_port",
442     "extra_args",
443     "client",
444     ]
445
446
447 class ConfdClientRequest(objects.ConfdRequest):
448   """This is the client-side version of ConfdRequest.
449
450   This version of the class helps creating requests, on the client side, by
451   filling in some default values.
452
453   """
454   def __init__(self, **kwargs):
455     objects.ConfdRequest.__init__(self, **kwargs)
456     if not self.rsalt:
457       self.rsalt = utils.NewUUID()
458     if not self.protocol:
459       self.protocol = constants.CONFD_PROTOCOL_VERSION
460     if self.type not in constants.CONFD_REQS:
461       raise errors.ConfdClientError("Invalid request type")
462
463
464 class ConfdFilterCallback:
465   """Callback that calls another callback, but filters duplicate results.
466
467   @ivar consistent: a dictionary indexed by salt; for each salt, if
468       all responses ware identical, this will be True; this is the
469       expected state on a healthy cluster; on inconsistent or
470       partitioned clusters, this might be False, if we see answers
471       with the same serial but different contents
472
473   """
474   def __init__(self, callback, logger=None):
475     """Constructor for ConfdFilterCallback
476
477     @type callback: f(L{ConfdUpcallPayload})
478     @param callback: function to call when getting answers
479     @type logger: logging.Logger
480     @param logger: optional logger for internal conditions
481
482     """
483     if not callable(callback):
484       raise errors.ProgrammerError("callback must be callable")
485
486     self._callback = callback
487     self._logger = logger
488     # answers contains a dict of salt -> answer
489     self._answers = {}
490     self.consistent = {}
491
492   def _LogFilter(self, salt, new_reply, old_reply):
493     if not self._logger:
494       return
495
496     if new_reply.serial > old_reply.serial:
497       self._logger.debug("Filtering confirming answer, with newer"
498                          " serial for query %s" % salt)
499     elif new_reply.serial == old_reply.serial:
500       if new_reply.answer != old_reply.answer:
501         self._logger.warning("Got incoherent answers for query %s"
502                              " (serial: %s)" % (salt, new_reply.serial))
503       else:
504         self._logger.debug("Filtering confirming answer, with same"
505                            " serial for query %s" % salt)
506     else:
507       self._logger.debug("Filtering outdated answer for query %s"
508                          " serial: (%d < %d)" % (salt, old_reply.serial,
509                                                  new_reply.serial))
510
511   def _HandleExpire(self, up):
512     # if we have no answer we have received none, before the expiration.
513     if up.salt in self._answers:
514       del self._answers[up.salt]
515     if up.salt in self.consistent:
516       del self.consistent[up.salt]
517
518   def _HandleReply(self, up):
519     """Handle a single confd reply, and decide whether to filter it.
520
521     @rtype: boolean
522     @return: True if the reply should be filtered, False if it should be passed
523              on to the up-callback
524
525     """
526     filter_upcall = False
527     salt = up.salt
528     if salt not in self.consistent:
529       self.consistent[salt] = True
530     if salt not in self._answers:
531       # first answer for a query (don't filter, and record)
532       self._answers[salt] = up.server_reply
533     elif up.server_reply.serial > self._answers[salt].serial:
534       # newer answer (record, and compare contents)
535       old_answer = self._answers[salt]
536       self._answers[salt] = up.server_reply
537       if up.server_reply.answer == old_answer.answer:
538         # same content (filter) (version upgrade was unrelated)
539         filter_upcall = True
540         self._LogFilter(salt, up.server_reply, old_answer)
541       # else: different content, pass up a second answer
542     else:
543       # older or same-version answer (duplicate or outdated, filter)
544       if (up.server_reply.serial == self._answers[salt].serial and
545           up.server_reply.answer != self._answers[salt].answer):
546         self.consistent[salt] = False
547       filter_upcall = True
548       self._LogFilter(salt, up.server_reply, self._answers[salt])
549
550     return filter_upcall
551
552   def __call__(self, up):
553     """Filtering callback
554
555     @type up: L{ConfdUpcallPayload}
556     @param up: upper callback
557
558     """
559     filter_upcall = False
560     if up.type == UPCALL_REPLY:
561       filter_upcall = self._HandleReply(up)
562     elif up.type == UPCALL_EXPIRE:
563       self._HandleExpire(up)
564
565     if not filter_upcall:
566       self._callback(up)
567
568
569 class ConfdCountingCallback:
570   """Callback that calls another callback, and counts the answers
571
572   """
573   def __init__(self, callback, logger=None):
574     """Constructor for ConfdCountingCallback
575
576     @type callback: f(L{ConfdUpcallPayload})
577     @param callback: function to call when getting answers
578     @type logger: logging.Logger
579     @param logger: optional logger for internal conditions
580
581     """
582     if not callable(callback):
583       raise errors.ProgrammerError("callback must be callable")
584
585     self._callback = callback
586     self._logger = logger
587     # answers contains a dict of salt -> count
588     self._answers = {}
589
590   def RegisterQuery(self, salt):
591     if salt in self._answers:
592       raise errors.ProgrammerError("query already registered")
593     self._answers[salt] = 0
594
595   def AllAnswered(self):
596     """Have all the registered queries received at least an answer?
597
598     """
599     return compat.all(self._answers.values())
600
601   def _HandleExpire(self, up):
602     # if we have no answer we have received none, before the expiration.
603     if up.salt in self._answers:
604       del self._answers[up.salt]
605
606   def _HandleReply(self, up):
607     """Handle a single confd reply, and decide whether to filter it.
608
609     @rtype: boolean
610     @return: True if the reply should be filtered, False if it should be passed
611              on to the up-callback
612
613     """
614     if up.salt in self._answers:
615       self._answers[up.salt] += 1
616
617   def __call__(self, up):
618     """Filtering callback
619
620     @type up: L{ConfdUpcallPayload}
621     @param up: upper callback
622
623     """
624     if up.type == UPCALL_REPLY:
625       self._HandleReply(up)
626     elif up.type == UPCALL_EXPIRE:
627       self._HandleExpire(up)
628     self._callback(up)
629
630
631 class StoreResultCallback:
632   """Callback that simply stores the most recent answer.
633
634   @ivar _answers: dict of salt to (have_answer, reply)
635
636   """
637   _NO_KEY = (False, None)
638
639   def __init__(self):
640     """Constructor for StoreResultCallback
641
642     """
643     # answers contains a dict of salt -> best result
644     self._answers = {}
645
646   def GetResponse(self, salt):
647     """Return the best match for a salt
648
649     """
650     return self._answers.get(salt, self._NO_KEY)
651
652   def _HandleExpire(self, up):
653     """Expiration handler.
654
655     """
656     if up.salt in self._answers and self._answers[up.salt] == self._NO_KEY:
657       del self._answers[up.salt]
658
659   def _HandleReply(self, up):
660     """Handle a single confd reply, and decide whether to filter it.
661
662     """
663     self._answers[up.salt] = (True, up)
664
665   def __call__(self, up):
666     """Filtering callback
667
668     @type up: L{ConfdUpcallPayload}
669     @param up: upper callback
670
671     """
672     if up.type == UPCALL_REPLY:
673       self._HandleReply(up)
674     elif up.type == UPCALL_EXPIRE:
675       self._HandleExpire(up)
676
677
678 def GetConfdClient(callback):
679   """Return a client configured using the given callback.
680
681   This is handy to abstract the MC list and HMAC key reading.
682
683   @attention: This should only be called on nodes which are part of a
684       cluster, since it depends on a valid (ganeti) data directory;
685       for code running outside of a cluster, you need to create the
686       client manually
687
688   """
689   ss = ssconf.SimpleStore()
690   mc_file = ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
691   mc_list = utils.ReadFile(mc_file).splitlines()
692   hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY)
693   return ConfdClient(hmac_key, mc_list, callback)