4 # Copyright (C) 2009 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti confd client
24 Clients can use the confd client library to send requests to a group of master
25 candidates running confd. The expected usage is through the asyncore framework,
26 by sending queries, and asynchronously receiving replies through a callback.
28 This way the client library doesn't ever need to "wait" on a particular answer,
29 and can proceed even if some udp packets are lost. It's up to the user to
30 reschedule queries if they haven't received responses and they need them.
33 client = ConfdClient(...) # includes callback specification
34 req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
35 client.SendRequest(req)
36 # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38 # And your callback will be called by asyncore, when your query gets a
39 # response, or when it expires.
You can use the provided ConfdFilterCallback to act as a filter, only passing
"newer" answers to your callback, and filtering out outdated ones, or ones
confirming what you already got.
import random
import time

from ganeti import utils
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import daemon # contains AsyncUDPSocket
from ganeti import errors
from ganeti import confd
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
  """Confd udp asyncore client

  This is kept separate from the main ConfdClient to make sure it's easy to
  implement a non-asyncore based client library.

  """
  def __init__(self, client):
    """Constructor for ConfdAsyncUDPClient

    @type client: L{ConfdClient}
    @param client: client library, to pass the datagrams to

    """
    daemon.AsyncUDPSocket.__init__(self)
    # handle_datagram() forwards every datagram to this object, so it must be
    # kept on the instance.
    self.client = client

  # this method is overriding a daemon.AsyncUDPSocket method
  def handle_datagram(self, payload, ip, port):
    """Hand a received datagram over to the client library.

    @param payload: datagram contents
    @param ip: address the datagram came from
    @param port: port the datagram came from

    """
    self.client.HandleResponse(payload, ip, port)
