4 # Copyright (C) 2009 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti confd client
24 Clients can use the confd client library to send requests to a group of master
25 candidates running confd. The expected usage is through the asyncore framework,
26 by sending queries, and asynchronously receiving replies through a callback.
28 This way the client library doesn't ever need to "wait" on a particular answer,
29 and can proceed even if some udp packets are lost. It's up to the user to
30 reschedule queries if they haven't received responses and they need them.
34 client = ConfdClient(...) # includes callback specification
35 req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36 client.SendRequest(req)
37 # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
39 # And your callback will be called by asyncore, when your query gets a
40 # response, or when it expires.
42 You can use the provided ConfdFilterCallback to act as a filter, only passing
43 "newer" answers to your callback, and filtering out outdated ones, or ones
44 confirming what you already got.
48 # pylint: disable-msg=E0203
50 # E0203: Access to member %r before its definition, since we use
51 # objects.py which doesn't explicitly initialise its members
56 from ganeti import utils
57 from ganeti import constants
58 from ganeti import objects
59 from ganeti import serializer
60 from ganeti import daemon # contains AsyncUDPSocket
61 from ganeti import errors
62 from ganeti import confd
63 from ganeti import ssconf
64 from ganeti import compat
65 from ganeti import netutils
68 class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
69 """Confd udp asyncore client
71 This is kept separate from the main ConfdClient to make sure it's easy to
72 implement a non-asyncore based client library.
75 def __init__(self, client, family):
76 """Constructor for ConfdAsyncUDPClient
78 @type client: L{ConfdClient}
79 @param client: client library, to pass the datagrams to
82 daemon.AsyncUDPSocket.__init__(self, family)
def handle_datagram(self, payload, ip, port):
    """Hand a received datagram over to the confd client.

    This overrides the corresponding L{daemon.AsyncUDPSocket} method.
    The three arguments are passed unchanged to the C{HandleResponse}
    method of C{self.client}.

    @param payload: datagram payload, forwarded as-is
    @param ip: forwarded as-is
    @param port: forwarded as-is

    """
    receiver = self.client
    receiver.HandleResponse(payload, ip, port)
