4 # Copyright (C) 2009 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti confd client
24 Clients can use the confd client library to send requests to a group of master
25 candidates running confd. The expected usage is through the asyncore framework,
26 by sending queries, and asynchronously receiving replies through a callback.
28 This way the client library doesn't ever need to "wait" on a particular answer,
29 and can proceed even if some udp packets are lost. It's up to the user to
30 reschedule queries if they haven't received responses and they need them.
34 client = ConfdClient(...) # includes callback specification
35 req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36 client.SendRequest(req)
37 # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
39 # And your callback will be called by asyncore, when your query gets a
40 # response, or when it expires.
42 You can use the provided ConfdFilterCallback to act as a filter, only passing
43 "newer" answers to your callback, and filtering out outdated ones, or ones
44 confirming what you already got.
48 # pylint: disable-msg=E0203
50 # E0203: Access to member %r before its definition, since we use
51 # objects.py which doesn't explicitly initialise its members
56 from ganeti import utils
57 from ganeti import constants
58 from ganeti import objects
59 from ganeti import serializer
60 from ganeti import daemon # contains AsyncUDPSocket
61 from ganeti import errors
62 from ganeti import confd
63 from ganeti import ssconf
64 from ganeti import compat
67 class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
68 """Confd udp asyncore client
70 This is kept separate from the main ConfdClient to make sure it's easy to
71 implement a non-asyncore based client library.
74 def __init__(self, client):
75 """Constructor for ConfdAsyncUDPClient
77 @type client: L{ConfdClient}
78 @param client: client library, to pass the datagrams to
81 daemon.AsyncUDPSocket.__init__(self)
# Overrides the hook of the same name defined by daemon.AsyncUDPSocket.
def handle_datagram(self, payload, ip, port):
    """Forward a received datagram to the owning client.

    The payload and the sender's address are handed, unchanged, to the
    C{HandleResponse} method of the client stored on this socket.

    @param payload: raw datagram contents
    @param ip: address the datagram came from
    @param port: port the datagram came from

    """
    on_response = self.client.HandleResponse
    on_response(payload, ip, port)
