4 # Copyright (C) 2009 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti confd client
24 Clients can use the confd client library to send requests to a group of master
25 candidates running confd. The expected usage is through the asyncore framework,
26 by sending queries, and asynchronously receiving replies through a callback.
28 This way the client library doesn't ever need to "wait" on a particular answer,
29 and can proceed even if some udp packets are lost. It's up to the user to
30 reschedule queries if they haven't received responses and they need them.
34 client = ConfdClient(...) # includes callback specification
35 req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36 client.SendRequest(req)
37 # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
39 # And your callback will be called by asyncore, when your query gets a
40 # response, or when it expires.
42 You can use the provided ConfdFilterCallback to act as a filter, only passing
43 "newer" answers to your callback, and filtering out outdated ones, or ones
44 confirming what you already got.
48 # pylint: disable-msg=E0203
50 # E0203: Access to member %r before its definition, since we use
51 # objects.py which doesn't explicitly initialise its members
56 from ganeti import utils
57 from ganeti import constants
58 from ganeti import objects
59 from ganeti import serializer
60 from ganeti import daemon # contains AsyncUDPSocket
61 from ganeti import errors
62 from ganeti import confd
63 from ganeti import ssconf
64 from ganeti import compat
65 from ganeti import netutils
68 class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
69 """Confd udp asyncore client
71 This is kept separate from the main ConfdClient to make sure it's easy to
72 implement a non-asyncore based client library.
75 def __init__(self, client):
76 """Constructor for ConfdAsyncUDPClient
78 @type client: L{ConfdClient}
79 @param client: client library, to pass the datagrams to
82 daemon.AsyncUDPSocket.__init__(self)
# Overrides the daemon.AsyncUDPSocket hook of the same name.
def handle_datagram(self, payload, ip, port):
  """Hand a received datagram over to the owning client.

  @param payload: datagram contents, forwarded unchanged
  @param ip: sender address, forwarded unchanged
  @param port: sender port, forwarded unchanged

  """
  owner = self.client
  owner.HandleResponse(payload, ip, port)
