Introduce lib/netutils.py
[ganeti-local] / lib / confd / client.py
1 #
2 #
3
4 # Copyright (C) 2009 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti confd client
23
24 Clients can use the confd client library to send requests to a group of master
25 candidates running confd. The expected usage is through the asyncore framework,
26 by sending queries, and asynchronously receiving replies through a callback.
27
28 This way the client library doesn't ever need to "wait" on a particular answer,
29 and can proceed even if some udp packets are lost. It's up to the user to
30 reschedule queries if they haven't received responses and they need them.
31
32 Example usage::
33
34   client = ConfdClient(...) # includes callback specification
35   req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36   client.SendRequest(req)
37   # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38   # ... wait ...
39   # And your callback will be called by asyncore, when your query gets a
40   # response, or when it expires.
41
42 You can use the provided ConfdFilterCallback to act as a filter, only passing
43 "newer" answer to your callback, and filtering out outdated ones, or ones
44 confirming what you already got.
45
46 """
47
48 # pylint: disable-msg=E0203
49
50 # E0203: Access to member %r before its definition, since we use
51 # objects.py which doesn't explicitely initialise its members
52
53 import time
54 import random
55
56 from ganeti import utils
57 from ganeti import constants
58 from ganeti import objects
59 from ganeti import serializer
60 from ganeti import daemon # contains AsyncUDPSocket
61 from ganeti import errors
62 from ganeti import confd
63 from ganeti import ssconf
64 from ganeti import compat
65 from ganeti import netutils
66
67
68 class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
69   """Confd udp asyncore client
70
71   This is kept separate from the main ConfdClient to make sure it's easy to
72   implement a non-asyncore based client library.
73
74   """
75   def __init__(self, client):
76     """Constructor for ConfdAsyncUDPClient
77
78     @type client: L{ConfdClient}
79     @param client: client library, to pass the datagrams to
80
81     """
82     daemon.AsyncUDPSocket.__init__(self)
83     self.client = client
84
85   # this method is overriding a daemon.AsyncUDPSocket method
86   def handle_datagram(self, payload, ip, port):
87     self.client.HandleResponse(payload, ip, port)
88
89
90 class _Request(object):
91   """Request status structure.
92
93   @ivar request: the request data
94   @ivar args: any extra arguments for the callback
95   @ivar expiry: the expiry timestamp of the request
96   @ivar sent: the set of contacted peers
97   @ivar rcvd: the set of peers who replied
98
99   """
100   def __init__(self, request, args, expiry, sent):
101     self.request = request
102     self.args = args
103     self.expiry = expiry
104     self.sent = frozenset(sent)
105     self.rcvd = set()
106
107
108 class ConfdClient:
109   """Send queries to confd, and get back answers.
110
111   Since the confd model works by querying multiple master candidates, and
112   getting back answers, this is an asynchronous library. It can either work
113   through asyncore or with your own handling.
114
115   @type _requests: dict
116   @ivar _requests: dictionary indexes by salt, which contains data
117       about the outstanding requests; the values are objects of type
118       L{_Request}
119
120   """
121   def __init__(self, hmac_key, peers, callback, port=None, logger=None):
122     """Constructor for ConfdClient
123
124     @type hmac_key: string
125     @param hmac_key: hmac key to talk to confd
126     @type peers: list
127     @param peers: list of peer nodes
128     @type callback: f(L{ConfdUpcallPayload})
129     @param callback: function to call when getting answers
130     @type port: integer
131     @param port: confd port (default: use GetDaemonPort)
132     @type logger: logging.Logger
133     @param logger: optional logger for internal conditions
134
135     """
136     if not callable(callback):
137       raise errors.ProgrammerError("callback must be callable")
138
139     self.UpdatePeerList(peers)
140     self._hmac_key = hmac_key
141     self._socket = ConfdAsyncUDPClient(self)
142     self._callback = callback
143     self._confd_port = port
144     self._logger = logger
145     self._requests = {}
146
147     if self._confd_port is None:
148       self._confd_port = netutils.GetDaemonPort(constants.CONFD)
149
150   def UpdatePeerList(self, peers):
151     """Update the list of peers
152
153     @type peers: list
154     @param peers: list of peer nodes
155
156     """
157     # we are actually called from init, so:
158     # pylint: disable-msg=W0201
159     if not isinstance(peers, list):
160       raise errors.ProgrammerError("peers must be a list")
161     # make a copy of peers, since we're going to shuffle the list, later
162     self._peers = list(peers)
163
164   def _PackRequest(self, request, now=None):
165     """Prepare a request to be sent on the wire.
166
167     This function puts a proper salt in a confd request, puts the proper salt,
168     and adds the correct magic number.
169
170     """
171     if now is None:
172       now = time.time()
173     tstamp = '%d' % now
174     req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
175     return confd.PackMagic(req)
176
177   def _UnpackReply(self, payload):
178     in_payload = confd.UnpackMagic(payload)
179     (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
180     answer = objects.ConfdReply.FromDict(dict_answer)
181     return answer, salt
182
183   def ExpireRequests(self):
184     """Delete all the expired requests.
185
186     """
187     now = time.time()
188     for rsalt, rq in self._requests.items():
189       if now >= rq.expiry:
190         del self._requests[rsalt]
191         client_reply = ConfdUpcallPayload(salt=rsalt,
192                                           type=UPCALL_EXPIRE,
193                                           orig_request=rq.request,
194                                           extra_args=rq.args,
195                                           client=self,
196                                           )
197         self._callback(client_reply)
198
199   def SendRequest(self, request, args=None, coverage=0, async=True):
200     """Send a confd request to some MCs
201
202     @type request: L{objects.ConfdRequest}
203     @param request: the request to send
204     @type args: tuple
205     @param args: additional callback arguments
206     @type coverage: integer
207     @param coverage: number of remote nodes to contact; if default
208         (0), it will use a reasonable default
209         (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is
210         passed, it will use the maximum number of peers, otherwise the
211         number passed in will be used
212     @type async: boolean
213     @param async: handle the write asynchronously
214
215     """
216     if coverage == 0:
217       coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
218     elif coverage == -1:
219       coverage = len(self._peers)
220
221     if coverage > len(self._peers):
222       raise errors.ConfdClientError("Not enough MCs known to provide the"
223                                     " desired coverage")
224
225     if not request.rsalt:
226       raise errors.ConfdClientError("Missing request rsalt")
227
228     self.ExpireRequests()
229     if request.rsalt in self._requests:
230       raise errors.ConfdClientError("Duplicate request rsalt")
231
232     if request.type not in constants.CONFD_REQS:
233       raise errors.ConfdClientError("Invalid request type")
234
235     random.shuffle(self._peers)
236     targets = self._peers[:coverage]
237
238     now = time.time()
239     payload = self._PackRequest(request, now=now)
240
241     for target in targets:
242       try:
243         self._socket.enqueue_send(target, self._confd_port, payload)
244       except errors.UdpDataSizeError:
245         raise errors.ConfdClientError("Request too big")
246
247     expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
248     self._requests[request.rsalt] = _Request(request, args, expire_time,
249                                              targets)
250
251     if not async:
252       self.FlushSendQueue()
253
254   def HandleResponse(self, payload, ip, port):
255     """Asynchronous handler for a confd reply
256
257     Call the relevant callback associated to the current request.
258
259     """
260     try:
261       try:
262         answer, salt = self._UnpackReply(payload)
263       except (errors.SignatureError, errors.ConfdMagicError), err:
264         if self._logger:
265           self._logger.debug("Discarding broken package: %s" % err)
266         return
267
268       try:
269         rq = self._requests[salt]
270       except KeyError:
271         if self._logger:
272           self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
273         return
274
275       rq.rcvd.add(ip)
276
277       client_reply = ConfdUpcallPayload(salt=salt,
278                                         type=UPCALL_REPLY,
279                                         server_reply=answer,
280                                         orig_request=rq.request,
281                                         server_ip=ip,
282                                         server_port=port,
283                                         extra_args=rq.args,
284                                         client=self,
285                                        )
286       self._callback(client_reply)
287
288     finally:
289       self.ExpireRequests()
290
291   def FlushSendQueue(self):
292     """Send out all pending requests.
293
294     Can be used for synchronous client use.
295
296     """
297     while self._socket.writable():
298       self._socket.handle_write()
299
300   def ReceiveReply(self, timeout=1):
301     """Receive one reply.
302
303     @type timeout: float
304     @param timeout: how long to wait for the reply
305     @rtype: boolean
306     @return: True if some data has been handled, False otherwise
307
308     """
309     return self._socket.process_next_packet(timeout=timeout)
310
311   @staticmethod
312   def _NeededReplies(peer_cnt):
313     """Compute the minimum safe number of replies for a query.
314
315     The algorithm is designed to work well for both small and big
316     number of peers:
317         - for less than three, we require all responses
318         - for less than five, we allow one miss
319         - otherwise, half the number plus one
320
321     This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
322     4->2, 5->3, 6->3, 7->4, etc.
323
324     @type peer_cnt: int
325     @param peer_cnt: the number of peers contacted
326     @rtype: int
327     @return: the number of replies which should give a safe coverage
328
329     """
330     if peer_cnt < 3:
331       return peer_cnt
332     elif peer_cnt < 5:
333       return peer_cnt - 1
334     else:
335       return int(peer_cnt/2) + 1
336
337   def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
338     """Wait for replies to a given request.
339
340     This method will wait until either the timeout expires or a
341     minimum number (computed using L{_NeededReplies}) of replies are
342     received for the given salt. It is useful when doing synchronous
343     calls to this library.
344
345     @param salt: the salt of the request we want responses for
346     @param timeout: the maximum timeout (should be less or equal to
347         L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT}
348     @rtype: tuple
349     @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
350         request is unknown, timed_out will be true and the counters
351         will be zero
352
353     """
354     def _CheckResponse():
355       if salt not in self._requests:
356         # expired?
357         if self._logger:
358           self._logger.debug("Discarding unknown/expired request: %s" % salt)
359         return MISSING
360       rq = self._requests[salt]
361       if len(rq.rcvd) >= expected:
362         # already got all replies
363         return (False, len(rq.sent), len(rq.rcvd))
364       # else wait, using default timeout
365       self.ReceiveReply()
366       raise utils.RetryAgain()
367
368     MISSING = (True, 0, 0)
369
370     if salt not in self._requests:
371       return MISSING
372     # extend the expire time with the current timeout, so that we
373     # don't get the request expired from under us
374     rq = self._requests[salt]
375     rq.expiry += timeout
376     sent = len(rq.sent)
377     expected = self._NeededReplies(sent)
378
379     try:
380       return utils.Retry(_CheckResponse, 0, timeout)
381     except utils.RetryTimeout:
382       if salt in self._requests:
383         rq = self._requests[salt]
384         return (True, len(rq.sent), len(rq.rcvd))
385       else:
386         return MISSING
387
388
389 # UPCALL_REPLY: server reply upcall
390 # has all ConfdUpcallPayload fields populated
391 UPCALL_REPLY = 1
392 # UPCALL_EXPIRE: internal library request expire
393 # has only salt, type, orig_request and extra_args
394 UPCALL_EXPIRE = 2
395 CONFD_UPCALL_TYPES = frozenset([
396   UPCALL_REPLY,
397   UPCALL_EXPIRE,
398   ])
399
400
401 class ConfdUpcallPayload(objects.ConfigObject):
402   """Callback argument for confd replies
403
404   @type salt: string
405   @ivar salt: salt associated with the query
406   @type type: one of confd.client.CONFD_UPCALL_TYPES
407   @ivar type: upcall type (server reply, expired request, ...)
408   @type orig_request: L{objects.ConfdRequest}
409   @ivar orig_request: original request
410   @type server_reply: L{objects.ConfdReply}
411   @ivar server_reply: server reply
412   @type server_ip: string
413   @ivar server_ip: answering server ip address
414   @type server_port: int
415   @ivar server_port: answering server port
416   @type extra_args: any
417   @ivar extra_args: 'args' argument of the SendRequest function
418   @type client: L{ConfdClient}
419   @ivar client: current confd client instance
420
421   """
422   __slots__ = [
423     "salt",
424     "type",
425     "orig_request",
426     "server_reply",
427     "server_ip",
428     "server_port",
429     "extra_args",
430     "client",
431     ]
432
433
434 class ConfdClientRequest(objects.ConfdRequest):
435   """This is the client-side version of ConfdRequest.
436
437   This version of the class helps creating requests, on the client side, by
438   filling in some default values.
439
440   """
441   def __init__(self, **kwargs):
442     objects.ConfdRequest.__init__(self, **kwargs)
443     if not self.rsalt:
444       self.rsalt = utils.NewUUID()
445     if not self.protocol:
446       self.protocol = constants.CONFD_PROTOCOL_VERSION
447     if self.type not in constants.CONFD_REQS:
448       raise errors.ConfdClientError("Invalid request type")
449
450
451 class ConfdFilterCallback:
452   """Callback that calls another callback, but filters duplicate results.
453
454   @ivar consistent: a dictionary indexed by salt; for each salt, if
455       all responses ware identical, this will be True; this is the
456       expected state on a healthy cluster; on inconsistent or
457       partitioned clusters, this might be False, if we see answers
458       with the same serial but different contents
459
460   """
461   def __init__(self, callback, logger=None):
462     """Constructor for ConfdFilterCallback
463
464     @type callback: f(L{ConfdUpcallPayload})
465     @param callback: function to call when getting answers
466     @type logger: logging.Logger
467     @param logger: optional logger for internal conditions
468
469     """
470     if not callable(callback):
471       raise errors.ProgrammerError("callback must be callable")
472
473     self._callback = callback
474     self._logger = logger
475     # answers contains a dict of salt -> answer
476     self._answers = {}
477     self.consistent = {}
478
479   def _LogFilter(self, salt, new_reply, old_reply):
480     if not self._logger:
481       return
482
483     if new_reply.serial > old_reply.serial:
484       self._logger.debug("Filtering confirming answer, with newer"
485                          " serial for query %s" % salt)
486     elif new_reply.serial == old_reply.serial:
487       if new_reply.answer != old_reply.answer:
488         self._logger.warning("Got incoherent answers for query %s"
489                              " (serial: %s)" % (salt, new_reply.serial))
490       else:
491         self._logger.debug("Filtering confirming answer, with same"
492                            " serial for query %s" % salt)
493     else:
494       self._logger.debug("Filtering outdated answer for query %s"
495                          " serial: (%d < %d)" % (salt, old_reply.serial,
496                                                  new_reply.serial))
497
498   def _HandleExpire(self, up):
499     # if we have no answer we have received none, before the expiration.
500     if up.salt in self._answers:
501       del self._answers[up.salt]
502     if up.salt in self.consistent:
503       del self.consistent[up.salt]
504
505   def _HandleReply(self, up):
506     """Handle a single confd reply, and decide whether to filter it.
507
508     @rtype: boolean
509     @return: True if the reply should be filtered, False if it should be passed
510              on to the up-callback
511
512     """
513     filter_upcall = False
514     salt = up.salt
515     if salt not in self.consistent:
516       self.consistent[salt] = True
517     if salt not in self._answers:
518       # first answer for a query (don't filter, and record)
519       self._answers[salt] = up.server_reply
520     elif up.server_reply.serial > self._answers[salt].serial:
521       # newer answer (record, and compare contents)
522       old_answer = self._answers[salt]
523       self._answers[salt] = up.server_reply
524       if up.server_reply.answer == old_answer.answer:
525         # same content (filter) (version upgrade was unrelated)
526         filter_upcall = True
527         self._LogFilter(salt, up.server_reply, old_answer)
528       # else: different content, pass up a second answer
529     else:
530       # older or same-version answer (duplicate or outdated, filter)
531       if (up.server_reply.serial == self._answers[salt].serial and
532           up.server_reply.answer != self._answers[salt].answer):
533         self.consistent[salt] = False
534       filter_upcall = True
535       self._LogFilter(salt, up.server_reply, self._answers[salt])
536
537     return filter_upcall
538
539   def __call__(self, up):
540     """Filtering callback
541
542     @type up: L{ConfdUpcallPayload}
543     @param up: upper callback
544
545     """
546     filter_upcall = False
547     if up.type == UPCALL_REPLY:
548       filter_upcall = self._HandleReply(up)
549     elif up.type == UPCALL_EXPIRE:
550       self._HandleExpire(up)
551
552     if not filter_upcall:
553       self._callback(up)
554
555
556 class ConfdCountingCallback:
557   """Callback that calls another callback, and counts the answers
558
559   """
560   def __init__(self, callback, logger=None):
561     """Constructor for ConfdCountingCallback
562
563     @type callback: f(L{ConfdUpcallPayload})
564     @param callback: function to call when getting answers
565     @type logger: logging.Logger
566     @param logger: optional logger for internal conditions
567
568     """
569     if not callable(callback):
570       raise errors.ProgrammerError("callback must be callable")
571
572     self._callback = callback
573     self._logger = logger
574     # answers contains a dict of salt -> count
575     self._answers = {}
576
577   def RegisterQuery(self, salt):
578     if salt in self._answers:
579       raise errors.ProgrammerError("query already registered")
580     self._answers[salt] = 0
581
582   def AllAnswered(self):
583     """Have all the registered queries received at least an answer?
584
585     """
586     return compat.all(self._answers.values())
587
588   def _HandleExpire(self, up):
589     # if we have no answer we have received none, before the expiration.
590     if up.salt in self._answers:
591       del self._answers[up.salt]
592
593   def _HandleReply(self, up):
594     """Handle a single confd reply, and decide whether to filter it.
595
596     @rtype: boolean
597     @return: True if the reply should be filtered, False if it should be passed
598              on to the up-callback
599
600     """
601     if up.salt in self._answers:
602       self._answers[up.salt] += 1
603
604   def __call__(self, up):
605     """Filtering callback
606
607     @type up: L{ConfdUpcallPayload}
608     @param up: upper callback
609
610     """
611     if up.type == UPCALL_REPLY:
612       self._HandleReply(up)
613     elif up.type == UPCALL_EXPIRE:
614       self._HandleExpire(up)
615     self._callback(up)
616
617
618 class StoreResultCallback:
619   """Callback that simply stores the most recent answer.
620
621   @ivar _answers: dict of salt to (have_answer, reply)
622
623   """
624   _NO_KEY = (False, None)
625
626   def __init__(self):
627     """Constructor for StoreResultCallback
628
629     """
630     # answers contains a dict of salt -> best result
631     self._answers = {}
632
633   def GetResponse(self, salt):
634     """Return the best match for a salt
635
636     """
637     return self._answers.get(salt, self._NO_KEY)
638
639   def _HandleExpire(self, up):
640     """Expiration handler.
641
642     """
643     if up.salt in self._answers and self._answers[up.salt] == self._NO_KEY:
644       del self._answers[up.salt]
645
646   def _HandleReply(self, up):
647     """Handle a single confd reply, and decide whether to filter it.
648
649     """
650     self._answers[up.salt] = (True, up)
651
652   def __call__(self, up):
653     """Filtering callback
654
655     @type up: L{ConfdUpcallPayload}
656     @param up: upper callback
657
658     """
659     if up.type == UPCALL_REPLY:
660       self._HandleReply(up)
661     elif up.type == UPCALL_EXPIRE:
662       self._HandleExpire(up)
663
664
665 def GetConfdClient(callback):
666   """Return a client configured using the given callback.
667
668   This is handy to abstract the MC list and HMAC key reading.
669
670   @attention: This should only be called on nodes which are part of a
671       cluster, since it depends on a valid (ganeti) data directory;
672       for code running outside of a cluster, you need to create the
673       client manually
674
675   """
676   ss = ssconf.SimpleStore()
677   mc_file = ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
678   mc_list = utils.ReadFile(mc_file).splitlines()
679   hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY)
680   return ConfdClient(hmac_key, mc_list, callback)