89 class _Request(object):
90 """Request status structure.
92 @ivar request: the request data
93 @ivar args: any extra arguments for the callback
94 @ivar expiry: the expiry timestamp of the request
95 @ivar sent: the set of contacted peers
96 @ivar rcvd: the set of peers who replied
99 def __init__(self, request, args, expiry, sent):
100 self.request = request
103 self.sent = frozenset(sent)
108 """Send queries to confd, and get back answers.
110 Since the confd model works by querying multiple master candidates, and
111 getting back answers, this is an asynchronous library. It can either work
112 through asyncore or with your own handling.
114 @type _requests: dict
115 @ivar _requests: dictionary indexed by salt, which contains data
116 about the outstanding requests; the values are objects of type
120 def __init__(self, hmac_key, peers, callback, port=None, logger=None):
121 """Constructor for ConfdClient
123 @type hmac_key: string
124 @param hmac_key: hmac key to talk to confd
126 @param peers: list of peer nodes
127 @type callback: f(L{ConfdUpcallPayload})
128 @param callback: function to call when getting answers
130 @param port: confd port (default: use GetDaemonPort)
131 @type logger: logging.Logger
132 @param logger: optional logger for internal conditions
135 if not callable(callback):
136 raise errors.ProgrammerError("callback must be callable")
138 self.UpdatePeerList(peers)
139 self._hmac_key = hmac_key
140 self._socket = ConfdAsyncUDPClient(self)
141 self._callback = callback
142 self._confd_port = port
143 self._logger = logger
146 if self._confd_port is None:
147 self._confd_port = utils.GetDaemonPort(constants.CONFD)
def UpdatePeerList(self, peers):
    """Replace the list of peers this client sends its queries to.

    @type peers: list
    @param peers: list of peer nodes
    @raise errors.ProgrammerError: if C{peers} is not a list

    """
    # The constructor calls this method too, so the attribute is first
    # defined here rather than in __init__:
    # pylint: disable-msg=W0201
    if isinstance(peers, list):
        # Store a private copy: the stored list gets shuffled when requests
        # are sent, and the caller's list must not be reordered by that.
        own_copy = list(peers)
        self._peers = own_copy
        return
    raise errors.ProgrammerError("peers must be a list")
163 def _PackRequest(self, request, now=None):
164 """Prepare a request to be sent on the wire.
166 This function signs a confd request, puts the proper salt in it,
167 and adds the correct magic number.
173 req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
174 return confd.PackMagic(req)
176 def _UnpackReply(self, payload):
177 in_payload = confd.UnpackMagic(payload)
178 (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
179 answer = objects.ConfdReply.FromDict(dict_answer)
182 def ExpireRequests(self):
183 """Delete all the expired requests.
187 for rsalt, rq in self._requests.items():
189 del self._requests[rsalt]
190 client_reply = ConfdUpcallPayload(salt=rsalt,
192 orig_request=rq.request,
196 self._callback(client_reply)
198 def SendRequest(self, request, args=None, coverage=0, async=True):
199 """Send a confd request to some MCs
201 @type request: L{objects.ConfdRequest}
202 @param request: the request to send
204 @param args: additional callback arguments
205 @type coverage: integer
206 @param coverage: number of remote nodes to contact; if default
207 (0), it will use a reasonable default
208 (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is
209 passed, it will use the maximum number of peers, otherwise the
210 number passed in will be used
212 @param async: handle the write asynchronously
216 coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
218 coverage = len(self._peers)
220 if coverage > len(self._peers):
221 raise errors.ConfdClientError("Not enough MCs known to provide the"
224 if not request.rsalt:
225 raise errors.ConfdClientError("Missing request rsalt")
227 self.ExpireRequests()
228 if request.rsalt in self._requests:
229 raise errors.ConfdClientError("Duplicate request rsalt")
231 if request.type not in constants.CONFD_REQS:
232 raise errors.ConfdClientError("Invalid request type")
234 random.shuffle(self._peers)
235 targets = self._peers[:coverage]
238 payload = self._PackRequest(request, now=now)
240 for target in targets:
242 self._socket.enqueue_send(target, self._confd_port, payload)
243 except errors.UdpDataSizeError:
244 raise errors.ConfdClientError("Request too big")
246 expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
247 self._requests[request.rsalt] = _Request(request, args, expire_time,
251 self.FlushSendQueue()
253 def HandleResponse(self, payload, ip, port):
254 """Asynchronous handler for a confd reply
256 Call the relevant callback associated to the current request.
261 answer, salt = self._UnpackReply(payload)
262 except (errors.SignatureError, errors.ConfdMagicError), err:
264 self._logger.debug("Discarding broken package: %s" % err)
268 rq = self._requests[salt]
271 self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
276 client_reply = ConfdUpcallPayload(salt=salt,
279 orig_request=rq.request,
285 self._callback(client_reply)
288 self.ExpireRequests()
def FlushSendQueue(self):
    """Push every queued request out on the socket.

    Keeps invoking the socket's write handler for as long as the socket
    reports itself writable. Intended for callers that use the client
    synchronously.

    """
    while True:
        if not self._socket.writable():
            break
        self._socket.handle_write()
def ReceiveReply(self, timeout=1):
    """Process at most one incoming reply.

    Delegates to the socket's C{process_next_packet} and returns its
    result as-is.

    @param timeout: how long to wait for the reply
    @return: True if some data has been handled, False otherwise

    """
    handled = self._socket.process_next_packet(timeout=timeout)
    return handled
311 def _NeededReplies(peer_cnt):
312 """Compute the minimum safe number of replies for a query.
314 The algorithm is designed to work well for both small and big
316 - for less than three, we require all responses
317 - for less than five, we allow one miss
318 - otherwise, half the number plus one
320 This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
321 4->2, 5->3, 6->3, 7->4, etc.
324 @param peer_cnt: the number of peers contacted
326 @return: the number of replies which should give a safe coverage
334 return int(peer_cnt/2) + 1
336 def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
337 """Wait for replies to a given request.
339 This method will wait until either the timeout expires or a
340 minimum number (computed using L{_NeededReplies}) of replies are
341 received for the given salt. It is useful when doing synchronous
342 calls to this library.
344 @param salt: the salt of the request we want responses for
345 @param timeout: the maximum timeout (should be less than or equal to
346 L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT})
348 @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
349 request is unknown, timed_out will be true and the counters
353 def _CheckResponse():
354 if salt not in self._requests:
357 self._logger.debug("Discarding unknown/expired request: %s" % salt)
359 rq = self._requests[salt]
360 if len(rq.rcvd) >= expected:
361 # already got all replies
362 return (False, len(rq.sent), len(rq.rcvd))
363 # else wait, using default timeout
365 raise utils.RetryAgain()
367 MISSING = (True, 0, 0)
369 if salt not in self._requests:
371 # extend the expire time with the current timeout, so that we
372 # don't get the request expired from under us
373 rq = self._requests[salt]
376 expected = self._NeededReplies(sent)
379 return utils.Retry(_CheckResponse, 0, timeout)
380 except utils.RetryTimeout:
381 if salt in self._requests:
382 rq = self._requests[salt]
383 return (True, len(rq.sent), len(rq.rcvd))
388 # UPCALL_REPLY: server reply upcall
389 # has all ConfdUpcallPayload fields populated
391 # UPCALL_EXPIRE: internal library request expire
392 # has only salt, type, orig_request and extra_args
394 CONFD_UPCALL_TYPES = frozenset([
400 class ConfdUpcallPayload(objects.ConfigObject):
401 """Callback argument for confd replies
404 @ivar salt: salt associated with the query
405 @type type: one of confd.client.CONFD_UPCALL_TYPES
406 @ivar type: upcall type (server reply, expired request, ...)
407 @type orig_request: L{objects.ConfdRequest}
408 @ivar orig_request: original request
409 @type server_reply: L{objects.ConfdReply}
410 @ivar server_reply: server reply
411 @type server_ip: string
412 @ivar server_ip: answering server ip address
413 @type server_port: int
414 @ivar server_port: answering server port
415 @type extra_args: any
416 @ivar extra_args: 'args' argument of the SendRequest function
417 @type client: L{ConfdClient}
418 @ivar client: current confd client instance
433 class ConfdClientRequest(objects.ConfdRequest):
434 """This is the client-side version of ConfdRequest.
436 This version of the class helps creating requests, on the client side, by
437 filling in some default values.
440 def __init__(self, **kwargs):
441 objects.ConfdRequest.__init__(self, **kwargs)
443 self.rsalt = utils.NewUUID()
444 if not self.protocol:
445 self.protocol = constants.CONFD_PROTOCOL_VERSION
446 if self.type not in constants.CONFD_REQS:
447 raise errors.ConfdClientError("Invalid request type")
450 class ConfdFilterCallback:
451 """Callback that calls another callback, but filters duplicate results.
453 @ivar consistent: a dictionary indexed by salt; for each salt, if
454 all responses were identical, this will be True; this is the
455 expected state on a healthy cluster; on inconsistent or
456 partitioned clusters, this might be False, if we see answers
457 with the same serial but different contents
460 def __init__(self, callback, logger=None):
461 """Constructor for ConfdFilterCallback
463 @type callback: f(L{ConfdUpcallPayload})
464 @param callback: function to call when getting answers
465 @type logger: logging.Logger
466 @param logger: optional logger for internal conditions
469 if not callable(callback):
470 raise errors.ProgrammerError("callback must be callable")
472 self._callback = callback
473 self._logger = logger
474 # answers contains a dict of salt -> answer
478 def _LogFilter(self, salt, new_reply, old_reply):
482 if new_reply.serial > old_reply.serial:
483 self._logger.debug("Filtering confirming answer, with newer"
484 " serial for query %s" % salt)
485 elif new_reply.serial == old_reply.serial:
486 if new_reply.answer != old_reply.answer:
487 self._logger.warning("Got incoherent answers for query %s"
488 " (serial: %s)" % (salt, new_reply.serial))
490 self._logger.debug("Filtering confirming answer, with same"
491 " serial for query %s" % salt)
493 self._logger.debug("Filtering outdated answer for query %s"
494 " serial: (%d < %d)" % (salt, old_reply.serial,
497 def _HandleExpire(self, up):
498 # if we have no answer we have received none, before the expiration.
499 if up.salt in self._answers:
500 del self._answers[up.salt]
501 if up.salt in self.consistent:
502 del self.consistent[up.salt]
504 def _HandleReply(self, up):
505 """Handle a single confd reply, and decide whether to filter it.
508 @return: True if the reply should be filtered, False if it should be passed
509 on to the up-callback
512 filter_upcall = False
514 if salt not in self.consistent:
515 self.consistent[salt] = True
516 if salt not in self._answers:
517 # first answer for a query (don't filter, and record)
518 self._answers[salt] = up.server_reply
519 elif up.server_reply.serial > self._answers[salt].serial:
520 # newer answer (record, and compare contents)
521 old_answer = self._answers[salt]
522 self._answers[salt] = up.server_reply
523 if up.server_reply.answer == old_answer.answer:
524 # same content (filter) (version upgrade was unrelated)
526 self._LogFilter(salt, up.server_reply, old_answer)
527 # else: different content, pass up a second answer
529 # older or same-version answer (duplicate or outdated, filter)
530 if (up.server_reply.serial == self._answers[salt].serial and
531 up.server_reply.answer != self._answers[salt].answer):
532 self.consistent[salt] = False
534 self._LogFilter(salt, up.server_reply, self._answers[salt])
538 def __call__(self, up):
539 """Filtering callback
541 @type up: L{ConfdUpcallPayload}
542 @param up: upper callback
545 filter_upcall = False
546 if up.type == UPCALL_REPLY:
547 filter_upcall = self._HandleReply(up)
548 elif up.type == UPCALL_EXPIRE:
549 self._HandleExpire(up)
551 if not filter_upcall:
555 class ConfdCountingCallback:
556 """Callback that calls another callback, and counts the answers
559 def __init__(self, callback, logger=None):
560 """Constructor for ConfdCountingCallback
562 @type callback: f(L{ConfdUpcallPayload})
563 @param callback: function to call when getting answers
564 @type logger: logging.Logger
565 @param logger: optional logger for internal conditions
568 if not callable(callback):
569 raise errors.ProgrammerError("callback must be callable")
571 self._callback = callback
572 self._logger = logger
573 # answers contains a dict of salt -> count
def RegisterQuery(self, salt):
    """Start counting answers for a query.

    The counter for the query starts at zero.

    @param salt: the salt identifying the query
    @raise errors.ProgrammerError: if the salt has been registered already

    """
    counters = self._answers
    if salt not in counters:
        counters[salt] = 0
        return
    raise errors.ProgrammerError("query already registered")
def AllAnswered(self):
    """Tell whether every registered query has received at least one answer.

    @return: the result of C{compat.all} over the per-query answer counters

    """
    # A counter still at zero marks a query without any answer so far.
    # NOTE(review): compat.all presumably behaves like the builtin all()
    # (true for an empty sequence) - confirm against ganeti.compat.
    counters = self._answers.values()
    return compat.all(counters)
587 def _HandleExpire(self, up):
588 # if we have no answer we have received none, before the expiration.
589 if up.salt in self._answers:
590 del self._answers[up.salt]
592 def _HandleReply(self, up):
593 """Handle a single confd reply, and decide whether to filter it.
596 @return: True if the reply should be filtered, False if it should be passed
597 on to the up-callback
600 if up.salt in self._answers:
601 self._answers[up.salt] += 1
603 def __call__(self, up):
604 """Filtering callback
606 @type up: L{ConfdUpcallPayload}
607 @param up: upper callback
610 if up.type == UPCALL_REPLY:
611 self._HandleReply(up)
612 elif up.type == UPCALL_EXPIRE:
613 self._HandleExpire(up)
617 class StoreResultCallback:
618 """Callback that simply stores the most recent answer.
620 @ivar _answers: dict of salt to (have_answer, reply)
623 _NO_KEY = (False, None)
626 """Constructor for StoreResultCallback
629 # answers contains a dict of salt -> best result
def GetResponse(self, salt):
    """Look up what has been stored for a salt.

    @param salt: the salt of the query of interest
    @return: the entry stored for the salt, or the class-level C{_NO_KEY}
        placeholder when nothing is stored under it

    """
    try:
        return self._answers[salt]
    except KeyError:
        return self._NO_KEY
638 def _HandleExpire(self, up):
639 """Expiration handler.
642 if up.salt in self._answers and self._answers[up.salt] == self._NO_KEY:
643 del self._answers[up.salt]
645 def _HandleReply(self, up):
646 """Handle a single confd reply, and decide whether to filter it.
649 self._answers[up.salt] = (True, up)
def __call__(self, up):
    """Dispatch an upcall to the matching handler.

    Replies go to C{_HandleReply}, expirations to C{_HandleExpire};
    upcalls of any other type are ignored.

    @type up: L{ConfdUpcallPayload}
    @param up: the upcall to process

    """
    kind = up.type
    if kind == UPCALL_REPLY:
        self._HandleReply(up)
        return
    if kind == UPCALL_EXPIRE:
        self._HandleExpire(up)
def GetConfdClient(callback):
    """Build a confd client that reports to the given callback.

    The list of master candidate IPs (one per line of the ssconf file for
    C{constants.SS_MASTER_CANDIDATES_IPS}) and the HMAC key (the contents of
    C{constants.CONFD_HMAC_KEY}) are read from disk, so that callers do not
    have to deal with them.

    @attention: This should only be called on nodes which are part of a
        cluster, since it depends on a valid (ganeti) data directory;
        code running outside of a cluster has to set up its client itself.

    @param callback: function to call when getting answers
    @return: a L{ConfdClient} using the peers and key read from disk

    """
    candidates_path = ssconf.SimpleStore().KeyToFilename(
        constants.SS_MASTER_CANDIDATES_IPS)
    candidates = utils.ReadFile(candidates_path).splitlines()
    key = utils.ReadFile(constants.CONFD_HMAC_KEY)
    return ConfdClient(key, candidates, callback)