Add separate module for backported language functionality
[ganeti-local] / lib / confd / client.py
1 #
2 #
3
4 # Copyright (C) 2009 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti confd client
23
24 Clients can use the confd client library to send requests to a group of master
25 candidates running confd. The expected usage is through the asyncore framework,
26 by sending queries, and asynchronously receiving replies through a callback.
27
28 This way the client library doesn't ever need to "wait" on a particular answer,
29 and can proceed even if some udp packets are lost. It's up to the user to
30 reschedule queries if they haven't received responses and they need them.
31
32 Example usage::
33
34   client = ConfdClient(...) # includes callback specification
35   req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36   client.SendRequest(req)
37   # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38   # ... wait ...
39   # And your callback will be called by asyncore, when your query gets a
40   # response, or when it expires.
41
42 You can use the provided ConfdFilterCallback to act as a filter, only passing
43 "newer" answer to your callback, and filtering out outdated ones, or ones
44 confirming what you already got.
45
46 """
47
48 # pylint: disable-msg=E0203
49
50 # E0203: Access to member %r before its definition, since we use
51 # objects.py which doesn't explicitely initialise its members
52
53 import time
54 import random
55
56 from ganeti import utils
57 from ganeti import constants
58 from ganeti import objects
59 from ganeti import serializer
60 from ganeti import daemon # contains AsyncUDPSocket
61 from ganeti import errors
62 from ganeti import confd
63 from ganeti import ssconf
64 from ganeti import compat
65
66
67 class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
68   """Confd udp asyncore client
69
70   This is kept separate from the main ConfdClient to make sure it's easy to
71   implement a non-asyncore based client library.
72
73   """
74   def __init__(self, client):
75     """Constructor for ConfdAsyncUDPClient
76
77     @type client: L{ConfdClient}
78     @param client: client library, to pass the datagrams to
79
80     """
81     daemon.AsyncUDPSocket.__init__(self)
82     self.client = client
83
84   # this method is overriding a daemon.AsyncUDPSocket method
85   def handle_datagram(self, payload, ip, port):
86     self.client.HandleResponse(payload, ip, port)
87
88
89 class _Request(object):
90   """Request status structure.
91
92   @ivar request: the request data
93   @ivar args: any extra arguments for the callback
94   @ivar expiry: the expiry timestamp of the request
95   @ivar sent: the set of contacted peers
96   @ivar rcvd: the set of peers who replied
97
98   """
99   def __init__(self, request, args, expiry, sent):
100     self.request = request
101     self.args = args
102     self.expiry = expiry
103     self.sent = frozenset(sent)
104     self.rcvd = set()
105
106
107 class ConfdClient:
108   """Send queries to confd, and get back answers.
109
110   Since the confd model works by querying multiple master candidates, and
111   getting back answers, this is an asynchronous library. It can either work
112   through asyncore or with your own handling.
113
114   @type _requests: dict
115   @ivar _requests: dictionary indexes by salt, which contains data
116       about the outstanding requests; the values are objects of type
117       L{_Request}
118
119   """
120   def __init__(self, hmac_key, peers, callback, port=None, logger=None):
121     """Constructor for ConfdClient
122
123     @type hmac_key: string
124     @param hmac_key: hmac key to talk to confd
125     @type peers: list
126     @param peers: list of peer nodes
127     @type callback: f(L{ConfdUpcallPayload})
128     @param callback: function to call when getting answers
129     @type port: integer
130     @param port: confd port (default: use GetDaemonPort)
131     @type logger: logging.Logger
132     @param logger: optional logger for internal conditions
133
134     """
135     if not callable(callback):
136       raise errors.ProgrammerError("callback must be callable")
137
138     self.UpdatePeerList(peers)
139     self._hmac_key = hmac_key
140     self._socket = ConfdAsyncUDPClient(self)
141     self._callback = callback
142     self._confd_port = port
143     self._logger = logger
144     self._requests = {}
145
146     if self._confd_port is None:
147       self._confd_port = utils.GetDaemonPort(constants.CONFD)
148
149   def UpdatePeerList(self, peers):
150     """Update the list of peers
151
152     @type peers: list
153     @param peers: list of peer nodes
154
155     """
156     # we are actually called from init, so:
157     # pylint: disable-msg=W0201
158     if not isinstance(peers, list):
159       raise errors.ProgrammerError("peers must be a list")
160     # make a copy of peers, since we're going to shuffle the list, later
161     self._peers = list(peers)
162
163   def _PackRequest(self, request, now=None):
164     """Prepare a request to be sent on the wire.
165
166     This function puts a proper salt in a confd request, puts the proper salt,
167     and adds the correct magic number.
168
169     """
170     if now is None:
171       now = time.time()
172     tstamp = '%d' % now
173     req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
174     return confd.PackMagic(req)
175
176   def _UnpackReply(self, payload):
177     in_payload = confd.UnpackMagic(payload)
178     (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
179     answer = objects.ConfdReply.FromDict(dict_answer)
180     return answer, salt
181
182   def ExpireRequests(self):
183     """Delete all the expired requests.
184
185     """
186     now = time.time()
187     for rsalt, rq in self._requests.items():
188       if now >= rq.expiry:
189         del self._requests[rsalt]
190         client_reply = ConfdUpcallPayload(salt=rsalt,
191                                           type=UPCALL_EXPIRE,
192                                           orig_request=rq.request,
193                                           extra_args=rq.args,
194                                           client=self,
195                                           )
196         self._callback(client_reply)
197
198   def SendRequest(self, request, args=None, coverage=0, async=True):
199     """Send a confd request to some MCs
200
201     @type request: L{objects.ConfdRequest}
202     @param request: the request to send
203     @type args: tuple
204     @param args: additional callback arguments
205     @type coverage: integer
206     @param coverage: number of remote nodes to contact; if default
207         (0), it will use a reasonable default
208         (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is
209         passed, it will use the maximum number of peers, otherwise the
210         number passed in will be used
211     @type async: boolean
212     @param async: handle the write asynchronously
213
214     """
215     if coverage == 0:
216       coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
217     elif coverage == -1:
218       coverage = len(self._peers)
219
220     if coverage > len(self._peers):
221       raise errors.ConfdClientError("Not enough MCs known to provide the"
222                                     " desired coverage")
223
224     if not request.rsalt:
225       raise errors.ConfdClientError("Missing request rsalt")
226
227     self.ExpireRequests()
228     if request.rsalt in self._requests:
229       raise errors.ConfdClientError("Duplicate request rsalt")
230
231     if request.type not in constants.CONFD_REQS:
232       raise errors.ConfdClientError("Invalid request type")
233
234     random.shuffle(self._peers)
235     targets = self._peers[:coverage]
236
237     now = time.time()
238     payload = self._PackRequest(request, now=now)
239
240     for target in targets:
241       try:
242         self._socket.enqueue_send(target, self._confd_port, payload)
243       except errors.UdpDataSizeError:
244         raise errors.ConfdClientError("Request too big")
245
246     expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
247     self._requests[request.rsalt] = _Request(request, args, expire_time,
248                                              targets)
249
250     if not async:
251       self.FlushSendQueue()
252
253   def HandleResponse(self, payload, ip, port):
254     """Asynchronous handler for a confd reply
255
256     Call the relevant callback associated to the current request.
257
258     """
259     try:
260       try:
261         answer, salt = self._UnpackReply(payload)
262       except (errors.SignatureError, errors.ConfdMagicError), err:
263         if self._logger:
264           self._logger.debug("Discarding broken package: %s" % err)
265         return
266
267       try:
268         rq = self._requests[salt]
269       except KeyError:
270         if self._logger:
271           self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
272         return
273
274       rq.rcvd.add(ip)
275
276       client_reply = ConfdUpcallPayload(salt=salt,
277                                         type=UPCALL_REPLY,
278                                         server_reply=answer,
279                                         orig_request=rq.request,
280                                         server_ip=ip,
281                                         server_port=port,
282                                         extra_args=rq.args,
283                                         client=self,
284                                        )
285       self._callback(client_reply)
286
287     finally:
288       self.ExpireRequests()
289
290   def FlushSendQueue(self):
291     """Send out all pending requests.
292
293     Can be used for synchronous client use.
294
295     """
296     while self._socket.writable():
297       self._socket.handle_write()
298
299   def ReceiveReply(self, timeout=1):
300     """Receive one reply.
301
302     @type timeout: float
303     @param timeout: how long to wait for the reply
304     @rtype: boolean
305     @return: True if some data has been handled, False otherwise
306
307     """
308     return self._socket.process_next_packet(timeout=timeout)
309
310   @staticmethod
311   def _NeededReplies(peer_cnt):
312     """Compute the minimum safe number of replies for a query.
313
314     The algorithm is designed to work well for both small and big
315     number of peers:
316         - for less than three, we require all responses
317         - for less than five, we allow one miss
318         - otherwise, half the number plus one
319
320     This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
321     4->2, 5->3, 6->3, 7->4, etc.
322
323     @type peer_cnt: int
324     @param peer_cnt: the number of peers contacted
325     @rtype: int
326     @return: the number of replies which should give a safe coverage
327
328     """
329     if peer_cnt < 3:
330       return peer_cnt
331     elif peer_cnt < 5:
332       return peer_cnt - 1
333     else:
334       return int(peer_cnt/2) + 1
335
336   def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
337     """Wait for replies to a given request.
338
339     This method will wait until either the timeout expires or a
340     minimum number (computed using L{_NeededReplies}) of replies are
341     received for the given salt. It is useful when doing synchronous
342     calls to this library.
343
344     @param salt: the salt of the request we want responses for
345     @param timeout: the maximum timeout (should be less or equal to
346         L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT}
347     @rtype: tuple
348     @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
349         request is unknown, timed_out will be true and the counters
350         will be zero
351
352     """
353     def _CheckResponse():
354       if salt not in self._requests:
355         # expired?
356         if self._logger:
357           self._logger.debug("Discarding unknown/expired request: %s" % salt)
358         return MISSING
359       rq = self._requests[salt]
360       if len(rq.rcvd) >= expected:
361         # already got all replies
362         return (False, len(rq.sent), len(rq.rcvd))
363       # else wait, using default timeout
364       self.ReceiveReply()
365       raise utils.RetryAgain()
366
367     MISSING = (True, 0, 0)
368
369     if salt not in self._requests:
370       return MISSING
371     # extend the expire time with the current timeout, so that we
372     # don't get the request expired from under us
373     rq = self._requests[salt]
374     rq.expiry += timeout
375     sent = len(rq.sent)
376     expected = self._NeededReplies(sent)
377
378     try:
379       return utils.Retry(_CheckResponse, 0, timeout)
380     except utils.RetryTimeout:
381       if salt in self._requests:
382         rq = self._requests[salt]
383         return (True, len(rq.sent), len(rq.rcvd))
384       else:
385         return MISSING
386
387
388 # UPCALL_REPLY: server reply upcall
389 # has all ConfdUpcallPayload fields populated
390 UPCALL_REPLY = 1
391 # UPCALL_EXPIRE: internal library request expire
392 # has only salt, type, orig_request and extra_args
393 UPCALL_EXPIRE = 2
394 CONFD_UPCALL_TYPES = frozenset([
395   UPCALL_REPLY,
396   UPCALL_EXPIRE,
397   ])
398
399
400 class ConfdUpcallPayload(objects.ConfigObject):
401   """Callback argument for confd replies
402
403   @type salt: string
404   @ivar salt: salt associated with the query
405   @type type: one of confd.client.CONFD_UPCALL_TYPES
406   @ivar type: upcall type (server reply, expired request, ...)
407   @type orig_request: L{objects.ConfdRequest}
408   @ivar orig_request: original request
409   @type server_reply: L{objects.ConfdReply}
410   @ivar server_reply: server reply
411   @type server_ip: string
412   @ivar server_ip: answering server ip address
413   @type server_port: int
414   @ivar server_port: answering server port
415   @type extra_args: any
416   @ivar extra_args: 'args' argument of the SendRequest function
417   @type client: L{ConfdClient}
418   @ivar client: current confd client instance
419
420   """
421   __slots__ = [
422     "salt",
423     "type",
424     "orig_request",
425     "server_reply",
426     "server_ip",
427     "server_port",
428     "extra_args",
429     "client",
430     ]
431
432
433 class ConfdClientRequest(objects.ConfdRequest):
434   """This is the client-side version of ConfdRequest.
435
436   This version of the class helps creating requests, on the client side, by
437   filling in some default values.
438
439   """
440   def __init__(self, **kwargs):
441     objects.ConfdRequest.__init__(self, **kwargs)
442     if not self.rsalt:
443       self.rsalt = utils.NewUUID()
444     if not self.protocol:
445       self.protocol = constants.CONFD_PROTOCOL_VERSION
446     if self.type not in constants.CONFD_REQS:
447       raise errors.ConfdClientError("Invalid request type")
448
449
450 class ConfdFilterCallback:
451   """Callback that calls another callback, but filters duplicate results.
452
453   @ivar consistent: a dictionary indexed by salt; for each salt, if
454       all responses ware identical, this will be True; this is the
455       expected state on a healthy cluster; on inconsistent or
456       partitioned clusters, this might be False, if we see answers
457       with the same serial but different contents
458
459   """
460   def __init__(self, callback, logger=None):
461     """Constructor for ConfdFilterCallback
462
463     @type callback: f(L{ConfdUpcallPayload})
464     @param callback: function to call when getting answers
465     @type logger: logging.Logger
466     @param logger: optional logger for internal conditions
467
468     """
469     if not callable(callback):
470       raise errors.ProgrammerError("callback must be callable")
471
472     self._callback = callback
473     self._logger = logger
474     # answers contains a dict of salt -> answer
475     self._answers = {}
476     self.consistent = {}
477
478   def _LogFilter(self, salt, new_reply, old_reply):
479     if not self._logger:
480       return
481
482     if new_reply.serial > old_reply.serial:
483       self._logger.debug("Filtering confirming answer, with newer"
484                          " serial for query %s" % salt)
485     elif new_reply.serial == old_reply.serial:
486       if new_reply.answer != old_reply.answer:
487         self._logger.warning("Got incoherent answers for query %s"
488                              " (serial: %s)" % (salt, new_reply.serial))
489       else:
490         self._logger.debug("Filtering confirming answer, with same"
491                            " serial for query %s" % salt)
492     else:
493       self._logger.debug("Filtering outdated answer for query %s"
494                          " serial: (%d < %d)" % (salt, old_reply.serial,
495                                                  new_reply.serial))
496
497   def _HandleExpire(self, up):
498     # if we have no answer we have received none, before the expiration.
499     if up.salt in self._answers:
500       del self._answers[up.salt]
501     if up.salt in self.consistent:
502       del self.consistent[up.salt]
503
504   def _HandleReply(self, up):
505     """Handle a single confd reply, and decide whether to filter it.
506
507     @rtype: boolean
508     @return: True if the reply should be filtered, False if it should be passed
509              on to the up-callback
510
511     """
512     filter_upcall = False
513     salt = up.salt
514     if salt not in self.consistent:
515       self.consistent[salt] = True
516     if salt not in self._answers:
517       # first answer for a query (don't filter, and record)
518       self._answers[salt] = up.server_reply
519     elif up.server_reply.serial > self._answers[salt].serial:
520       # newer answer (record, and compare contents)
521       old_answer = self._answers[salt]
522       self._answers[salt] = up.server_reply
523       if up.server_reply.answer == old_answer.answer:
524         # same content (filter) (version upgrade was unrelated)
525         filter_upcall = True
526         self._LogFilter(salt, up.server_reply, old_answer)
527       # else: different content, pass up a second answer
528     else:
529       # older or same-version answer (duplicate or outdated, filter)
530       if (up.server_reply.serial == self._answers[salt].serial and
531           up.server_reply.answer != self._answers[salt].answer):
532         self.consistent[salt] = False
533       filter_upcall = True
534       self._LogFilter(salt, up.server_reply, self._answers[salt])
535
536     return filter_upcall
537
538   def __call__(self, up):
539     """Filtering callback
540
541     @type up: L{ConfdUpcallPayload}
542     @param up: upper callback
543
544     """
545     filter_upcall = False
546     if up.type == UPCALL_REPLY:
547       filter_upcall = self._HandleReply(up)
548     elif up.type == UPCALL_EXPIRE:
549       self._HandleExpire(up)
550
551     if not filter_upcall:
552       self._callback(up)
553
554
555 class ConfdCountingCallback:
556   """Callback that calls another callback, and counts the answers
557
558   """
559   def __init__(self, callback, logger=None):
560     """Constructor for ConfdCountingCallback
561
562     @type callback: f(L{ConfdUpcallPayload})
563     @param callback: function to call when getting answers
564     @type logger: logging.Logger
565     @param logger: optional logger for internal conditions
566
567     """
568     if not callable(callback):
569       raise errors.ProgrammerError("callback must be callable")
570
571     self._callback = callback
572     self._logger = logger
573     # answers contains a dict of salt -> count
574     self._answers = {}
575
576   def RegisterQuery(self, salt):
577     if salt in self._answers:
578       raise errors.ProgrammerError("query already registered")
579     self._answers[salt] = 0
580
581   def AllAnswered(self):
582     """Have all the registered queries received at least an answer?
583
584     """
585     return compat.all(self._answers.values())
586
587   def _HandleExpire(self, up):
588     # if we have no answer we have received none, before the expiration.
589     if up.salt in self._answers:
590       del self._answers[up.salt]
591
592   def _HandleReply(self, up):
593     """Handle a single confd reply, and decide whether to filter it.
594
595     @rtype: boolean
596     @return: True if the reply should be filtered, False if it should be passed
597              on to the up-callback
598
599     """
600     if up.salt in self._answers:
601       self._answers[up.salt] += 1
602
603   def __call__(self, up):
604     """Filtering callback
605
606     @type up: L{ConfdUpcallPayload}
607     @param up: upper callback
608
609     """
610     if up.type == UPCALL_REPLY:
611       self._HandleReply(up)
612     elif up.type == UPCALL_EXPIRE:
613       self._HandleExpire(up)
614     self._callback(up)
615
616
617 class StoreResultCallback:
618   """Callback that simply stores the most recent answer.
619
620   @ivar _answers: dict of salt to (have_answer, reply)
621
622   """
623   _NO_KEY = (False, None)
624
625   def __init__(self):
626     """Constructor for StoreResultCallback
627
628     """
629     # answers contains a dict of salt -> best result
630     self._answers = {}
631
632   def GetResponse(self, salt):
633     """Return the best match for a salt
634
635     """
636     return self._answers.get(salt, self._NO_KEY)
637
638   def _HandleExpire(self, up):
639     """Expiration handler.
640
641     """
642     if up.salt in self._answers and self._answers[up.salt] == self._NO_KEY:
643       del self._answers[up.salt]
644
645   def _HandleReply(self, up):
646     """Handle a single confd reply, and decide whether to filter it.
647
648     """
649     self._answers[up.salt] = (True, up)
650
651   def __call__(self, up):
652     """Filtering callback
653
654     @type up: L{ConfdUpcallPayload}
655     @param up: upper callback
656
657     """
658     if up.type == UPCALL_REPLY:
659       self._HandleReply(up)
660     elif up.type == UPCALL_EXPIRE:
661       self._HandleExpire(up)
662
663
664 def GetConfdClient(callback):
665   """Return a client configured using the given callback.
666
667   This is handy to abstract the MC list and HMAC key reading.
668
669   @attention: This should only be called on nodes which are part of a
670       cluster, since it depends on a valid (ganeti) data directory;
671       for code running outside of a cluster, you need to create the
672       client manually
673
674   """
675   ss = ssconf.SimpleStore()
676   mc_file = ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
677   mc_list = utils.ReadFile(mc_file).splitlines()
678   hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY)
679   return ConfdClient(hmac_key, mc_list, callback)