90 class _Request(object):
91 """Request status structure.
93 @ivar request: the request data
94 @ivar args: any extra arguments for the callback
95 @ivar expiry: the expiry timestamp of the request
96 @ivar sent: the set of contacted peers
97 @ivar rcvd: the set of peers who replied
100 def __init__(self, request, args, expiry, sent):
101 self.request = request
104 self.sent = frozenset(sent)
109 """Send queries to confd, and get back answers.
111 Since the confd model works by querying multiple master candidates, and
112 getting back answers, this is an asynchronous library. It can either work
113 through asyncore or with your own handling.
115 @type _requests: dict
116 @ivar _requests: dictionary indexed by salt, which contains data
117 about the outstanding requests; the values are objects of type
121 def __init__(self, hmac_key, peers, callback, port=None, logger=None):
122 """Constructor for ConfdClient
124 @type hmac_key: string
125 @param hmac_key: hmac key to talk to confd
127 @param peers: list of peer nodes
128 @type callback: f(L{ConfdUpcallPayload})
129 @param callback: function to call when getting answers
131 @param port: confd port (default: use GetDaemonPort)
132 @type logger: logging.Logger
133 @param logger: optional logger for internal conditions
136 if not callable(callback):
137 raise errors.ProgrammerError("callback must be callable")
139 self.UpdatePeerList(peers)
140 self._SetPeersAddressFamily()
141 self._hmac_key = hmac_key
142 self._socket = ConfdAsyncUDPClient(self, self._family)
143 self._callback = callback
144 self._confd_port = port
145 self._logger = logger
148 if self._confd_port is None:
149 self._confd_port = netutils.GetDaemonPort(constants.CONFD)
def UpdatePeerList(self, peers):
    """Replace the stored peer list with a private copy of C{peers}.

    @type peers: list
    @param peers: list of peer nodes
    @raise errors.ProgrammerError: if C{peers} is not a list

    """
    # This method is also invoked from the constructor, so the attribute
    # is legitimately first defined here:
    # pylint: disable-msg=W0201
    if isinstance(peers, list):
        # Store our own copy: the stored list gets shuffled later on, and
        # copying leaves the caller's list untouched.
        self._peers = list(peers)
        return
    raise errors.ProgrammerError("peers must be a list")
165 def _PackRequest(self, request, now=None):
166 """Prepare a request to be sent on the wire.
168 This function puts a proper salt in a confd request, signs it with the
169 HMAC key, and adds the correct magic number.
175 req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
176 return confd.PackMagic(req)
178 def _UnpackReply(self, payload):
179 in_payload = confd.UnpackMagic(payload)
180 (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
181 answer = objects.ConfdReply.FromDict(dict_answer)
184 def ExpireRequests(self):
185 """Delete all the expired requests.
189 for rsalt, rq in self._requests.items():
191 del self._requests[rsalt]
192 client_reply = ConfdUpcallPayload(salt=rsalt,
194 orig_request=rq.request,
198 self._callback(client_reply)
200 def SendRequest(self, request, args=None, coverage=0, async=True):
201 """Send a confd request to some MCs
203 @type request: L{objects.ConfdRequest}
204 @param request: the request to send
206 @param args: additional callback arguments
207 @type coverage: integer
208 @param coverage: number of remote nodes to contact; if default
209 (0), it will use a reasonable default
210 (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is
211 passed, it will use the maximum number of peers, otherwise the
212 number passed in will be used
214 @param async: handle the write asynchronously
218 coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
220 coverage = len(self._peers)
222 if coverage > len(self._peers):
223 raise errors.ConfdClientError("Not enough MCs known to provide the"
226 if not request.rsalt:
227 raise errors.ConfdClientError("Missing request rsalt")
229 self.ExpireRequests()
230 if request.rsalt in self._requests:
231 raise errors.ConfdClientError("Duplicate request rsalt")
233 if request.type not in constants.CONFD_REQS:
234 raise errors.ConfdClientError("Invalid request type")
236 random.shuffle(self._peers)
237 targets = self._peers[:coverage]
240 payload = self._PackRequest(request, now=now)
242 for target in targets:
244 self._socket.enqueue_send(target, self._confd_port, payload)
245 except errors.UdpDataSizeError:
246 raise errors.ConfdClientError("Request too big")
248 expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
249 self._requests[request.rsalt] = _Request(request, args, expire_time,
253 self.FlushSendQueue()
255 def HandleResponse(self, payload, ip, port):
256 """Asynchronous handler for a confd reply
258 Call the relevant callback associated to the current request.
263 answer, salt = self._UnpackReply(payload)
264 except (errors.SignatureError, errors.ConfdMagicError), err:
266 self._logger.debug("Discarding broken package: %s" % err)
270 rq = self._requests[salt]
273 self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
278 client_reply = ConfdUpcallPayload(salt=salt,
281 orig_request=rq.request,
287 self._callback(client_reply)
290 self.ExpireRequests()
def FlushSendQueue(self):
    """Send out all pending requests.

    Keeps invoking the socket's write handler for as long as the socket
    reports itself writable. Can be used for synchronous client use.

    """
    while True:
        if not self._socket.writable():
            break
        self._socket.handle_write()
def ReceiveReply(self, timeout=1):
    """Wait for and handle a single reply.

    The work is delegated to the socket's C{process_next_packet} method.

    @param timeout: how long to wait for the reply
    @return: True if some data has been handled, False otherwise

    """
    handled = self._socket.process_next_packet(timeout=timeout)
    return handled
313 def _NeededReplies(peer_cnt):
314 """Compute the minimum safe number of replies for a query.
316 The algorithm is designed to work well for both small and big
318 - for less than three, we require all responses
319 - for less than five, we allow one miss
320 - otherwise, half the number plus one
322 This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
323 4->2, 5->3, 6->3, 7->4, etc.
326 @param peer_cnt: the number of peers contacted
328 @return: the number of replies which should give a safe coverage
336 return int(peer_cnt/2) + 1
338 def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
339 """Wait for replies to a given request.
341 This method will wait until either the timeout expires or a
342 minimum number (computed using L{_NeededReplies}) of replies are
343 received for the given salt. It is useful when doing synchronous
344 calls to this library.
346 @param salt: the salt of the request we want responses for
347 @param timeout: the maximum timeout (should be less or equal to
348 L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT}
350 @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
351 request is unknown, timed_out will be true and the counters
355 def _CheckResponse():
356 if salt not in self._requests:
359 self._logger.debug("Discarding unknown/expired request: %s" % salt)
361 rq = self._requests[salt]
362 if len(rq.rcvd) >= expected:
363 # already got all replies
364 return (False, len(rq.sent), len(rq.rcvd))
365 # else wait, using default timeout
367 raise utils.RetryAgain()
369 MISSING = (True, 0, 0)
371 if salt not in self._requests:
373 # extend the expire time with the current timeout, so that we
374 # don't get the request expired from under us
375 rq = self._requests[salt]
378 expected = self._NeededReplies(sent)
381 return utils.Retry(_CheckResponse, 0, timeout)
382 except utils.RetryTimeout:
383 if salt in self._requests:
384 rq = self._requests[salt]
385 return (True, len(rq.sent), len(rq.rcvd))
389 def _SetPeersAddressFamily(self):
391 raise errors.ConfdClientError("Peer list empty")
393 peer = self._peers[0]
394 self._family = netutils.GetAddressFamily(peer)
395 for peer in self._peers[1:]:
396 if netutils.GetAddressFamily(peer) != self._family:
397 raise errors.ConfdClientError("Peers must be of same address family")
398 except errors.GenericError:
399 raise errors.ConfdClientError("Peer address %s invalid" % peer)
402 # UPCALL_REPLY: server reply upcall
403 # has all ConfdUpcallPayload fields populated
405 # UPCALL_EXPIRE: internal library request expire
406 # has only salt, type, orig_request and extra_args
408 CONFD_UPCALL_TYPES = frozenset([
414 class ConfdUpcallPayload(objects.ConfigObject):
415 """Callback argument for confd replies
418 @ivar salt: salt associated with the query
419 @type type: one of confd.client.CONFD_UPCALL_TYPES
420 @ivar type: upcall type (server reply, expired request, ...)
421 @type orig_request: L{objects.ConfdRequest}
422 @ivar orig_request: original request
423 @type server_reply: L{objects.ConfdReply}
424 @ivar server_reply: server reply
425 @type server_ip: string
426 @ivar server_ip: answering server ip address
427 @type server_port: int
428 @ivar server_port: answering server port
429 @type extra_args: any
430 @ivar extra_args: 'args' argument of the SendRequest function
431 @type client: L{ConfdClient}
432 @ivar client: current confd client instance
447 class ConfdClientRequest(objects.ConfdRequest):
448 """This is the client-side version of ConfdRequest.
450 This version of the class helps creating requests, on the client side, by
451 filling in some default values.
454 def __init__(self, **kwargs):
455 objects.ConfdRequest.__init__(self, **kwargs)
457 self.rsalt = utils.NewUUID()
458 if not self.protocol:
459 self.protocol = constants.CONFD_PROTOCOL_VERSION
460 if self.type not in constants.CONFD_REQS:
461 raise errors.ConfdClientError("Invalid request type")
464 class ConfdFilterCallback:
465 """Callback that calls another callback, but filters duplicate results.
467 @ivar consistent: a dictionary indexed by salt; for each salt, if
468 all responses were identical, this will be True; this is the
469 expected state on a healthy cluster; on inconsistent or
470 partitioned clusters, this might be False, if we see answers
471 with the same serial but different contents
474 def __init__(self, callback, logger=None):
475 """Constructor for ConfdFilterCallback
477 @type callback: f(L{ConfdUpcallPayload})
478 @param callback: function to call when getting answers
479 @type logger: logging.Logger
480 @param logger: optional logger for internal conditions
483 if not callable(callback):
484 raise errors.ProgrammerError("callback must be callable")
486 self._callback = callback
487 self._logger = logger
488 # answers contains a dict of salt -> answer
492 def _LogFilter(self, salt, new_reply, old_reply):
496 if new_reply.serial > old_reply.serial:
497 self._logger.debug("Filtering confirming answer, with newer"
498 " serial for query %s" % salt)
499 elif new_reply.serial == old_reply.serial:
500 if new_reply.answer != old_reply.answer:
501 self._logger.warning("Got incoherent answers for query %s"
502 " (serial: %s)" % (salt, new_reply.serial))
504 self._logger.debug("Filtering confirming answer, with same"
505 " serial for query %s" % salt)
507 self._logger.debug("Filtering outdated answer for query %s"
508 " serial: (%d < %d)" % (salt, old_reply.serial,
511 def _HandleExpire(self, up):
512 # if we have no answer we have received none, before the expiration.
513 if up.salt in self._answers:
514 del self._answers[up.salt]
515 if up.salt in self.consistent:
516 del self.consistent[up.salt]
518 def _HandleReply(self, up):
519 """Handle a single confd reply, and decide whether to filter it.
522 @return: True if the reply should be filtered, False if it should be passed
523 on to the up-callback
526 filter_upcall = False
528 if salt not in self.consistent:
529 self.consistent[salt] = True
530 if salt not in self._answers:
531 # first answer for a query (don't filter, and record)
532 self._answers[salt] = up.server_reply
533 elif up.server_reply.serial > self._answers[salt].serial:
534 # newer answer (record, and compare contents)
535 old_answer = self._answers[salt]
536 self._answers[salt] = up.server_reply
537 if up.server_reply.answer == old_answer.answer:
538 # same content (filter) (version upgrade was unrelated)
540 self._LogFilter(salt, up.server_reply, old_answer)
541 # else: different content, pass up a second answer
543 # older or same-version answer (duplicate or outdated, filter)
544 if (up.server_reply.serial == self._answers[salt].serial and
545 up.server_reply.answer != self._answers[salt].answer):
546 self.consistent[salt] = False
548 self._LogFilter(salt, up.server_reply, self._answers[salt])
552 def __call__(self, up):
553 """Filtering callback
555 @type up: L{ConfdUpcallPayload}
556 @param up: upcall payload to handle
559 filter_upcall = False
560 if up.type == UPCALL_REPLY:
561 filter_upcall = self._HandleReply(up)
562 elif up.type == UPCALL_EXPIRE:
563 self._HandleExpire(up)
565 if not filter_upcall:
569 class ConfdCountingCallback:
570 """Callback that calls another callback, and counts the answers
573 def __init__(self, callback, logger=None):
574 """Constructor for ConfdCountingCallback
576 @type callback: f(L{ConfdUpcallPayload})
577 @param callback: function to call when getting answers
578 @type logger: logging.Logger
579 @param logger: optional logger for internal conditions
582 if not callable(callback):
583 raise errors.ProgrammerError("callback must be callable")
585 self._callback = callback
586 self._logger = logger
587 # answers contains a dict of salt -> count
def RegisterQuery(self, salt):
    """Start counting answers for the query identified by C{salt}.

    The counter for the salt starts at zero.

    @param salt: salt of the query to track
    @raise errors.ProgrammerError: if the salt is already registered

    """
    counters = self._answers
    if salt in counters:
        raise errors.ProgrammerError("query already registered")
    counters[salt] = 0
def AllAnswered(self):
    """Tell whether every registered query got at least one answer.

    @return: the result of C{compat.all} over the per-query answer
        counts (assumed to behave like the builtin C{all}, i.e. false as
        soon as one count is still zero)

    """
    counts = self._answers.values()
    return compat.all(counts)
601 def _HandleExpire(self, up):
602 # if we have no answer we have received none, before the expiration.
603 if up.salt in self._answers:
604 del self._answers[up.salt]
606 def _HandleReply(self, up):
607 """Handle a single confd reply, and decide whether to filter it.
610 @return: True if the reply should be filtered, False if it should be passed
611 on to the up-callback
614 if up.salt in self._answers:
615 self._answers[up.salt] += 1
617 def __call__(self, up):
618 """Filtering callback
620 @type up: L{ConfdUpcallPayload}
621 @param up: upcall payload to handle
624 if up.type == UPCALL_REPLY:
625 self._HandleReply(up)
626 elif up.type == UPCALL_EXPIRE:
627 self._HandleExpire(up)
631 class StoreResultCallback:
632 """Callback that simply stores the most recent answer.
634 @ivar _answers: dict of salt to (have_answer, reply)
637 _NO_KEY = (False, None)
640 """Constructor for StoreResultCallback
643 # answers contains a dict of salt -> best result
def GetResponse(self, salt):
    """Return the best match for a salt.

    @param salt: salt of the query to look up
    @return: the entry stored for the salt, or C{_NO_KEY}
        (C{(False, None)}) when nothing is stored for it

    """
    try:
        return self._answers[salt]
    except KeyError:
        return self._NO_KEY
652 def _HandleExpire(self, up):
653 """Expiration handler.
656 if up.salt in self._answers and self._answers[up.salt] == self._NO_KEY:
657 del self._answers[up.salt]
659 def _HandleReply(self, up):
660 """Handle a single confd reply, and decide whether to filter it.
663 self._answers[up.salt] = (True, up)
def __call__(self, up):
    """Route an upcall to the matching handler.

    Replies go to C{_HandleReply} and expirations to C{_HandleExpire};
    any other upcall type is ignored.

    @type up: L{ConfdUpcallPayload}
    @param up: the upcall payload to process

    """
    kind = up.type
    if kind == UPCALL_REPLY:
        self._HandleReply(up)
    elif kind == UPCALL_EXPIRE:
        self._HandleExpire(up)
def GetConfdClient(callback):
    """Return a client configured using the given callback.

    The master candidate list and the HMAC key are read from disk here,
    so callers do not have to do it themselves.

    @attention: This should only be called on nodes which are part of a
        cluster, since it depends on a valid (ganeti) data directory;
        for code running outside of a cluster, you need to create the
        client yourself

    @param callback: passed to the L{ConfdClient} constructor
    @return: the new L{ConfdClient} instance

    """
    # The candidates file is read before the key, one address per line.
    candidates_path = ssconf.SimpleStore().KeyToFilename(
        constants.SS_MASTER_CANDIDATES_IPS)
    candidates = utils.ReadFile(candidates_path).splitlines()
    key = utils.ReadFile(constants.CONFD_HMAC_KEY)
    return ConfdClient(key, candidates, callback)