Moved checks within LUClusterVerifyGroup
[ganeti-local] / lib / confd / client.py
1 #
2 #
3
4 # Copyright (C) 2009, 2010, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti confd client
23
24 Clients can use the confd client library to send requests to a group of master
25 candidates running confd. The expected usage is through the asyncore framework,
26 by sending queries, and asynchronously receiving replies through a callback.
27
28 This way the client library doesn't ever need to "wait" on a particular answer,
29 and can proceed even if some udp packets are lost. It's up to the user to
30 reschedule queries if they haven't received responses and they need them.
31
32 Example usage::
33
34   client = ConfdClient(...) # includes callback specification
35   req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36   client.SendRequest(req)
37   # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38   # ... wait ...
39   # And your callback will be called by asyncore, when your query gets a
40   # response, or when it expires.
41
42 You can use the provided ConfdFilterCallback to act as a filter, only passing
43 "newer" answer to your callback, and filtering out outdated ones, or ones
44 confirming what you already got.
45
46 """
47
48 # pylint: disable=E0203
49
50 # E0203: Access to member %r before its definition, since we use
51 # objects.py which doesn't explicitly initialise its members
52
53 import time
54 import random
55
56 from ganeti import utils
57 from ganeti import constants
58 from ganeti import objects
59 from ganeti import serializer
60 from ganeti import daemon # contains AsyncUDPSocket
61 from ganeti import errors
62 from ganeti import confd
63 from ganeti import ssconf
64 from ganeti import compat
65 from ganeti import netutils
66 from ganeti import pathutils
67
68
69 class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
70   """Confd udp asyncore client
71
72   This is kept separate from the main ConfdClient to make sure it's easy to
73   implement a non-asyncore based client library.
74
75   """
76   def __init__(self, client, family):
77     """Constructor for ConfdAsyncUDPClient
78
79     @type client: L{ConfdClient}
80     @param client: client library, to pass the datagrams to
81
82     """
83     daemon.AsyncUDPSocket.__init__(self, family)
84     self.client = client
85
86   # this method is overriding a daemon.AsyncUDPSocket method
87   def handle_datagram(self, payload, ip, port):
88     self.client.HandleResponse(payload, ip, port)
89
90
91 class _Request(object):
92   """Request status structure.
93
94   @ivar request: the request data
95   @ivar args: any extra arguments for the callback
96   @ivar expiry: the expiry timestamp of the request
97   @ivar sent: the set of contacted peers
98   @ivar rcvd: the set of peers who replied
99
100   """
101   def __init__(self, request, args, expiry, sent):
102     self.request = request
103     self.args = args
104     self.expiry = expiry
105     self.sent = frozenset(sent)
106     self.rcvd = set()
107
108
109 class ConfdClient:
110   """Send queries to confd, and get back answers.
111
112   Since the confd model works by querying multiple master candidates, and
113   getting back answers, this is an asynchronous library. It can either work
114   through asyncore or with your own handling.
115
116   @type _requests: dict
117   @ivar _requests: dictionary indexes by salt, which contains data
118       about the outstanding requests; the values are objects of type
119       L{_Request}
120
121   """
122   def __init__(self, hmac_key, peers, callback, port=None, logger=None):
123     """Constructor for ConfdClient
124
125     @type hmac_key: string
126     @param hmac_key: hmac key to talk to confd
127     @type peers: list
128     @param peers: list of peer nodes
129     @type callback: f(L{ConfdUpcallPayload})
130     @param callback: function to call when getting answers
131     @type port: integer
132     @param port: confd port (default: use GetDaemonPort)
133     @type logger: logging.Logger
134     @param logger: optional logger for internal conditions
135
136     """
137     if not callable(callback):
138       raise errors.ProgrammerError("callback must be callable")
139
140     self.UpdatePeerList(peers)
141     self._SetPeersAddressFamily()
142     self._hmac_key = hmac_key
143     self._socket = ConfdAsyncUDPClient(self, self._family)
144     self._callback = callback
145     self._confd_port = port
146     self._logger = logger
147     self._requests = {}
148
149     if self._confd_port is None:
150       self._confd_port = netutils.GetDaemonPort(constants.CONFD)
151
152   def UpdatePeerList(self, peers):
153     """Update the list of peers
154
155     @type peers: list
156     @param peers: list of peer nodes
157
158     """
159     # we are actually called from init, so:
160     # pylint: disable=W0201
161     if not isinstance(peers, list):
162       raise errors.ProgrammerError("peers must be a list")
163     # make a copy of peers, since we're going to shuffle the list, later
164     self._peers = list(peers)
165
166   def _PackRequest(self, request, now=None):
167     """Prepare a request to be sent on the wire.
168
169     This function puts a proper salt in a confd request, puts the proper salt,
170     and adds the correct magic number.
171
172     """
173     if now is None:
174       now = time.time()
175     tstamp = "%d" % now
176     req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
177     return confd.PackMagic(req)
178
179   def _UnpackReply(self, payload):
180     in_payload = confd.UnpackMagic(payload)
181     (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
182     answer = objects.ConfdReply.FromDict(dict_answer)
183     return answer, salt
184
185   def ExpireRequests(self):
186     """Delete all the expired requests.
187
188     """
189     now = time.time()
190     for rsalt, rq in self._requests.items():
191       if now >= rq.expiry:
192         del self._requests[rsalt]
193         client_reply = ConfdUpcallPayload(salt=rsalt,
194                                           type=UPCALL_EXPIRE,
195                                           orig_request=rq.request,
196                                           extra_args=rq.args,
197                                           client=self,
198                                           )
199         self._callback(client_reply)
200
201   def SendRequest(self, request, args=None, coverage=0, async=True):
202     """Send a confd request to some MCs
203
204     @type request: L{objects.ConfdRequest}
205     @param request: the request to send
206     @type args: tuple
207     @param args: additional callback arguments
208     @type coverage: integer
209     @param coverage: number of remote nodes to contact; if default
210         (0), it will use a reasonable default
211         (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is
212         passed, it will use the maximum number of peers, otherwise the
213         number passed in will be used
214     @type async: boolean
215     @param async: handle the write asynchronously
216
217     """
218     if coverage == 0:
219       coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
220     elif coverage == -1:
221       coverage = len(self._peers)
222
223     if coverage > len(self._peers):
224       raise errors.ConfdClientError("Not enough MCs known to provide the"
225                                     " desired coverage")
226
227     if not request.rsalt:
228       raise errors.ConfdClientError("Missing request rsalt")
229
230     self.ExpireRequests()
231     if request.rsalt in self._requests:
232       raise errors.ConfdClientError("Duplicate request rsalt")
233
234     if request.type not in constants.CONFD_REQS:
235       raise errors.ConfdClientError("Invalid request type")
236
237     random.shuffle(self._peers)
238     targets = self._peers[:coverage]
239
240     now = time.time()
241     payload = self._PackRequest(request, now=now)
242
243     for target in targets:
244       try:
245         self._socket.enqueue_send(target, self._confd_port, payload)
246       except errors.UdpDataSizeError:
247         raise errors.ConfdClientError("Request too big")
248
249     expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
250     self._requests[request.rsalt] = _Request(request, args, expire_time,
251                                              targets)
252
253     if not async:
254       self.FlushSendQueue()
255
256   def HandleResponse(self, payload, ip, port):
257     """Asynchronous handler for a confd reply
258
259     Call the relevant callback associated to the current request.
260
261     """
262     try:
263       try:
264         answer, salt = self._UnpackReply(payload)
265       except (errors.SignatureError, errors.ConfdMagicError), err:
266         if self._logger:
267           self._logger.debug("Discarding broken package: %s" % err)
268         return
269
270       try:
271         rq = self._requests[salt]
272       except KeyError:
273         if self._logger:
274           self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
275         return
276
277       rq.rcvd.add(ip)
278
279       client_reply = ConfdUpcallPayload(salt=salt,
280                                         type=UPCALL_REPLY,
281                                         server_reply=answer,
282                                         orig_request=rq.request,
283                                         server_ip=ip,
284                                         server_port=port,
285                                         extra_args=rq.args,
286                                         client=self,
287                                         )
288       self._callback(client_reply)
289
290     finally:
291       self.ExpireRequests()
292
293   def FlushSendQueue(self):
294     """Send out all pending requests.
295
296     Can be used for synchronous client use.
297
298     """
299     while self._socket.writable():
300       self._socket.handle_write()
301
302   def ReceiveReply(self, timeout=1):
303     """Receive one reply.
304
305     @type timeout: float
306     @param timeout: how long to wait for the reply
307     @rtype: boolean
308     @return: True if some data has been handled, False otherwise
309
310     """
311     return self._socket.process_next_packet(timeout=timeout)
312
313   @staticmethod
314   def _NeededReplies(peer_cnt):
315     """Compute the minimum safe number of replies for a query.
316
317     The algorithm is designed to work well for both small and big
318     number of peers:
319         - for less than three, we require all responses
320         - for less than five, we allow one miss
321         - otherwise, half the number plus one
322
323     This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
324     4->2, 5->3, 6->3, 7->4, etc.
325
326     @type peer_cnt: int
327     @param peer_cnt: the number of peers contacted
328     @rtype: int
329     @return: the number of replies which should give a safe coverage
330
331     """
332     if peer_cnt < 3:
333       return peer_cnt
334     elif peer_cnt < 5:
335       return peer_cnt - 1
336     else:
337       return int(peer_cnt / 2) + 1
338
339   def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
340     """Wait for replies to a given request.
341
342     This method will wait until either the timeout expires or a
343     minimum number (computed using L{_NeededReplies}) of replies are
344     received for the given salt. It is useful when doing synchronous
345     calls to this library.
346
347     @param salt: the salt of the request we want responses for
348     @param timeout: the maximum timeout (should be less or equal to
349         L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT}
350     @rtype: tuple
351     @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
352         request is unknown, timed_out will be true and the counters
353         will be zero
354
355     """
356     def _CheckResponse():
357       if salt not in self._requests:
358         # expired?
359         if self._logger:
360           self._logger.debug("Discarding unknown/expired request: %s" % salt)
361         return MISSING
362       rq = self._requests[salt]
363       if len(rq.rcvd) >= expected:
364         # already got all replies
365         return (False, len(rq.sent), len(rq.rcvd))
366       # else wait, using default timeout
367       self.ReceiveReply()
368       raise utils.RetryAgain()
369
370     MISSING = (True, 0, 0)
371
372     if salt not in self._requests:
373       return MISSING
374     # extend the expire time with the current timeout, so that we
375     # don't get the request expired from under us
376     rq = self._requests[salt]
377     rq.expiry += timeout
378     sent = len(rq.sent)
379     expected = self._NeededReplies(sent)
380
381     try:
382       return utils.Retry(_CheckResponse, 0, timeout)
383     except utils.RetryTimeout:
384       if salt in self._requests:
385         rq = self._requests[salt]
386         return (True, len(rq.sent), len(rq.rcvd))
387       else:
388         return MISSING
389
390   def _SetPeersAddressFamily(self):
391     if not self._peers:
392       raise errors.ConfdClientError("Peer list empty")
393     try:
394       peer = self._peers[0]
395       self._family = netutils.IPAddress.GetAddressFamily(peer)
396       for peer in self._peers[1:]:
397         if netutils.IPAddress.GetAddressFamily(peer) != self._family:
398           raise errors.ConfdClientError("Peers must be of same address family")
399     except errors.IPAddressError:
400       raise errors.ConfdClientError("Peer address %s invalid" % peer)
401
402
403 # UPCALL_REPLY: server reply upcall
404 # has all ConfdUpcallPayload fields populated
405 UPCALL_REPLY = 1
406 # UPCALL_EXPIRE: internal library request expire
407 # has only salt, type, orig_request and extra_args
408 UPCALL_EXPIRE = 2
409 CONFD_UPCALL_TYPES = compat.UniqueFrozenset([
410   UPCALL_REPLY,
411   UPCALL_EXPIRE,
412   ])
413
414
415 class ConfdUpcallPayload(objects.ConfigObject):
416   """Callback argument for confd replies
417
418   @type salt: string
419   @ivar salt: salt associated with the query
420   @type type: one of confd.client.CONFD_UPCALL_TYPES
421   @ivar type: upcall type (server reply, expired request, ...)
422   @type orig_request: L{objects.ConfdRequest}
423   @ivar orig_request: original request
424   @type server_reply: L{objects.ConfdReply}
425   @ivar server_reply: server reply
426   @type server_ip: string
427   @ivar server_ip: answering server ip address
428   @type server_port: int
429   @ivar server_port: answering server port
430   @type extra_args: any
431   @ivar extra_args: 'args' argument of the SendRequest function
432   @type client: L{ConfdClient}
433   @ivar client: current confd client instance
434
435   """
436   __slots__ = [
437     "salt",
438     "type",
439     "orig_request",
440     "server_reply",
441     "server_ip",
442     "server_port",
443     "extra_args",
444     "client",
445     ]
446
447
448 class ConfdClientRequest(objects.ConfdRequest):
449   """This is the client-side version of ConfdRequest.
450
451   This version of the class helps creating requests, on the client side, by
452   filling in some default values.
453
454   """
455   def __init__(self, **kwargs):
456     objects.ConfdRequest.__init__(self, **kwargs)
457     if not self.rsalt:
458       self.rsalt = utils.NewUUID()
459     if not self.protocol:
460       self.protocol = constants.CONFD_PROTOCOL_VERSION
461     if self.type not in constants.CONFD_REQS:
462       raise errors.ConfdClientError("Invalid request type")
463
464
465 class ConfdFilterCallback:
466   """Callback that calls another callback, but filters duplicate results.
467
468   @ivar consistent: a dictionary indexed by salt; for each salt, if
469       all responses ware identical, this will be True; this is the
470       expected state on a healthy cluster; on inconsistent or
471       partitioned clusters, this might be False, if we see answers
472       with the same serial but different contents
473
474   """
475   def __init__(self, callback, logger=None):
476     """Constructor for ConfdFilterCallback
477
478     @type callback: f(L{ConfdUpcallPayload})
479     @param callback: function to call when getting answers
480     @type logger: logging.Logger
481     @param logger: optional logger for internal conditions
482
483     """
484     if not callable(callback):
485       raise errors.ProgrammerError("callback must be callable")
486
487     self._callback = callback
488     self._logger = logger
489     # answers contains a dict of salt -> answer
490     self._answers = {}
491     self.consistent = {}
492
493   def _LogFilter(self, salt, new_reply, old_reply):
494     if not self._logger:
495       return
496
497     if new_reply.serial > old_reply.serial:
498       self._logger.debug("Filtering confirming answer, with newer"
499                          " serial for query %s" % salt)
500     elif new_reply.serial == old_reply.serial:
501       if new_reply.answer != old_reply.answer:
502         self._logger.warning("Got incoherent answers for query %s"
503                              " (serial: %s)" % (salt, new_reply.serial))
504       else:
505         self._logger.debug("Filtering confirming answer, with same"
506                            " serial for query %s" % salt)
507     else:
508       self._logger.debug("Filtering outdated answer for query %s"
509                          " serial: (%d < %d)" % (salt, old_reply.serial,
510                                                  new_reply.serial))
511
512   def _HandleExpire(self, up):
513     # if we have no answer we have received none, before the expiration.
514     if up.salt in self._answers:
515       del self._answers[up.salt]
516     if up.salt in self.consistent:
517       del self.consistent[up.salt]
518
519   def _HandleReply(self, up):
520     """Handle a single confd reply, and decide whether to filter it.
521
522     @rtype: boolean
523     @return: True if the reply should be filtered, False if it should be passed
524              on to the up-callback
525
526     """
527     filter_upcall = False
528     salt = up.salt
529     if salt not in self.consistent:
530       self.consistent[salt] = True
531     if salt not in self._answers:
532       # first answer for a query (don't filter, and record)
533       self._answers[salt] = up.server_reply
534     elif up.server_reply.serial > self._answers[salt].serial:
535       # newer answer (record, and compare contents)
536       old_answer = self._answers[salt]
537       self._answers[salt] = up.server_reply
538       if up.server_reply.answer == old_answer.answer:
539         # same content (filter) (version upgrade was unrelated)
540         filter_upcall = True
541         self._LogFilter(salt, up.server_reply, old_answer)
542       # else: different content, pass up a second answer
543     else:
544       # older or same-version answer (duplicate or outdated, filter)
545       if (up.server_reply.serial == self._answers[salt].serial and
546           up.server_reply.answer != self._answers[salt].answer):
547         self.consistent[salt] = False
548       filter_upcall = True
549       self._LogFilter(salt, up.server_reply, self._answers[salt])
550
551     return filter_upcall
552
553   def __call__(self, up):
554     """Filtering callback
555
556     @type up: L{ConfdUpcallPayload}
557     @param up: upper callback
558
559     """
560     filter_upcall = False
561     if up.type == UPCALL_REPLY:
562       filter_upcall = self._HandleReply(up)
563     elif up.type == UPCALL_EXPIRE:
564       self._HandleExpire(up)
565
566     if not filter_upcall:
567       self._callback(up)
568
569
570 class ConfdCountingCallback:
571   """Callback that calls another callback, and counts the answers
572
573   """
574   def __init__(self, callback, logger=None):
575     """Constructor for ConfdCountingCallback
576
577     @type callback: f(L{ConfdUpcallPayload})
578     @param callback: function to call when getting answers
579     @type logger: logging.Logger
580     @param logger: optional logger for internal conditions
581
582     """
583     if not callable(callback):
584       raise errors.ProgrammerError("callback must be callable")
585
586     self._callback = callback
587     self._logger = logger
588     # answers contains a dict of salt -> count
589     self._answers = {}
590
591   def RegisterQuery(self, salt):
592     if salt in self._answers:
593       raise errors.ProgrammerError("query already registered")
594     self._answers[salt] = 0
595
596   def AllAnswered(self):
597     """Have all the registered queries received at least an answer?
598
599     """
600     return compat.all(self._answers.values())
601
602   def _HandleExpire(self, up):
603     # if we have no answer we have received none, before the expiration.
604     if up.salt in self._answers:
605       del self._answers[up.salt]
606
607   def _HandleReply(self, up):
608     """Handle a single confd reply, and decide whether to filter it.
609
610     @rtype: boolean
611     @return: True if the reply should be filtered, False if it should be passed
612              on to the up-callback
613
614     """
615     if up.salt in self._answers:
616       self._answers[up.salt] += 1
617
618   def __call__(self, up):
619     """Filtering callback
620
621     @type up: L{ConfdUpcallPayload}
622     @param up: upper callback
623
624     """
625     if up.type == UPCALL_REPLY:
626       self._HandleReply(up)
627     elif up.type == UPCALL_EXPIRE:
628       self._HandleExpire(up)
629     self._callback(up)
630
631
632 class StoreResultCallback:
633   """Callback that simply stores the most recent answer.
634
635   @ivar _answers: dict of salt to (have_answer, reply)
636
637   """
638   _NO_KEY = (False, None)
639
640   def __init__(self):
641     """Constructor for StoreResultCallback
642
643     """
644     # answers contains a dict of salt -> best result
645     self._answers = {}
646
647   def GetResponse(self, salt):
648     """Return the best match for a salt
649
650     """
651     return self._answers.get(salt, self._NO_KEY)
652
653   def _HandleExpire(self, up):
654     """Expiration handler.
655
656     """
657     if up.salt in self._answers and self._answers[up.salt] == self._NO_KEY:
658       del self._answers[up.salt]
659
660   def _HandleReply(self, up):
661     """Handle a single confd reply, and decide whether to filter it.
662
663     """
664     self._answers[up.salt] = (True, up)
665
666   def __call__(self, up):
667     """Filtering callback
668
669     @type up: L{ConfdUpcallPayload}
670     @param up: upper callback
671
672     """
673     if up.type == UPCALL_REPLY:
674       self._HandleReply(up)
675     elif up.type == UPCALL_EXPIRE:
676       self._HandleExpire(up)
677
678
679 def GetConfdClient(callback):
680   """Return a client configured using the given callback.
681
682   This is handy to abstract the MC list and HMAC key reading.
683
684   @attention: This should only be called on nodes which are part of a
685       cluster, since it depends on a valid (ganeti) data directory;
686       for code running outside of a cluster, you need to create the
687       client manually
688
689   """
690   ss = ssconf.SimpleStore()
691   mc_file = ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
692   mc_list = utils.ReadFile(mc_file).splitlines()
693   hmac_key = utils.ReadFile(pathutils.CONFD_HMAC_KEY)
694   return ConfdClient(hmac_key, mc_list, callback)