90 class _Request(object):
91 """Request status structure.
93 @ivar request: the request data
94 @ivar args: any extra arguments for the callback
95 @ivar expiry: the expiry timestamp of the request
96 @ivar sent: the set of contacted peers
97 @ivar rcvd: the set of peers who replied
100 def __init__(self, request, args, expiry, sent):
101 self.request = request
104 self.sent = frozenset(sent)
109 """Send queries to confd, and get back answers.
111 Since the confd model works by querying multiple master candidates, and
112 getting back answers, this is an asynchronous library. It can either work
113 through asyncore or with your own handling.
115 @type _requests: dict
116 @ivar _requests: dictionary indexes by salt, which contains data
117 about the outstanding requests; the values are objects of type
121 def __init__(self, hmac_key, peers, callback, port=None, logger=None):
122 """Constructor for ConfdClient
124 @type hmac_key: string
125 @param hmac_key: hmac key to talk to confd
127 @param peers: list of peer nodes
128 @type callback: f(L{ConfdUpcallPayload})
129 @param callback: function to call when getting answers
131 @param port: confd port (default: use GetDaemonPort)
132 @type logger: logging.Logger
133 @param logger: optional logger for internal conditions
136 if not callable(callback):
137 raise errors.ProgrammerError("callback must be callable")
139 self.UpdatePeerList(peers)
140 self._hmac_key = hmac_key
141 self._socket = ConfdAsyncUDPClient(self)
142 self._callback = callback
143 self._confd_port = port
144 self._logger = logger
147 if self._confd_port is None:
148 self._confd_port = netutils.GetDaemonPort(constants.CONFD)
def UpdatePeerList(self, peers):
  """Replace the list of peers this client sends requests to.

  @type peers: list
  @param peers: list of peer nodes; a private copy of it is stored
  @raise errors.ProgrammerError: if C{peers} is not a list

  """
  # the constructor calls this method, so the attribute is first set here:
  # pylint: disable-msg=W0201
  if isinstance(peers, list):
    # store our own copy, because SendRequest shuffles self._peers in place
    self._peers = list(peers)
  else:
    raise errors.ProgrammerError("peers must be a list")
164 def _PackRequest(self, request, now=None):
165 """Prepare a request to be sent on the wire.
167 This function puts a proper salt in a confd request, puts the proper salt,
168 and adds the correct magic number.
174 req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
175 return confd.PackMagic(req)
177 def _UnpackReply(self, payload):
178 in_payload = confd.UnpackMagic(payload)
179 (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
180 answer = objects.ConfdReply.FromDict(dict_answer)
183 def ExpireRequests(self):
184 """Delete all the expired requests.
188 for rsalt, rq in self._requests.items():
190 del self._requests[rsalt]
191 client_reply = ConfdUpcallPayload(salt=rsalt,
193 orig_request=rq.request,
197 self._callback(client_reply)
199 def SendRequest(self, request, args=None, coverage=0, async=True):
200 """Send a confd request to some MCs
202 @type request: L{objects.ConfdRequest}
203 @param request: the request to send
205 @param args: additional callback arguments
206 @type coverage: integer
207 @param coverage: number of remote nodes to contact; if default
208 (0), it will use a reasonable default
209 (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is
210 passed, it will use the maximum number of peers, otherwise the
211 number passed in will be used
213 @param async: handle the write asynchronously
217 coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
219 coverage = len(self._peers)
221 if coverage > len(self._peers):
222 raise errors.ConfdClientError("Not enough MCs known to provide the"
225 if not request.rsalt:
226 raise errors.ConfdClientError("Missing request rsalt")
228 self.ExpireRequests()
229 if request.rsalt in self._requests:
230 raise errors.ConfdClientError("Duplicate request rsalt")
232 if request.type not in constants.CONFD_REQS:
233 raise errors.ConfdClientError("Invalid request type")
235 random.shuffle(self._peers)
236 targets = self._peers[:coverage]
239 payload = self._PackRequest(request, now=now)
241 for target in targets:
243 self._socket.enqueue_send(target, self._confd_port, payload)
244 except errors.UdpDataSizeError:
245 raise errors.ConfdClientError("Request too big")
247 expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
248 self._requests[request.rsalt] = _Request(request, args, expire_time,
252 self.FlushSendQueue()
254 def HandleResponse(self, payload, ip, port):
255 """Asynchronous handler for a confd reply
257 Call the relevant callback associated to the current request.
262 answer, salt = self._UnpackReply(payload)
263 except (errors.SignatureError, errors.ConfdMagicError), err:
265 self._logger.debug("Discarding broken package: %s" % err)
269 rq = self._requests[salt]
272 self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
277 client_reply = ConfdUpcallPayload(salt=salt,
280 orig_request=rq.request,
286 self._callback(client_reply)
289 self.ExpireRequests()
def FlushSendQueue(self):
  """Write out everything queued on the socket, without an event loop.

  Keeps calling the socket's write handler for as long as the socket
  reports itself writable; handy for synchronous use of the client.

  """
  while True:
    if not self._socket.writable():
      break
    self._socket.handle_write()
def ReceiveReply(self, timeout=1):
  """Wait for and process a single reply.

  @param timeout: how long to wait for the reply
  @rtype: boolean
  @return: True if some data has been handled, False otherwise

  """
  sock = self._socket
  handled = sock.process_next_packet(timeout=timeout)
  return handled
312 def _NeededReplies(peer_cnt):
313 """Compute the minimum safe number of replies for a query.
315 The algorithm is designed to work well for both small and big
317 - for less than three, we require all responses
318 - for less than five, we allow one miss
319 - otherwise, half the number plus one
321 This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
322 4->2, 5->3, 6->3, 7->4, etc.
325 @param peer_cnt: the number of peers contacted
327 @return: the number of replies which should give a safe coverage
335 return int(peer_cnt/2) + 1
337 def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
338 """Wait for replies to a given request.
340 This method will wait until either the timeout expires or a
341 minimum number (computed using L{_NeededReplies}) of replies are
342 received for the given salt. It is useful when doing synchronous
343 calls to this library.
345 @param salt: the salt of the request we want responses for
346 @param timeout: the maximum timeout (should be less or equal to
347 L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT}
349 @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
350 request is unknown, timed_out will be true and the counters
354 def _CheckResponse():
355 if salt not in self._requests:
358 self._logger.debug("Discarding unknown/expired request: %s" % salt)
360 rq = self._requests[salt]
361 if len(rq.rcvd) >= expected:
362 # already got all replies
363 return (False, len(rq.sent), len(rq.rcvd))
364 # else wait, using default timeout
366 raise utils.RetryAgain()
368 MISSING = (True, 0, 0)
370 if salt not in self._requests:
372 # extend the expire time with the current timeout, so that we
373 # don't get the request expired from under us
374 rq = self._requests[salt]
377 expected = self._NeededReplies(sent)
380 return utils.Retry(_CheckResponse, 0, timeout)
381 except utils.RetryTimeout:
382 if salt in self._requests:
383 rq = self._requests[salt]
384 return (True, len(rq.sent), len(rq.rcvd))
389 # UPCALL_REPLY: server reply upcall
390 # has all ConfdUpcallPayload fields populated
392 # UPCALL_EXPIRE: internal library request expire
393 # has only salt, type, orig_request and extra_args
395 CONFD_UPCALL_TYPES = frozenset([
401 class ConfdUpcallPayload(objects.ConfigObject):
402 """Callback argument for confd replies
405 @ivar salt: salt associated with the query
406 @type type: one of confd.client.CONFD_UPCALL_TYPES
407 @ivar type: upcall type (server reply, expired request, ...)
408 @type orig_request: L{objects.ConfdRequest}
409 @ivar orig_request: original request
410 @type server_reply: L{objects.ConfdReply}
411 @ivar server_reply: server reply
412 @type server_ip: string
413 @ivar server_ip: answering server ip address
414 @type server_port: int
415 @ivar server_port: answering server port
416 @type extra_args: any
417 @ivar extra_args: 'args' argument of the SendRequest function
418 @type client: L{ConfdClient}
419 @ivar client: current confd client instance
434 class ConfdClientRequest(objects.ConfdRequest):
435 """This is the client-side version of ConfdRequest.
437 This version of the class helps creating requests, on the client side, by
438 filling in some default values.
441 def __init__(self, **kwargs):
442 objects.ConfdRequest.__init__(self, **kwargs)
444 self.rsalt = utils.NewUUID()
445 if not self.protocol:
446 self.protocol = constants.CONFD_PROTOCOL_VERSION
447 if self.type not in constants.CONFD_REQS:
448 raise errors.ConfdClientError("Invalid request type")
451 class ConfdFilterCallback:
452 """Callback that calls another callback, but filters duplicate results.
454 @ivar consistent: a dictionary indexed by salt; for each salt, if
455 all responses ware identical, this will be True; this is the
456 expected state on a healthy cluster; on inconsistent or
457 partitioned clusters, this might be False, if we see answers
458 with the same serial but different contents
461 def __init__(self, callback, logger=None):
462 """Constructor for ConfdFilterCallback
464 @type callback: f(L{ConfdUpcallPayload})
465 @param callback: function to call when getting answers
466 @type logger: logging.Logger
467 @param logger: optional logger for internal conditions
470 if not callable(callback):
471 raise errors.ProgrammerError("callback must be callable")
473 self._callback = callback
474 self._logger = logger
475 # answers contains a dict of salt -> answer
479 def _LogFilter(self, salt, new_reply, old_reply):
483 if new_reply.serial > old_reply.serial:
484 self._logger.debug("Filtering confirming answer, with newer"
485 " serial for query %s" % salt)
486 elif new_reply.serial == old_reply.serial:
487 if new_reply.answer != old_reply.answer:
488 self._logger.warning("Got incoherent answers for query %s"
489 " (serial: %s)" % (salt, new_reply.serial))
491 self._logger.debug("Filtering confirming answer, with same"
492 " serial for query %s" % salt)
494 self._logger.debug("Filtering outdated answer for query %s"
495 " serial: (%d < %d)" % (salt, old_reply.serial,
def _HandleExpire(self, up):
  """Forget the state kept for an expired query.

  Drops both the recorded answer and the consistency flag for
  C{up.salt}. If the query expired before any answer arrived, there is
  simply nothing to drop.

  """
  salt = up.salt
  for table in (self._answers, self.consistent):
    table.pop(salt, None)
505 def _HandleReply(self, up):
506 """Handle a single confd reply, and decide whether to filter it.
509 @return: True if the reply should be filtered, False if it should be passed
510 on to the up-callback
513 filter_upcall = False
515 if salt not in self.consistent:
516 self.consistent[salt] = True
517 if salt not in self._answers:
518 # first answer for a query (don't filter, and record)
519 self._answers[salt] = up.server_reply
520 elif up.server_reply.serial > self._answers[salt].serial:
521 # newer answer (record, and compare contents)
522 old_answer = self._answers[salt]
523 self._answers[salt] = up.server_reply
524 if up.server_reply.answer == old_answer.answer:
525 # same content (filter) (version upgrade was unrelated)
527 self._LogFilter(salt, up.server_reply, old_answer)
528 # else: different content, pass up a second answer
530 # older or same-version answer (duplicate or outdated, filter)
531 if (up.server_reply.serial == self._answers[salt].serial and
532 up.server_reply.answer != self._answers[salt].answer):
533 self.consistent[salt] = False
535 self._LogFilter(salt, up.server_reply, self._answers[salt])
539 def __call__(self, up):
540 """Filtering callback
542 @type up: L{ConfdUpcallPayload}
543 @param up: upper callback
546 filter_upcall = False
547 if up.type == UPCALL_REPLY:
548 filter_upcall = self._HandleReply(up)
549 elif up.type == UPCALL_EXPIRE:
550 self._HandleExpire(up)
552 if not filter_upcall:
556 class ConfdCountingCallback:
557 """Callback that calls another callback, and counts the answers
560 def __init__(self, callback, logger=None):
561 """Constructor for ConfdCountingCallback
563 @type callback: f(L{ConfdUpcallPayload})
564 @param callback: function to call when getting answers
565 @type logger: logging.Logger
566 @param logger: optional logger for internal conditions
569 if not callable(callback):
570 raise errors.ProgrammerError("callback must be callable")
572 self._callback = callback
573 self._logger = logger
574 # answers contains a dict of salt -> count
def RegisterQuery(self, salt):
  """Start counting the answers for a new query.

  @param salt: the salt of the query to track
  @raise errors.ProgrammerError: if the salt is already being tracked

  """
  already_known = salt in self._answers
  if already_known:
    raise errors.ProgrammerError("query already registered")
  self._answers[salt] = 0
def AllAnswered(self):
  """Have all the registered queries received at least an answer?

  @return: the result of C{compat.all} over the per-query answer
      counters, i.e. true only when no tracked counter is zero

  """
  counters = self._answers.values()
  return compat.all(counters)
def _HandleExpire(self, up):
  """Stop counting answers for an expired query.

  Salts that are not being tracked (never registered, or already
  dropped) are silently ignored.

  """
  self._answers.pop(up.salt, None)
def _HandleReply(self, up):
  """Count a single confd reply against its query.

  Only salts registered through RegisterQuery (and not yet expired) are
  counted; replies for any other salt are ignored. This method only
  updates the counter: it makes no filtering decision and returns None.

  """
  salt = up.salt
  if salt in self._answers:
    self._answers[salt] += 1
604 def __call__(self, up):
605 """Filtering callback
607 @type up: L{ConfdUpcallPayload}
608 @param up: upper callback
611 if up.type == UPCALL_REPLY:
612 self._HandleReply(up)
613 elif up.type == UPCALL_EXPIRE:
614 self._HandleExpire(up)
618 class StoreResultCallback:
619 """Callback that simply stores the most recent answer.
621 @ivar _answers: dict of salt to (have_answer, reply)
624 _NO_KEY = (False, None)
627 """Constructor for StoreResultCallback
630 # answers contains a dict of salt -> best result
def GetResponse(self, salt):
  """Return the stored result for a salt.

  @param salt: the salt of the query
  @return: the stored C{(have_answer, reply)} tuple, or C{self._NO_KEY}
      (that is, C{(False, None)}) when nothing is stored for the salt

  """
  try:
    return self._answers[salt]
  except KeyError:
    return self._NO_KEY
def _HandleExpire(self, up):
  """Expiration handler.

  Removes the entry for C{up.salt} only when it still holds the
  C{self._NO_KEY} placeholder; a reply that was actually stored is kept
  even after its query expires.

  """
  salt = up.salt
  answers = self._answers
  if salt not in answers:
    return
  if answers[salt] == self._NO_KEY:
    del answers[salt]
def _HandleReply(self, up):
  """Record a confd reply as the latest result for its query.

  The reply unconditionally overwrites whatever was stored for
  C{up.salt}. No filtering decision is made and nothing is returned.

  """
  self._answers[up.salt] = (True, up)
def __call__(self, up):
  """Dispatch an upcall to the matching handler.

  Reply upcalls are stored, expire upcalls go to the expiration handler,
  and upcalls of any other type are ignored.

  @type up: L{ConfdUpcallPayload}
  @param up: the upcall payload

  """
  kind = up.type
  if kind == UPCALL_REPLY:
    self._HandleReply(up)
    return
  if kind == UPCALL_EXPIRE:
    self._HandleExpire(up)
def GetConfdClient(callback):
  """Return a client configured using the given callback.

  The peer list is read from the ssconf master-candidates IP file
  (one entry per line) and the HMAC key from C{constants.CONFD_HMAC_KEY}.

  @attention: This should only be called on nodes which are part of a
      cluster, since it depends on a valid (ganeti) data directory;
      code running outside of a cluster has to build its client by
      other means
  @param callback: passed on to L{ConfdClient} as the reply callback
  @rtype: L{ConfdClient}

  """
  store = ssconf.SimpleStore()
  candidates_path = store.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
  peers = utils.ReadFile(candidates_path).splitlines()
  key = utils.ReadFile(constants.CONFD_HMAC_KEY)
  return ConfdClient(hmac_key=key, peers=peers, callback=callback)