4 # Copyright (C) 2009 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti confd client
24 Clients can use the confd client library to send requests to a group of master
25 candidates running confd. The expected usage is through the asyncore framework,
26 by sending queries, and asynchronously receiving replies through a callback.
28 This way the client library doesn't ever need to "wait" on a particular answer,
29 and can proceed even if some udp packets are lost. It's up to the user to
30 reschedule queries if they haven't received responses and they need them.
Example usage::

  client = ConfdClient(...) # includes callback specification
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
  client.SendRequest(req)
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
  # And your callback will be called by asyncore, when your query gets a
  # response, or when it expires.

You can use the provided ConfdFilterCallback to act as a filter, only passing
"newer" answers to your callback, and filtering out outdated ones, or ones
confirming what you already got.

"""
import random
import time

from ganeti import utils
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import daemon # contains AsyncUDPSocket
from ganeti import errors
from ganeti import confd
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
  """Confd udp asyncore client

  This is kept separate from the main ConfdClient to make sure it's easy to
  implement a non-asyncore based client library.

  """
  def __init__(self, client):
    """Constructor for ConfdAsyncUDPClient

    @type client: L{ConfdClient}
    @param client: client library, to pass the datagrams to

    """
    daemon.AsyncUDPSocket.__init__(self)
    # Remember the owning client: handle_datagram forwards every received
    # datagram to it.
    self.client = client

  # this method is overriding a daemon.AsyncUDPSocket method
  def handle_datagram(self, payload, ip, port):
    """Hand a received datagram over to the owning client.

    @param payload: raw datagram contents
    @param ip: address the datagram was received from
    @param port: port the datagram was received from

    """
    self.client.HandleResponse(payload, ip, port)
class ConfdClient:
  """Send queries to confd, and get back answers.

  Since the confd model works by querying multiple master candidates, and
  getting back answers, this is an asynchronous library. It can either work
  through asyncore or with your own handling.

  """
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
    """Constructor for ConfdClient

    @type hmac_key: string
    @param hmac_key: hmac key to talk to confd
    @type peers: list
    @param peers: list of peer nodes
    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function to call when getting answers
    @type port: integer
    @keyword port: confd port (default: use GetDaemonPort)
    @type logger: L{logging.Logger}
    @keyword logger: optional logger for internal conditions
    @raise errors.ProgrammerError: if callback is not callable, or peers is
        not a list

    """
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    self.UpdatePeerList(peers)
    self._hmac_key = hmac_key
    self._socket = ConfdAsyncUDPClient(self)
    self._callback = callback
    self._confd_port = port
    self._logger = logger
    # rsalt -> (request, args) for every request that has not expired yet
    self._requests = {}
    # (expire_time, rsalt) tuples, appended by SendRequest in send order
    self._expire_requests = []

    if self._confd_port is None:
      self._confd_port = utils.GetDaemonPort(constants.CONFD)

  def UpdatePeerList(self, peers):
    """Update the list of peers

    @type peers: list
    @param peers: list of peer nodes
    @raise errors.ProgrammerError: if peers is not a list

    """
    if not isinstance(peers, list):
      raise errors.ProgrammerError("peers must be a list")
    # The list is stored by reference (not copied); SendRequest shuffles it
    # in place.
    self._peers = peers

  def _PackRequest(self, request, now=None):
    """Prepare a request to be sent on the wire.

    This function serializes the request, signs it with the hmac key using
    a timestamp as salt, and adds the correct magic number.

    @type request: L{objects.ConfdRequest}
    @param request: the request to serialize
    @keyword now: timestamp to use as salt (default: the current time)
    @return: the payload as returned by confd.PackMagic

    """
    if now is None:
      now = time.time()
    # NOTE(review): the salt format (integer seconds) was reconstructed;
    # confirm it matches what the confd server expects.
    tstamp = "%d" % now
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
    return confd.PackMagic(req)

  def _UnpackReply(self, payload):
    """Decode a datagram received from confd.

    @param payload: raw datagram contents
    @return: (answer, salt) tuple, where answer is the L{objects.ConfdReply}
        built from the signed message and salt is the salt it was signed with

    """
    in_payload = confd.UnpackMagic(payload)
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
    answer = objects.ConfdReply.FromDict(dict_answer)
    return answer, salt

  def ExpireRequests(self):
    """Delete all the expired requests.

    The callback is invoked once, with an UPCALL_EXPIRE payload, for every
    request that is dropped.

    """
    now = time.time()
    while self._expire_requests:
      expire_time, rsalt = self._expire_requests[0]
      if now < expire_time:
        # Entries are appended in send order with a constant timeout, so
        # once the head has not expired there is nothing more to do.
        break
      self._expire_requests.pop(0)
      (request, args) = self._requests.pop(rsalt)
      # NOTE(review): the argument list below was partially reconstructed
      # (type, extra_args, client); confirm against the callers.
      client_reply = ConfdUpcallPayload(salt=rsalt,
                                        type=UPCALL_EXPIRE,
                                        orig_request=request,
                                        extra_args=args,
                                        client=self,
                                        )
      self._callback(client_reply)

  def SendRequest(self, request, args=None, coverage=None):
    """Send a confd request to some MCs

    @type request: L{objects.ConfdRequest}
    @param request: the request to send
    @type args: tuple
    @keyword args: additional callback arguments
    @type coverage: integer
    @keyword coverage: number of remote nodes to contact
    @raise errors.ConfdClientError: if there are fewer peers than the
        requested coverage, the request has no rsalt, its rsalt is already
        in use, its type is invalid, or it is too big to be sent

    """
    if coverage is None:
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)

    if coverage > len(self._peers):
      raise errors.ConfdClientError("Not enough MCs known to provide the"
                                    " desired coverage")

    if not request.rsalt:
      raise errors.ConfdClientError("Missing request rsalt")

    # Drop stale entries first, so an expired rsalt may be reused.
    self.ExpireRequests()
    if request.rsalt in self._requests:
      raise errors.ConfdClientError("Duplicate request rsalt")

    if request.type not in constants.CONFD_REQS:
      raise errors.ConfdClientError("Invalid request type")

    # Pick 'coverage' peers at random (this reorders the peer list in place).
    random.shuffle(self._peers)
    targets = self._peers[:coverage]

    # The same timestamp is used as the salt and as the expiration base.
    now = time.time()
    payload = self._PackRequest(request, now=now)

    for target in targets:
      try:
        self._socket.enqueue_send(target, self._confd_port, payload)
      except errors.UdpDataSizeError:
        raise errors.ConfdClientError("Request too big")

    self._requests[request.rsalt] = (request, args)
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
    self._expire_requests.append((expire_time, request.rsalt))

  def HandleResponse(self, payload, ip, port):
    """Asynchronous handler for a confd reply

    Call the relevant callback associated to the current request. Replies
    that fail to decode, or that match no pending request, are dropped
    (and logged, if a logger was given). Expired requests are cleaned up
    afterwards in every case.

    @param payload: raw datagram contents
    @param ip: address of the answering server
    @param port: port of the answering server

    """
    try:
      try:
        answer, salt = self._UnpackReply(payload)
      # "except ... as" is accepted by Python 2.6+ and by Python 3
      except (errors.SignatureError, errors.ConfdMagicError) as err:
        if self._logger:
          self._logger.debug("Discarding broken package: %s", err)
        return

      try:
        (request, args) = self._requests[salt]
      except KeyError:
        if self._logger:
          # Log the salt: no exception object is bound on this path.
          self._logger.debug("Discarding unknown (expired?) reply: %s", salt)
        return

      client_reply = ConfdUpcallPayload(salt=salt,
                                        type=UPCALL_REPLY,
                                        server_reply=answer,
                                        orig_request=request,
                                        server_ip=ip,
                                        server_port=port,
                                        extra_args=args,
                                        client=self,
                                        )
      self._callback(client_reply)

    finally:
      self.ExpireRequests()
# UPCALL_REPLY: server reply upcall
# has all ConfdUpcallPayload fields populated
# NOTE(review): the numeric values of the two constants below were
# reconstructed; only their being distinct is relied upon in this file.
UPCALL_REPLY = 1
# UPCALL_EXPIRE: internal library request expire
# has only salt, type, orig_request and extra_args
UPCALL_EXPIRE = 2
CONFD_UPCALL_TYPES = frozenset([
  UPCALL_REPLY,
  UPCALL_EXPIRE,
  ])
class ConfdUpcallPayload(objects.ConfigObject):
  """Callback argument for confd replies

  @type salt: string
  @ivar salt: salt associated with the query
  @type type: one of confd.client.CONFD_UPCALL_TYPES
  @ivar type: upcall type (server reply, expired request, ...)
  @type orig_request: L{objects.ConfdRequest}
  @ivar orig_request: original request
  @type server_reply: L{objects.ConfdReply}
  @ivar server_reply: server reply
  @type server_ip: string
  @ivar server_ip: answering server ip address
  @type server_port: int
  @ivar server_port: answering server port
  @type extra_args: any
  @ivar extra_args: 'args' argument of the SendRequest function
  @type client: L{ConfdClient}
  @ivar client: current confd client instance

  """
  # NOTE(review): field list rebuilt from the @ivar entries above, the
  # original declaration is not visible; confirm this is how
  # objects.ConfigObject subclasses are expected to declare their fields.
  __slots__ = [
    "salt",
    "type",
    "orig_request",
    "server_reply",
    "server_ip",
    "server_port",
    "extra_args",
    "client",
    ]
class ConfdClientRequest(objects.ConfdRequest):
  """This is the client-side version of ConfdRequest.

  This version of the class helps creating requests, on the client side, by
  filling in some default values.

  """
  def __init__(self, **kwargs):
    """Build a request, defaulting the fields the caller left out.

    @keyword kwargs: passed unchanged to objects.ConfdRequest
    @raise errors.ConfdClientError: if the request type is not one of
        constants.CONFD_REQS

    """
    objects.ConfdRequest.__init__(self, **kwargs)
    # Only fill in values the caller did not provide.
    if not self.rsalt:
      self.rsalt = utils.NewUUID()
    if not self.protocol:
      self.protocol = constants.CONFD_PROTOCOL_VERSION
    if self.type not in constants.CONFD_REQS:
      raise errors.ConfdClientError("Invalid request type")
class ConfdFilterCallback:
  """Callback that calls another callback, but filters duplicate results.

  """
  def __init__(self, callback, logger=None):
    """Constructor for ConfdFilterCallback

    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function to call when getting answers
    @type logger: L{logging.Logger}
    @keyword logger: optional logger for internal conditions
    @raise errors.ProgrammerError: if callback is not callable

    """
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    self._callback = callback
    self._logger = logger
    # answers contains a dict of salt -> answer
    self._answers = {}

  def _LogFilter(self, salt, new_reply, old_reply):
    """Log why a reply is being filtered out.

    Does nothing when no logger was given to the constructor.

    @param salt: salt of the query the replies belong to
    @param new_reply: the reply being filtered
    @param old_reply: the reply it was compared against

    """
    if not self._logger:
      return

    if new_reply.serial > old_reply.serial:
      self._logger.debug("Filtering confirming answer, with newer"
                         " serial for query %s", salt)
    elif new_reply.serial == old_reply.serial:
      if new_reply.answer != old_reply.answer:
        self._logger.warning("Got incoherent answers for query %s"
                             " (serial: %s)", salt, new_reply.serial)
      else:
        self._logger.debug("Filtering confirming answer, with same"
                           " serial for query %s", salt)
    else:
      # Here new_reply.serial < old_reply.serial, so the new serial goes
      # first to make the "(%d < %d)" in the message read correctly.
      self._logger.debug("Filtering outdated answer for query %s"
                         " serial: (%d < %d)", salt, new_reply.serial,
                         old_reply.serial)

  def _HandleExpire(self, up):
    """Forget the answer recorded for an expired query, if any.

    @type up: L{ConfdUpcallPayload}
    @param up: the expiration upcall

    """
    # if we have no answer we have received none, before the expiration.
    if up.salt in self._answers:
      del self._answers[up.salt]

  def _HandleReply(self, up):
    """Handle a single confd reply, and decide whether to filter it.

    @type up: L{ConfdUpcallPayload}
    @param up: the reply upcall
    @rtype: boolean
    @return: True if the reply should be filtered, False if it should be passed
        on to the up-callback

    """
    filter_upcall = False
    salt = up.salt
    if salt not in self._answers:
      # first answer for a query (don't filter, and record)
      self._answers[salt] = up.server_reply
    elif up.server_reply.serial > self._answers[salt].serial:
      # newer answer (record, and compare contents)
      old_answer = self._answers[salt]
      self._answers[salt] = up.server_reply
      if up.server_reply.answer == old_answer.answer:
        # same content (filter) (version upgrade was unrelated)
        filter_upcall = True
        self._LogFilter(salt, up.server_reply, old_answer)
      # else: different content, pass up a second answer
    else:
      # older or same-version answer (duplicate or outdated, filter)
      filter_upcall = True
      self._LogFilter(salt, up.server_reply, self._answers[salt])

    return filter_upcall

  def __call__(self, up):
    """Filtering callback

    @type up: L{ConfdUpcallPayload}
    @param up: upcall payload received from the client library

    """
    filter_upcall = False
    if up.type == UPCALL_REPLY:
      filter_upcall = self._HandleReply(up)
    elif up.type == UPCALL_EXPIRE:
      self._HandleExpire(up)

    if not filter_upcall:
      # NOTE(review): the body of this branch was not visible; forwarding
      # the upcall to the wrapped callback is the reconstructed behaviour.
      self._callback(up)