class ConfdClient:
  """Send queries to confd, and get back answers.

  Since the confd model works by querying multiple master candidates, and
  getting back answers, this is an asynchronous library. It can either work
  through asyncore or with your own handling.

  """
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
    """Constructor for ConfdClient

    @type hmac_key: string
    @param hmac_key: hmac key to talk to confd
    @type peers: list
    @param peers: list of peer nodes
    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function to call when getting answers
    @keyword port: confd port (default: use GetDaemonPort)
    @type logger: L{logging.Logger}
    @keyword logger: optional logger for internal conditions
    @raise errors.ProgrammerError: if peers is not a list or callback is not
        callable

    """
    if not isinstance(peers, list):
      raise errors.ProgrammerError("peers must be a list")
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    # The list is kept by reference (not copied); SendRequest shuffles it in
    # place.
    self._peers = peers
    self._hmac_key = hmac_key
    self._socket = ConfdAsyncUDPClient(self)
    self._callback = callback
    self._confd_port = port
    self._logger = logger
    # rsalt -> (request, args) for every request still awaiting expiration
    self._requests = {}
    # (expire_time, rsalt) tuples, in the order the requests were sent
    self._expire_requests = []

    if self._confd_port is None:
      self._confd_port = utils.GetDaemonPort(constants.CONFD)

  def _PackRequest(self, request, now=None):
    """Prepare a request to be sent on the wire.

    The request is serialized and signed with the hmac key, using a timestamp
    as salt, and the result is then passed through confd.PackMagic.

    @type request: L{objects.ConfdRequest}
    @param request: the request to pack
    @keyword now: timestamp used as salt (default: the current time)
    @return: the value returned by confd.PackMagic

    """
    if now is None:
      now = time.time()
    # NOTE(review): the salt format (whole seconds, as a string) was
    # reconstructed -- confirm it matches what the confd server expects.
    tstamp = "%d" % now
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
    return confd.PackMagic(req)

  def _UnpackReply(self, payload):
    """Decode a datagram received from a confd server.

    @param payload: datagram contents
    @return: tuple of (L{objects.ConfdReply}, salt)

    """
    in_payload = confd.UnpackMagic(payload)
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
    answer = objects.ConfdReply.FromDict(dict_answer)
    return answer, salt

  def ExpireRequests(self):
    """Delete all the expired requests.

    The callback is invoked with an UPCALL_EXPIRE payload for every request
    that gets dropped.

    """
    now = time.time()
    while self._expire_requests:
      expire_time, rsalt = self._expire_requests[0]
      if now < expire_time:
        # Entries are queued in send order, so stop at the first one that is
        # still pending.
        break
      self._expire_requests.pop(0)
      (request, args) = self._requests.pop(rsalt)
      client_reply = ConfdUpcallPayload(salt=rsalt,
                                        type=UPCALL_EXPIRE,
                                        orig_request=request,
                                        extra_args=args)
      self._callback(client_reply)

  def SendRequest(self, request, args=None, coverage=None):
    """Send a confd request to some MCs

    @type request: L{objects.ConfdRequest}
    @param request: the request to send
    @keyword args: additional callback arguments
    @type coverage: integer
    @keyword coverage: number of remote nodes to contact
    @raise errors.ConfdClientError: if there are fewer peers than the
        requested coverage, the request has no rsalt, its rsalt is already in
        use, its type is invalid, or it is too big to be sent

    """
    if coverage is None:
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)

    if coverage > len(self._peers):
      raise errors.ConfdClientError("Not enough MCs known to provide the"
                                    " desired coverage")

    if not request.rsalt:
      raise errors.ConfdClientError("Missing request rsalt")

    # Drop stale entries first, so an expired rsalt is not reported as a
    # duplicate.
    self.ExpireRequests()
    if request.rsalt in self._requests:
      raise errors.ConfdClientError("Duplicate request rsalt")

    if request.type not in constants.CONFD_REQS:
      raise errors.ConfdClientError("Invalid request type")

    # Pick a random subset of the peers for this request.
    random.shuffle(self._peers)
    targets = self._peers[:coverage]

    now = time.time()
    payload = self._PackRequest(request, now=now)

    for target in targets:
      try:
        self._socket.enqueue_send(target, self._confd_port, payload)
      except errors.UdpDataSizeError:
        raise errors.ConfdClientError("Request too big")

    self._requests[request.rsalt] = (request, args)
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
    self._expire_requests.append((expire_time, request.rsalt))

  def HandleResponse(self, payload, ip, port):
    """Asynchronous handler for a confd reply

    Call the relevant callback associated to the current request.

    @param payload: datagram contents
    @param ip: address of the answering server
    @param port: port of the answering server

    """
    try:
      try:
        answer, salt = self._UnpackReply(payload)
      except (errors.SignatureError, errors.ConfdMagicError) as err:
        # The logger is optional, so only log when one was provided.
        if self._logger:
          self._logger.debug("Discarding broken package: %s", err)
        return

      try:
        (request, args) = self._requests[salt]
      except KeyError:
        # No exception object from the previous handler is bound here, so
        # report the offending salt instead.
        if self._logger:
          self._logger.debug("Discarding unknown (expired?) reply: %s", salt)
        return

      client_reply = ConfdUpcallPayload(salt=salt,
                                        type=UPCALL_REPLY,
                                        server_reply=answer,
                                        orig_request=request,
                                        server_ip=ip,
                                        server_port=port,
                                        extra_args=args)
      self._callback(client_reply)

    finally:
      # Run on every exit path, so expirations are processed even when the
      # datagram was discarded.
      self.ExpireRequests()
# NOTE(review): the numeric values of the two constants below were
# reconstructed; they only need to be distinct, but confirm them against any
# external users that may rely on the raw numbers.

# UPCALL_REPLY: server reply upcall
# has all ConfdUpcallPayload fields populated
UPCALL_REPLY = 1
# UPCALL_EXPIRE: internal library request expire
# has only salt, type, orig_request and extra_args
UPCALL_EXPIRE = 2
CONFD_UPCALL_TYPES = frozenset([
  UPCALL_EXPIRE,
  UPCALL_REPLY,
  ])
class ConfdUpcallPayload(objects.ConfigObject):
  """Callback argument for confd replies

  Instances of this class are what the client library passes to the
  user-supplied callback.

  @ivar salt: salt associated with the query
  @type type: one of confd.client.CONFD_UPCALL_TYPES
  @ivar type: upcall type (server reply, expired request, ...)
  @type orig_request: L{objects.ConfdRequest}
  @ivar orig_request: original request
  @type server_reply: L{objects.ConfdReply}
  @ivar server_reply: server reply
  @type server_ip: string
  @ivar server_ip: answering server ip address
  @type server_port: int
  @ivar server_port: answering server port
  @type extra_args: any
  @ivar extra_args: 'args' argument of the SendRequest function

  """
  # NOTE(review): no attribute declaration (e.g. __slots__) for the fields
  # documented above is visible here -- confirm whether objects.ConfigObject
  # needs one.
