ConfdFilterCallback: fix a bug in expire
[ganeti-local] / lib / confd / client.py
1 #
2 #
3
4 # Copyright (C) 2009 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti confd client
23
24 Clients can use the confd client library to send requests to a group of master
25 candidates running confd. The expected usage is through the asyncore framework,
26 by sending queries, and asynchronously receiving replies through a callback.
27
28 This way the client library doesn't ever need to "wait" on a particular answer,
29 and can proceed even if some udp packets are lost. It's up to the user to
30 reschedule queries if they haven't received responses and they need them.
31
32 Example usage:
33   client = ConfdClient(...) # includes callback specification
34   req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
35   client.SendRequest(req)
36   # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
37   # ... wait ...
38   # And your callback will be called by asyncore, when your query gets a
39   # response, or when it expires.
40
41 You can use the provided ConfdFilterCallback to act as a filter, only passing
42 "newer" answer to your callback, and filtering out outdated ones, or ones
43 confirming what you already got.
44
45 """
46 import socket
47 import time
48 import random
49
50 from ganeti import utils
51 from ganeti import constants
52 from ganeti import objects
53 from ganeti import serializer
54 from ganeti import daemon # contains AsyncUDPSocket
55 from ganeti import errors
56 from ganeti import confd
57
58
59 class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
60   """Confd udp asyncore client
61
62   This is kept separate from the main ConfdClient to make sure it's easy to
63   implement a non-asyncore based client library.
64
65   """
66   def __init__(self, client):
67     """Constructor for ConfdAsyncUDPClient
68
69     @type client: L{ConfdClient}
70     @param client: client library, to pass the datagrams to
71
72     """
73     daemon.AsyncUDPSocket.__init__(self)
74     self.client = client
75
76   # this method is overriding a daemon.AsyncUDPSocket method
77   def handle_datagram(self, payload, ip, port):
78     self.client.HandleResponse(payload, ip, port)
79
80
81 class ConfdClient:
82   """Send queries to confd, and get back answers.
83
84   Since the confd model works by querying multiple master candidates, and
85   getting back answers, this is an asynchronous library. It can either work
86   through asyncore or with your own handling.
87
88   """
89   def __init__(self, hmac_key, peers, callback, port=None, logger=None):
90     """Constructor for ConfdClient
91
92     @type hmac_key: string
93     @param hmac_key: hmac key to talk to confd
94     @type peers: list
95     @param peers: list of peer nodes
96     @type callback: f(L{ConfdUpcallPayload})
97     @param callback: function to call when getting answers
98     @type port: integer
99     @keyword port: confd port (default: use GetDaemonPort)
100     @type logger: L{logging.Logger}
101     @keyword logger: optional logger for internal conditions
102
103     """
104     if not isinstance(peers, list):
105       raise errors.ProgrammerError("peers must be a list")
106     if not callable(callback):
107       raise errors.ProgrammerError("callback must be callable")
108
109     self._peers = peers
110     self._hmac_key = hmac_key
111     self._socket = ConfdAsyncUDPClient(self)
112     self._callback = callback
113     self._confd_port = port
114     self._logger = logger
115     self._requests = {}
116     self._expire_requests = []
117
118     if self._confd_port is None:
119       self._confd_port = utils.GetDaemonPort(constants.CONFD)
120
121   def _PackRequest(self, request, now=None):
122     """Prepare a request to be sent on the wire.
123
124     This function puts a proper salt in a confd request, puts the proper salt,
125     and adds the correct magic number.
126
127     """
128     if now is None:
129       now = time.time()
130     tstamp = '%d' % now
131     req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
132     return confd.PackMagic(req)
133
134   def _UnpackReply(self, payload):
135     in_payload = confd.UnpackMagic(payload)
136     (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
137     answer = objects.ConfdReply.FromDict(dict_answer)
138     return answer, salt
139
140   def ExpireRequests(self):
141     """Delete all the expired requests.
142
143     """
144     now = time.time()
145     while self._expire_requests:
146       expire_time, rsalt = self._expire_requests[0]
147       if now >= expire_time:
148         self._expire_requests.pop(0)
149         (request, args) = self._requests[rsalt]
150         del self._requests[rsalt]
151         client_reply = ConfdUpcallPayload(salt=rsalt,
152                                           type=UPCALL_EXPIRE,
153                                           orig_request=request,
154                                           extra_args=args)
155         self._callback(client_reply)
156       else:
157         break
158
159   def SendRequest(self, request, args=None, coverage=None):
160     """Send a confd request to some MCs
161
162     @type request: L{objects.ConfdRequest}
163     @param request: the request to send
164     @type args: tuple
165     @keyword args: additional callback arguments
166     @type coverage: integer
167     @keyword coverage: number of remote nodes to contact
168
169     """
170     if coverage is None:
171       coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
172
173     if coverage > len(self._peers):
174       raise errors.ConfdClientError("Not enough MCs known to provide the"
175                                     " desired coverage")
176
177     if not request.rsalt:
178       raise errors.ConfdClientError("Missing request rsalt")
179
180     self.ExpireRequests()
181     if request.rsalt in self._requests:
182       raise errors.ConfdClientError("Duplicate request rsalt")
183
184     if request.type not in constants.CONFD_REQS:
185       raise errors.ConfdClientError("Invalid request type")
186
187     random.shuffle(self._peers)
188     targets = self._peers[:coverage]
189
190     now = time.time()
191     payload = self._PackRequest(request, now=now)
192
193     for target in targets:
194       try:
195         self._socket.enqueue_send(target, self._confd_port, payload)
196       except errors.UdpDataSizeError:
197         raise errors.ConfdClientError("Request too big")
198
199     self._requests[request.rsalt] = (request, args)
200     expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
201     self._expire_requests.append((expire_time, request.rsalt))
202
203   def HandleResponse(self, payload, ip, port):
204     """Asynchronous handler for a confd reply
205
206     Call the relevant callback associated to the current request.
207
208     """
209     try:
210       try:
211         answer, salt = self._UnpackReply(payload)
212       except (errors.SignatureError, errors.ConfdMagicError), err:
213         if self._logger:
214           self._logger.debug("Discarding broken package: %s" % err)
215         return
216
217       try:
218         (request, args) = self._requests[salt]
219       except KeyError:
220         if self._logger:
221           self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
222         return
223
224       client_reply = ConfdUpcallPayload(salt=salt,
225                                         type=UPCALL_REPLY,
226                                         server_reply=answer,
227                                         orig_request=request,
228                                         server_ip=ip,
229                                         server_port=port,
230                                         extra_args=args)
231       self._callback(client_reply)
232
233     finally:
234       self.ExpireRequests()
235
236
237 # UPCALL_REPLY: server reply upcall
238 # has all ConfdUpcallPayload fields populated
239 UPCALL_REPLY = 1
240 # UPCALL_EXPIRE: internal library request expire
241 # has only salt, type, orig_request and extra_args
242 UPCALL_EXPIRE = 2
243 CONFD_UPCALL_TYPES = frozenset([
244   UPCALL_REPLY,
245   UPCALL_EXPIRE,
246   ])
247
248
249 class ConfdUpcallPayload(objects.ConfigObject):
250   """Callback argument for confd replies
251
252   @type salt: string
253   @ivar salt: salt associated with the query
254   @type type: one of confd.client.CONFD_UPCALL_TYPES
255   @ivar type: upcall type (server reply, expired request, ...)
256   @type orig_request: L{objects.ConfdRequest}
257   @ivar orig_request: original request
258   @type server_reply: L{objects.ConfdReply}
259   @ivar server_reply: server reply
260   @type server_ip: string
261   @ivar server_ip: answering server ip address
262   @type server_port: int
263   @ivar server_port: answering server port
264   @type extra_args: any
265   @ivar extra_args: 'args' argument of the SendRequest function
266
267   """
268   __slots__ = [
269     "salt",
270     "type",
271     "orig_request",
272     "server_reply",
273     "server_ip",
274     "server_port",
275     "extra_args",
276     ]
277
278
279 class ConfdClientRequest(objects.ConfdRequest):
280   """This is the client-side version of ConfdRequest.
281
282   This version of the class helps creating requests, on the client side, by
283   filling in some default values.
284
285   """
286   def __init__(self, **kwargs):
287     objects.ConfdRequest.__init__(self, **kwargs)
288     if not self.rsalt:
289       self.rsalt = utils.NewUUID()
290     if not self.protocol:
291       self.protocol = constants.CONFD_PROTOCOL_VERSION
292     if self.type not in constants.CONFD_REQS:
293       raise errors.ConfdClientError("Invalid request type")
294
295
296 class ConfdFilterCallback:
297   """Callback that calls another callback, but filters duplicate results.
298
299   """
300   def __init__(self, callback, logger=None):
301     """Constructor for ConfdFilterCallback
302
303     @type callback: f(L{ConfdUpcallPayload})
304     @param callback: function to call when getting answers
305     @type logger: L{logging.Logger}
306     @keyword logger: optional logger for internal conditions
307
308     """
309     if not callable(callback):
310       raise errors.ProgrammerError("callback must be callable")
311
312     self._callback = callback
313     self._logger = logger
314     # answers contains a dict of salt -> answer
315     self._answers = {}
316
317   def _LogFilter(self, salt, new_reply, old_reply):
318     if not self._logger:
319       return
320
321     if new_reply.serial > old_reply.serial:
322       self._logger.debug("Filtering confirming answer, with newer"
323                          " serial for query %s" % salt)
324     elif new_reply.serial == old_reply.serial:
325       if new_reply.answer != old_reply.answer:
326         self._logger.warning("Got incoherent answers for query %s"
327                              " (serial: %s)" % (salt, new_reply.serial))
328       else:
329         self._logger.debug("Filtering confirming answer, with same"
330                            " serial for query %s" % salt)
331     else:
332       self._logger.debug("Filtering outdated answer for query %s"
333                          " serial: (%d < %d)" % (salt, old_reply.serial,
334                                                  new_reply.serial))
335
336   def _HandleExpire(self, up):
337     # if we have no answer we have received none, before the expiration.
338     if up.salt in self._answers:
339       del self._answers[up.salt]
340
341   def _HandleReply(self, up):
342     """Handle a single confd reply, and decide whether to filter it.
343
344     @rtype: boolean
345     @return: True if the reply should be filtered, False if it should be passed
346              on to the up-callback
347
348     """
349     filter_upcall = False
350     salt = up.salt
351     if salt not in self._answers:
352       # first answer for a query (don't filter, and record)
353       self._answers[salt] = up.server_reply
354     elif up.server_reply.serial > self._answers[salt].serial:
355       # newer answer (record, and compare contents)
356       old_answer = self._answers[salt]
357       self._answers[salt] = up.server_reply
358       if up.server_reply.answer == old_answer.answer:
359         # same content (filter) (version upgrade was unrelated)
360         filter_upcall = True
361         self._LogFilter(salt, up.server_reply, old_answer)
362       # else: different content, pass up a second answer
363     else:
364       # older or same-version answer (duplicate or outdated, filter)
365       filter_upcall = True
366       self._LogFilter(salt, up.server_reply, self._answers[salt])
367
368     return filter_upcall
369
370   def __call__(self, up):
371     """Filtering callback
372
373     @type up: L{ConfdUpcallPayload}
374     @param up: upper callback
375
376     """
377     filter_upcall = False
378     if up.type == UPCALL_REPLY:
379       filter_upcall = self._HandleReply(up)
380     elif up.type == UPCALL_EXPIRE:
381       self._HandleExpire(up)
382
383     if not filter_upcall:
384       self._callback(up)
385