4 # Copyright (C) 2009, 2010, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti confd client
24 Clients can use the confd client library to send requests to a group of master
25 candidates running confd. The expected usage is through the asyncore framework,
26 by sending queries, and asynchronously receiving replies through a callback.
28 This way the client library doesn't ever need to "wait" on a particular answer,
29 and can proceed even if some udp packets are lost. It's up to the user to
30 reschedule queries if they haven't received responses and they need them.
34 client = ConfdClient(...) # includes callback specification
35 req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36 client.SendRequest(req)
37 # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
39 # And your callback will be called by asyncore, when your query gets a
40 # response, or when it expires.
You can use the provided ConfdFilterCallback to act as a filter, only passing
"newer" answers to your callback, and filtering out outdated ones, or ones
confirming what you already got.
48 # pylint: disable=E0203
50 # E0203: Access to member %r before its definition, since we use
51 # objects.py which doesn't explicitly initialise its members
import time
import random

from ganeti import utils
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import daemon # contains AsyncUDPSocket
from ganeti import errors
from ganeti import confd
from ganeti import ssconf
from ganeti import compat
from ganeti import netutils
from ganeti import pathutils
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
  """Confd UDP client built on the asyncore-based socket.

  Kept apart from L{ConfdClient} so that a client library which does not
  rely on asyncore can be written without touching request bookkeeping.

  """
  def __init__(self, client, family):
    """Create the UDP socket and remember the owning client.

    @type client: L{ConfdClient}
    @param client: client library, to pass the datagrams to
    @param family: socket address family, handed unchanged to
        L{daemon.AsyncUDPSocket}

    """
    daemon.AsyncUDPSocket.__init__(self, family)
    self.client = client

  # overrides daemon.AsyncUDPSocket.handle_datagram
  def handle_datagram(self, payload, ip, port):
    """Forward one received datagram to the owning client."""
    owner = self.client
    owner.HandleResponse(payload, ip, port)