class ConfdClientRequest(objects.ConfdRequest):
  """This is the client-side version of ConfdRequest.

  This version of the class helps creating requests, on the client side, by
  filling in some default values.

  """
  def __init__(self, **kwargs):
    """Constructor for ConfdClientRequest

    @keyword kwargs: passed unchanged to L{objects.ConfdRequest}
    @raise errors.ConfdClientError: if the request type is not one of
        constants.CONFD_REQS

    """
    objects.ConfdRequest.__init__(self, **kwargs)
    # Only fill in values the caller did not provide.
    if not self.rsalt:
      self.rsalt = utils.NewUUID()
    if not self.protocol:
      self.protocol = constants.CONFD_PROTOCOL_VERSION
    if self.type not in constants.CONFD_REQS:
      raise errors.ConfdClientError("Invalid request type")
class ConfdFilterCallback:
  """Callback that calls another callback, but filters duplicate results.

  """
  def __init__(self, callback, logger=None):
    """Constructor for ConfdFilterCallback

    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function to call when getting answers
    @type logger: L{logging.Logger}
    @keyword logger: optional logger for internal conditions
    @raise errors.ProgrammerError: if callback is not callable

    """
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    self._callback = callback
    self._logger = logger
    # answers contains a dict of salt -> answer
    self._answers = {}

  def _LogFilter(self, salt, new_reply, old_reply):
    """Log why a reply is being filtered out.

    Does nothing when no logger was given to the constructor.

    @param salt: salt of the query the replies belong to
    @param new_reply: the reply being filtered
    @param old_reply: the reply it was compared against

    """
    if self._logger is None:
      return

    if new_reply.serial > old_reply.serial:
      self._logger.debug("Filtering confirming answer, with newer"
                         " serial for query %s", salt)
    elif new_reply.serial == old_reply.serial:
      if new_reply.answer != old_reply.answer:
        self._logger.warning("Got incoherent answers for query %s"
                             " (serial: %s)", salt, new_reply.serial)
      else:
        self._logger.debug("Filtering confirming answer, with same"
                           " serial for query %s", salt)
    else:
      # In this branch the new serial is the smaller one, so it goes on the
      # left-hand side of the "<".
      self._logger.debug("Filtering outdated answer for query %s"
                         " serial: (%d < %d)", salt, new_reply.serial,
                         old_reply.serial)

  def _HandleExpire(self, up):
    """Forget the answer recorded for an expired query.

    @type up: L{ConfdUpcallPayload}
    @param up: the expiration upcall

    """
    # if we have no answer we have received none, before the expiration.
    self._answers.pop(up.salt, None)

  def _HandleReply(self, up):
    """Handle a single confd reply, and decide whether to filter it.

    @type up: L{ConfdUpcallPayload}
    @param up: the reply upcall
    @rtype: boolean
    @return: True if the reply should be filtered, False if it should be passed
        on to the up-callback

    """
    filter_upcall = False
    salt = up.salt
    if salt not in self._answers:
      # first answer for a query (don't filter, and record)
      self._answers[salt] = up.server_reply
    elif up.server_reply.serial > self._answers[salt].serial:
      # newer answer (record, and compare contents)
      old_answer = self._answers[salt]
      self._answers[salt] = up.server_reply
      if up.server_reply.answer == old_answer.answer:
        # same content (filter) (version upgrade was unrelated)
        filter_upcall = True
        self._LogFilter(salt, up.server_reply, old_answer)
      # else: different content, pass up a second answer
    else:
      # older or same-version answer (duplicate or outdated, filter)
      filter_upcall = True
      self._LogFilter(salt, up.server_reply, self._answers[salt])

    return filter_upcall

  def __call__(self, up):
    """Filtering callback

    Replies go through _HandleReply, which decides whether they are filtered;
    expirations clear the recorded answer and are never filtered; any other
    upcall type is passed on untouched.

    @type up: L{ConfdUpcallPayload}
    @param up: the upcall to filter or pass on to the wrapped callback

    """
    filter_upcall = False
    if up.type == UPCALL_REPLY:
      filter_upcall = self._HandleReply(up)
    elif up.type == UPCALL_EXPIRE:
      self._HandleExpire(up)

    if not filter_upcall:
      self._callback(up)