4 # Copyright (C) 2009 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Ganeti confd client

Clients can use the confd client library to send requests to a group of master
candidates running confd. The expected usage is through the asyncore framework,
by sending queries, and asynchronously receiving replies through a callback.

This way the client library doesn't ever need to "wait" on a particular answer,
and can proceed even if some UDP packets are lost. It's up to the user to
reschedule queries if they haven't received responses and they need them.

Example usage::

  client = ConfdClient(...) # includes callback specification
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
  client.SendRequest(req)
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
  # And your callback will be called by asyncore, when your query gets a
  # response, or when it expires.

You can use the provided ConfdFilterCallback to act as a filter, only passing
"newer" answers to your callback, and filtering out outdated ones, or ones
confirming what you already got.

"""
48 # pylint: disable-msg=E0203
# E0203: Access to member %r before its definition, since we use
# objects.py which doesn't explicitly initialise its members
import random
import time

from ganeti import utils
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import daemon # contains AsyncUDPSocket
from ganeti import errors
from ganeti import confd
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
  """Confd udp asyncore client

  This is kept separate from the main ConfdClient to make sure it's easy to
  implement a non-asyncore based client library.

  """
  def __init__(self, client):
    """Constructor for ConfdAsyncUDPClient

    @type client: L{ConfdClient}
    @param client: client library, to pass the datagrams to

    """
    daemon.AsyncUDPSocket.__init__(self)
    # handle_datagram forwards every datagram to this object, so it has to be
    # remembered here
    self.client = client

  # this method is overriding a daemon.AsyncUDPSocket method
  def handle_datagram(self, payload, ip, port):
    """Hand a received datagram over to the client library.

    @param payload: raw datagram contents
    @param ip: address the datagram came from
    @param port: port the datagram came from

    """
    self.client.HandleResponse(payload, ip, port)
class ConfdClient:
  """Send queries to confd, and get back answers.

  Since the confd model works by querying multiple master candidates, and
  getting back answers, this is an asynchronous library. It can either work
  through asyncore or with your own handling.

  """
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
    """Constructor for ConfdClient

    @type hmac_key: string
    @param hmac_key: hmac key to talk to confd
    @type peers: list
    @param peers: list of peer nodes
    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function to call when getting answers
    @type port: integer
    @keyword port: confd port (default: use GetDaemonPort)
    @type logger: logging.Logger
    @keyword logger: optional logger for internal conditions
    @raise errors.ProgrammerError: if callback is not callable, or peers is
        not a list

    """
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    self.UpdatePeerList(peers)
    self._hmac_key = hmac_key
    self._socket = ConfdAsyncUDPClient(self)
    self._callback = callback
    self._confd_port = port
    self._logger = logger
    # rsalt -> (request, args) for every request that has not expired yet
    self._requests = {}
    # (expire_time, rsalt) tuples, appended in the order requests are sent
    self._expire_requests = []

    if self._confd_port is None:
      self._confd_port = utils.GetDaemonPort(constants.CONFD)

  def UpdatePeerList(self, peers):
    """Update the list of peers

    @type peers: list
    @param peers: list of peer nodes
    @raise errors.ProgrammerError: if peers is not a list

    """
    # we are actually called from init, so:
    # pylint: disable-msg=W0201
    if not isinstance(peers, list):
      raise errors.ProgrammerError("peers must be a list")
    # make a copy of peers, since we're going to shuffle the list, later
    self._peers = list(peers)

  def _PackRequest(self, request, now=None):
    """Prepare a request to be sent on the wire.

    The request dictionary is passed through serializer.DumpSignedJson
    together with the hmac key and the integer timestamp, and the result is
    wrapped by confd.PackMagic, which adds the correct magic number.

    @type request: L{objects.ConfdRequest}
    @param request: the request to pack
    @type now: float
    @keyword now: timestamp to use (default: the current time)
    @return: the payload to put on the wire

    """
    if now is None:
      now = time.time()
    tstamp = "%d" % now
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
    return confd.PackMagic(req)

  def _UnpackReply(self, payload):
    """Decode a datagram received from a confd server.

    @param payload: raw datagram contents
    @rtype: tuple
    @return: (L{objects.ConfdReply}, salt) as extracted from the payload

    """
    in_payload = confd.UnpackMagic(payload)
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
    answer = objects.ConfdReply.FromDict(dict_answer)
    return answer, salt

  def ExpireRequests(self):
    """Delete all the expired requests.

    For each expired request the callback is invoked with an UPCALL_EXPIRE
    payload.

    """
    now = time.time()
    while self._expire_requests:
      expire_time, rsalt = self._expire_requests[0]
      if now < expire_time:
        # SendRequest appends entries in sending order, all with the same
        # timeout, so nothing behind an unexpired entry can be expired
        break
      self._expire_requests.pop(0)
      (request, args) = self._requests.pop(rsalt)
      client_reply = ConfdUpcallPayload(salt=rsalt,
                                        type=UPCALL_EXPIRE,
                                        orig_request=request,
                                        extra_args=args,
                                        client=self,
                                        )
      self._callback(client_reply)

  def SendRequest(self, request, args=None, coverage=None):
    """Send a confd request to some MCs

    @type request: L{objects.ConfdRequest}
    @param request: the request to send
    @type args: tuple
    @keyword args: additional callback arguments
    @type coverage: integer
    @keyword coverage: number of remote nodes to contact
    @raise errors.ConfdClientError: if there are fewer known peers than the
        requested coverage, the request has no rsalt, a duplicate rsalt or an
        invalid type, or the request is too big for a datagram

    """
    if coverage is None:
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)

    if coverage > len(self._peers):
      raise errors.ConfdClientError("Not enough MCs known to provide the"
                                    " desired coverage")

    if not request.rsalt:
      raise errors.ConfdClientError("Missing request rsalt")

    # drop stale entries first, so that an expired rsalt can be reused
    self.ExpireRequests()
    if request.rsalt in self._requests:
      raise errors.ConfdClientError("Duplicate request rsalt")

    if request.type not in constants.CONFD_REQS:
      raise errors.ConfdClientError("Invalid request type")

    random.shuffle(self._peers)
    targets = self._peers[:coverage]

    # the same timestamp is used for signing and for the expiration time
    now = time.time()
    payload = self._PackRequest(request, now=now)

    for target in targets:
      try:
        self._socket.enqueue_send(target, self._confd_port, payload)
      except errors.UdpDataSizeError:
        raise errors.ConfdClientError("Request too big")

    self._requests[request.rsalt] = (request, args)
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
    self._expire_requests.append((expire_time, request.rsalt))

  def HandleResponse(self, payload, ip, port):
    """Asynchronous handler for a confd reply

    Call the relevant callback associated to the current request. Payloads
    that fail unpacking, or whose salt matches no pending request, are
    discarded (and logged, if a logger was given). Expired requests are
    cleaned up afterwards in every case.

    @param payload: raw datagram contents
    @param ip: address of the answering server
    @param port: port of the answering server

    """
    try:
      try:
        answer, salt = self._UnpackReply(payload)
      except (errors.SignatureError, errors.ConfdMagicError) as err:
        if self._logger:
          self._logger.debug("Discarding broken package: %s", err)
        return

      try:
        (request, args) = self._requests[salt]
      except KeyError:
        # there is no exception object worth reporting here; log the payload
        if self._logger:
          self._logger.debug("Discarding unknown (expired?) reply: %s",
                             payload)
        return

      client_reply = ConfdUpcallPayload(salt=salt,
                                        type=UPCALL_REPLY,
                                        server_reply=answer,
                                        orig_request=request,
                                        server_ip=ip,
                                        server_port=port,
                                        extra_args=args,
                                        client=self,
                                        )
      self._callback(client_reply)

    finally:
      self.ExpireRequests()
# UPCALL_REPLY: server reply upcall
# has all ConfdUpcallPayload fields populated
UPCALL_REPLY = 1
# UPCALL_EXPIRE: internal library request expire
# has only salt, type, orig_request and extra_args
UPCALL_EXPIRE = 2
CONFD_UPCALL_TYPES = frozenset([
  UPCALL_REPLY,
  UPCALL_EXPIRE,
  ])
class ConfdUpcallPayload(objects.ConfigObject):
  """Callback argument for confd replies

  @type salt: string
  @ivar salt: salt associated with the query
  @type type: one of confd.client.CONFD_UPCALL_TYPES
  @ivar type: upcall type (server reply, expired request, ...)
  @type orig_request: L{objects.ConfdRequest}
  @ivar orig_request: original request
  @type server_reply: L{objects.ConfdReply}
  @ivar server_reply: server reply
  @type server_ip: string
  @ivar server_ip: answering server ip address
  @type server_port: int
  @ivar server_port: answering server port
  @type extra_args: any
  @ivar extra_args: 'args' argument of the SendRequest function
  @type client: L{ConfdClient}
  @ivar client: current confd client instance

  """
  # One slot per documented instance variable.
  # NOTE(review): presumably objects.ConfigObject subclasses declare their
  # fields through __slots__ -- confirm against objects.py
  __slots__ = [
    "salt",
    "type",
    "orig_request",
    "server_reply",
    "server_ip",
    "server_port",
    "extra_args",
    "client",
    ]
class ConfdClientRequest(objects.ConfdRequest):
  """This is the client-side version of ConfdRequest.

  This version of the class helps creating requests, on the client side, by
  filling in some default values.

  """
  def __init__(self, **kwargs):
    """Build a request, filling in rsalt and protocol when not given.

    @keyword kwargs: passed unchanged to L{objects.ConfdRequest}
    @raise errors.ConfdClientError: if the request type is not one of
        constants.CONFD_REQS

    """
    objects.ConfdRequest.__init__(self, **kwargs)
    # only generate a salt when the caller did not supply one, the same way
    # the protocol version is defaulted below
    if not self.rsalt:
      self.rsalt = utils.NewUUID()
    if not self.protocol:
      self.protocol = constants.CONFD_PROTOCOL_VERSION
    if self.type not in constants.CONFD_REQS:
      raise errors.ConfdClientError("Invalid request type")
class ConfdFilterCallback:
  """Callback that calls another callback, but filters duplicate results.

  """
  def __init__(self, callback, logger=None):
    """Constructor for ConfdFilterCallback

    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function to call when getting answers
    @type logger: logging.Logger
    @keyword logger: optional logger for internal conditions
    @raise errors.ProgrammerError: if callback is not callable

    """
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    self._callback = callback
    self._logger = logger
    # answers contains a dict of salt -> answer
    self._answers = {}

  def _LogFilter(self, salt, new_reply, old_reply):
    """Log why a reply is being filtered; does nothing without a logger.

    @param salt: salt of the query the replies belong to
    @param new_reply: the reply being filtered
    @param old_reply: the reply it was compared against

    """
    if not self._logger:
      return

    if new_reply.serial > old_reply.serial:
      self._logger.debug("Filtering confirming answer, with newer"
                         " serial for query %s", salt)
    elif new_reply.serial == old_reply.serial:
      if new_reply.answer != old_reply.answer:
        self._logger.warning("Got incoherent answers for query %s"
                             " (serial: %s)", salt, new_reply.serial)
      else:
        self._logger.debug("Filtering confirming answer, with same"
                           " serial for query %s", salt)
    else:
      # the new serial is the smaller one here, so it goes on the left of "<"
      self._logger.debug("Filtering outdated answer for query %s"
                         " serial: (%d < %d)", salt, new_reply.serial,
                         old_reply.serial)

  def _HandleExpire(self, up):
    """Forget the recorded answer of an expired query.

    @type up: L{ConfdUpcallPayload}
    @param up: the expire upcall

    """
    # if we have no answer we have received none, before the expiration.
    self._answers.pop(up.salt, None)

  def _HandleReply(self, up):
    """Handle a single confd reply, and decide whether to filter it.

    @type up: L{ConfdUpcallPayload}
    @param up: the reply upcall
    @rtype: boolean
    @return: True if the reply should be filtered, False if it should be passed
             on to the up-callback

    """
    filter_upcall = False
    salt = up.salt
    if salt not in self._answers:
      # first answer for a query (don't filter, and record)
      self._answers[salt] = up.server_reply
    elif up.server_reply.serial > self._answers[salt].serial:
      # newer answer (record, and compare contents)
      old_answer = self._answers[salt]
      self._answers[salt] = up.server_reply
      if up.server_reply.answer == old_answer.answer:
        # same content (filter) (version upgrade was unrelated)
        filter_upcall = True
        self._LogFilter(salt, up.server_reply, old_answer)
      # else: different content, pass up a second answer
    else:
      # older or same-version answer (duplicate or outdated, filter)
      filter_upcall = True
      self._LogFilter(salt, up.server_reply, self._answers[salt])

    return filter_upcall

  def __call__(self, up):
    """Filtering callback

    Reply upcalls go through L{_HandleReply}, and are dropped when it asks
    for them to be filtered; every other upcall is passed on.

    @type up: L{ConfdUpcallPayload}
    @param up: upper callback

    """
    filter_upcall = False
    if up.type == UPCALL_REPLY:
      filter_upcall = self._HandleReply(up)
    elif up.type == UPCALL_EXPIRE:
      self._HandleExpire(up)

    if not filter_upcall:
      self._callback(up)