91 class _Request(object):
92 """Request status structure.
94 @ivar request: the request data
95 @ivar args: any extra arguments for the callback
96 @ivar expiry: the expiry timestamp of the request
97 @ivar sent: the set of contacted peers
98 @ivar rcvd: the set of peers who replied
101 def __init__(self, request, args, expiry, sent):
102 self.request = request
105 self.sent = frozenset(sent)
110 """Send queries to confd, and get back answers.
112 Since the confd model works by querying multiple master candidates, and
113 getting back answers, this is an asynchronous library. It can either work
114 through asyncore or with your own handling.
116 @type _requests: dict
117 @ivar _requests: dictionary indexes by salt, which contains data
118 about the outstanding requests; the values are objects of type
122 def __init__(self, hmac_key, peers, callback, port=None, logger=None):
123 """Constructor for ConfdClient
125 @type hmac_key: string
126 @param hmac_key: hmac key to talk to confd
128 @param peers: list of peer nodes
129 @type callback: f(L{ConfdUpcallPayload})
130 @param callback: function to call when getting answers
132 @param port: confd port (default: use GetDaemonPort)
133 @type logger: logging.Logger
134 @param logger: optional logger for internal conditions
137 if not callable(callback):
138 raise errors.ProgrammerError("callback must be callable")
140 self.UpdatePeerList(peers)
141 self._SetPeersAddressFamily()
142 self._hmac_key = hmac_key
143 self._socket = ConfdAsyncUDPClient(self, self._family)
144 self._callback = callback
145 self._confd_port = port
146 self._logger = logger
149 if self._confd_port is None:
150 self._confd_port = netutils.GetDaemonPort(constants.CONFD)
152 def UpdatePeerList(self, peers):
153 """Update the list of peers
156 @param peers: list of peer nodes
159 # we are actually called from init, so:
160 # pylint: disable=W0201
161 if not isinstance(peers, list):
162 raise errors.ProgrammerError("peers must be a list")
163 # make a copy of peers, since we're going to shuffle the list, later
164 self._peers = list(peers)
166 def _PackRequest(self, request, now=None):
167 """Prepare a request to be sent on the wire.
169 This function puts a proper salt in a confd request, puts the proper salt,
170 and adds the correct magic number.
176 req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
177 return confd.PackMagic(req)
179 def _UnpackReply(self, payload):
180 in_payload = confd.UnpackMagic(payload)
181 (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
182 answer = objects.ConfdReply.FromDict(dict_answer)
185 def ExpireRequests(self):
186 """Delete all the expired requests.
190 for rsalt, rq in self._requests.items():
192 del self._requests[rsalt]
193 client_reply = ConfdUpcallPayload(salt=rsalt,
195 orig_request=rq.request,
199 self._callback(client_reply)
201 def SendRequest(self, request, args=None, coverage=0, async=True):
202 """Send a confd request to some MCs
204 @type request: L{objects.ConfdRequest}
205 @param request: the request to send
207 @param args: additional callback arguments
208 @type coverage: integer
209 @param coverage: number of remote nodes to contact; if default
210 (0), it will use a reasonable default
211 (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is
212 passed, it will use the maximum number of peers, otherwise the
213 number passed in will be used
215 @param async: handle the write asynchronously
219 coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
221 coverage = len(self._peers)
223 if coverage > len(self._peers):
224 raise errors.ConfdClientError("Not enough MCs known to provide the"
227 if not request.rsalt:
228 raise errors.ConfdClientError("Missing request rsalt")
230 self.ExpireRequests()
231 if request.rsalt in self._requests:
232 raise errors.ConfdClientError("Duplicate request rsalt")
234 if request.type not in constants.CONFD_REQS:
235 raise errors.ConfdClientError("Invalid request type")
237 random.shuffle(self._peers)
238 targets = self._peers[:coverage]
241 payload = self._PackRequest(request, now=now)
243 for target in targets:
245 self._socket.enqueue_send(target, self._confd_port, payload)
246 except errors.UdpDataSizeError:
247 raise errors.ConfdClientError("Request too big")
249 expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
250 self._requests[request.rsalt] = _Request(request, args, expire_time,
254 self.FlushSendQueue()
256 def HandleResponse(self, payload, ip, port):
257 """Asynchronous handler for a confd reply
259 Call the relevant callback associated to the current request.
264 answer, salt = self._UnpackReply(payload)
265 except (errors.SignatureError, errors.ConfdMagicError), err:
267 self._logger.debug("Discarding broken package: %s" % err)
271 rq = self._requests[salt]
274 self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
279 client_reply = ConfdUpcallPayload(salt=salt,
282 orig_request=rq.request,
288 self._callback(client_reply)
291 self.ExpireRequests()
293 def FlushSendQueue(self):
294 """Send out all pending requests.
296 Can be used for synchronous client use.
299 while self._socket.writable():
300 self._socket.handle_write()
302 def ReceiveReply(self, timeout=1):
303 """Receive one reply.
306 @param timeout: how long to wait for the reply
308 @return: True if some data has been handled, False otherwise
311 return self._socket.process_next_packet(timeout=timeout)
314 def _NeededReplies(peer_cnt):
315 """Compute the minimum safe number of replies for a query.
317 The algorithm is designed to work well for both small and big
319 - for less than three, we require all responses
320 - for less than five, we allow one miss
321 - otherwise, half the number plus one
323 This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
324 4->2, 5->3, 6->3, 7->4, etc.
327 @param peer_cnt: the number of peers contacted
329 @return: the number of replies which should give a safe coverage
337 return int(peer_cnt / 2) + 1
339 def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
340 """Wait for replies to a given request.
342 This method will wait until either the timeout expires or a
343 minimum number (computed using L{_NeededReplies}) of replies are
344 received for the given salt. It is useful when doing synchronous
345 calls to this library.
347 @param salt: the salt of the request we want responses for
348 @param timeout: the maximum timeout (should be less or equal to
349 L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT}
351 @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
352 request is unknown, timed_out will be true and the counters
356 def _CheckResponse():
357 if salt not in self._requests:
360 self._logger.debug("Discarding unknown/expired request: %s" % salt)
362 rq = self._requests[salt]
363 if len(rq.rcvd) >= expected:
364 # already got all replies
365 return (False, len(rq.sent), len(rq.rcvd))
366 # else wait, using default timeout
368 raise utils.RetryAgain()
370 MISSING = (True, 0, 0)
372 if salt not in self._requests:
374 # extend the expire time with the current timeout, so that we
375 # don't get the request expired from under us
376 rq = self._requests[salt]
379 expected = self._NeededReplies(sent)
382 return utils.Retry(_CheckResponse, 0, timeout)
383 except utils.RetryTimeout:
384 if salt in self._requests:
385 rq = self._requests[salt]
386 return (True, len(rq.sent), len(rq.rcvd))
390 def _SetPeersAddressFamily(self):
392 raise errors.ConfdClientError("Peer list empty")
394 peer = self._peers[0]
395 self._family = netutils.IPAddress.GetAddressFamily(peer)
396 for peer in self._peers[1:]:
397 if netutils.IPAddress.GetAddressFamily(peer) != self._family:
398 raise errors.ConfdClientError("Peers must be of same address family")
399 except errors.IPAddressError:
400 raise errors.ConfdClientError("Peer address %s invalid" % peer)
# Upcall types, as stored in ConfdUpcallPayload.type:
#
# UPCALL_REPLY: a reply received from a server; all the
#   ConfdUpcallPayload fields are populated
UPCALL_REPLY = 1
# UPCALL_EXPIRE: a request expired inside the library; carries salt, type,
#   orig_request and extra_args, but no server reply data
UPCALL_EXPIRE = 2

CONFD_UPCALL_TYPES = compat.UniqueFrozenset([UPCALL_REPLY, UPCALL_EXPIRE])
class ConfdUpcallPayload(objects.ConfigObject):
  """Argument passed to the confd client callback.

  @type salt: string
  @ivar salt: salt associated with the query
  @type type: one of confd.client.CONFD_UPCALL_TYPES
  @ivar type: upcall type (server reply, expired request, ...)
  @type orig_request: L{objects.ConfdRequest}
  @ivar orig_request: original request
  @type server_reply: L{objects.ConfdReply}
  @ivar server_reply: server reply
  @type server_ip: string
  @ivar server_ip: answering server ip address
  @type server_port: int
  @ivar server_port: answering server port
  @type extra_args: any
  @ivar extra_args: 'args' argument of the SendRequest function
  @type client: L{ConfdClient}
  @ivar client: current confd client instance

  """
  # one slot per documented field, in the same order
  __slots__ = [
    "salt", "type", "orig_request", "server_reply",
    "server_ip", "server_port", "extra_args", "client",
    ]
class ConfdClientRequest(objects.ConfdRequest):
  """Client-side flavour of L{objects.ConfdRequest}.

  Fills in a fresh rsalt and the current protocol version when the caller
  did not supply them, then validates the request type.

  @raise errors.ConfdClientError: if the request type is not one of
      L{constants.CONFD_REQS}

  """
  def __init__(self, **kwargs):
    objects.ConfdRequest.__init__(self, **kwargs)
    # defaults for fields the caller left empty
    if not self.rsalt:
      self.rsalt = utils.NewUUID()
    if not self.protocol:
      self.protocol = constants.CONFD_PROTOCOL_VERSION
    # validation happens only once all defaults are in place
    valid_types = constants.CONFD_REQS
    if self.type not in valid_types:
      raise errors.ConfdClientError("Invalid request type")
class ConfdFilterCallback:
  """Callback that calls another callback, but filters duplicate results.

  @ivar consistent: a dictionary indexed by salt; for each salt, if
      all responses were identical, this will be True; this is the
      expected state on a healthy cluster; on inconsistent or
      partitioned clusters, this might be False, if we see answers
      with the same serial but different contents

  """
  def __init__(self, callback, logger=None):
    """Constructor for ConfdFilterCallback

    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function to call when getting answers
    @type logger: logging.Logger
    @param logger: optional logger for internal conditions
    @raise errors.ProgrammerError: if C{callback} is not callable

    """
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    self._callback = callback
    self._logger = logger
    # answers contains a dict of salt -> answer
    self._answers = {}
    self.consistent = {}

  def _LogFilter(self, salt, new_reply, old_reply):
    """Log why C{new_reply} is being filtered (no-op without a logger)."""
    if not self._logger:
      return

    if new_reply.serial > old_reply.serial:
      self._logger.debug("Filtering confirming answer, with newer"
                         " serial for query %s" % salt)
    elif new_reply.serial == old_reply.serial:
      if new_reply.answer != old_reply.answer:
        self._logger.warning("Got incoherent answers for query %s"
                             " (serial: %s)" % (salt, new_reply.serial))
      else:
        self._logger.debug("Filtering confirming answer, with same"
                           " serial for query %s" % salt)
    else:
      # here new_reply.serial < old_reply.serial, so print them in that
      # order for the "(%d < %d)" message to be true
      self._logger.debug("Filtering outdated answer for query %s"
                         " serial: (%d < %d)" % (salt, new_reply.serial,
                                                 old_reply.serial))

  def _HandleExpire(self, up):
    # if we have no answer we have received none, before the expiration.
    if up.salt in self._answers:
      del self._answers[up.salt]
    if up.salt in self.consistent:
      del self.consistent[up.salt]

  def _HandleReply(self, up):
    """Handle a single confd reply, and decide whether to filter it.

    @rtype: boolean
    @return: True if the reply should be filtered, False if it should be passed
        on to the up-callback

    """
    filter_upcall = False
    salt = up.salt
    if salt not in self.consistent:
      self.consistent[salt] = True
    if salt not in self._answers:
      # first answer for a query (don't filter, and record)
      self._answers[salt] = up.server_reply
    elif up.server_reply.serial > self._answers[salt].serial:
      # newer answer (record, and compare contents)
      old_answer = self._answers[salt]
      self._answers[salt] = up.server_reply
      if up.server_reply.answer == old_answer.answer:
        # same content (filter) (version upgrade was unrelated)
        filter_upcall = True
        self._LogFilter(salt, up.server_reply, old_answer)
      # else: different content, pass up a second answer
    else:
      # older or same-version answer (duplicate or outdated, filter)
      if (up.server_reply.serial == self._answers[salt].serial and
          up.server_reply.answer != self._answers[salt].answer):
        self.consistent[salt] = False
      filter_upcall = True
      self._LogFilter(salt, up.server_reply, self._answers[salt])

    return filter_upcall

  def __call__(self, up):
    """Filtering callback

    @type up: L{ConfdUpcallPayload}
    @param up: the upcall payload to handle

    """
    filter_upcall = False
    if up.type == UPCALL_REPLY:
      filter_upcall = self._HandleReply(up)
    elif up.type == UPCALL_EXPIRE:
      self._HandleExpire(up)

    if not filter_upcall:
      self._callback(up)
class ConfdCountingCallback:
  """Callback that calls another callback, and counts the answers

  Every upcall is passed on unchanged; for queries registered with
  L{RegisterQuery}, the number of received replies is tracked.

  """
  def __init__(self, callback, logger=None):
    """Constructor for ConfdCountingCallback

    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function to call when getting answers
    @type logger: logging.Logger
    @param logger: optional logger for internal conditions
    @raise errors.ProgrammerError: if C{callback} is not callable

    """
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    self._callback = callback
    self._logger = logger
    # answers contains a dict of salt -> count
    self._answers = {}

  def RegisterQuery(self, salt):
    """Start counting replies for the query identified by C{salt}.

    @raise errors.ProgrammerError: if the salt is already registered

    """
    if salt in self._answers:
      raise errors.ProgrammerError("query already registered")
    self._answers[salt] = 0

  def AllAnswered(self):
    """Have all the registered queries received at least an answer?

    Queries that expired are no longer registered (see L{_HandleExpire})
    and thus do not count here.

    """
    return compat.all(self._answers.values())

  def _HandleExpire(self, up):
    # if we have no answer we have received none, before the expiration.
    if up.salt in self._answers:
      del self._answers[up.salt]

  def _HandleReply(self, up):
    """Count a single confd reply for its query, if registered.

    Replies for unregistered salts are ignored. Nothing is filtered and
    nothing is returned; every upcall is still passed on by L{__call__}.

    """
    if up.salt in self._answers:
      self._answers[up.salt] += 1

  def __call__(self, up):
    """Counting callback

    @type up: L{ConfdUpcallPayload}
    @param up: the upcall payload to handle

    """
    if up.type == UPCALL_REPLY:
      self._HandleReply(up)
    elif up.type == UPCALL_EXPIRE:
      self._HandleExpire(up)
    self._callback(up)
class StoreResultCallback:
  """Callback that remembers the most recent reply for each query.

  @ivar _answers: dict of salt to (have_answer, reply)

  """
  # value returned by GetResponse for salts without a stored reply
  _NO_KEY = (False, None)

  def __init__(self):
    """Constructor for StoreResultCallback

    """
    # salt -> (True, latest reply payload)
    self._answers = {}

  def GetResponse(self, salt):
    """Return the stored (have_answer, reply) pair for a salt

    Returns C{_NO_KEY} when no reply has been stored for the salt.

    """
    try:
      return self._answers[salt]
    except KeyError:
      return self._NO_KEY

  def _HandleExpire(self, up):
    """Expiration handler.

    Only removes an entry whose value equals C{_NO_KEY}; entries holding
    a reply are kept.

    """
    stored = self._answers.get(up.salt, None)
    if up.salt in self._answers and stored == self._NO_KEY:
      del self._answers[up.salt]

  def _HandleReply(self, up):
    """Store a reply, replacing any earlier one for the same salt.

    """
    self._answers[up.salt] = (True, up)

  def __call__(self, up):
    """Storing callback

    @type up: L{ConfdUpcallPayload}
    @param up: the upcall payload to handle

    """
    kind = up.type
    if kind == UPCALL_REPLY:
      self._HandleReply(up)
    elif kind == UPCALL_EXPIRE:
      self._HandleExpire(up)
def GetConfdClient(callback):
  """Build a ConfdClient from the local cluster configuration.

  The peer list is read from the master candidates IPs ssconf file and
  the HMAC key from L{pathutils.CONFD_HMAC_KEY}.

  @attention: This should only be called on nodes which are part of a
      cluster, since it depends on a valid (ganeti) data directory;
      for code running outside of a cluster, you need to create the
      ConfdClient instance directly.

  """
  store = ssconf.SimpleStore()
  candidates_path = store.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
  peers = utils.ReadFile(candidates_path).splitlines()
  key = utils.ReadFile(pathutils.CONFD_HMAC_KEY)
  return ConfdClient(key, peers, callback)