Move RunInSeparateProcess to ganeti.utils
[ganeti-local] / lib / confd / client.py
1 #
2 #
3
4 # Copyright (C) 2009 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti confd client
23
24 Clients can use the confd client library to send requests to a group of master
25 candidates running confd. The expected usage is through the asyncore framework,
26 by sending queries, and asynchronously receiving replies through a callback.
27
28 This way the client library doesn't ever need to "wait" on a particular answer,
29 and can proceed even if some udp packets are lost. It's up to the user to
30 reschedule queries if they haven't received responses and they need them.
31
32 Example usage::
33
34   client = ConfdClient(...) # includes callback specification
35   req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36   client.SendRequest(req)
37   # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38   # ... wait ...
39   # And your callback will be called by asyncore, when your query gets a
40   # response, or when it expires.
41
42 You can use the provided ConfdFilterCallback to act as a filter, only passing
43 "newer" answer to your callback, and filtering out outdated ones, or ones
44 confirming what you already got.
45
46 """
47
48 # pylint: disable-msg=E0203
49
50 # E0203: Access to member %r before its definition, since we use
51 # objects.py which doesn't explicitely initialise its members
52
53 import time
54 import random
55
56 from ganeti import utils
57 from ganeti import constants
58 from ganeti import objects
59 from ganeti import serializer
60 from ganeti import daemon # contains AsyncUDPSocket
61 from ganeti import errors
62 from ganeti import confd
63
64
65 class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
66   """Confd udp asyncore client
67
68   This is kept separate from the main ConfdClient to make sure it's easy to
69   implement a non-asyncore based client library.
70
71   """
72   def __init__(self, client):
73     """Constructor for ConfdAsyncUDPClient
74
75     @type client: L{ConfdClient}
76     @param client: client library, to pass the datagrams to
77
78     """
79     daemon.AsyncUDPSocket.__init__(self)
80     self.client = client
81
82   # this method is overriding a daemon.AsyncUDPSocket method
83   def handle_datagram(self, payload, ip, port):
84     self.client.HandleResponse(payload, ip, port)
85
86
87 class ConfdClient:
88   """Send queries to confd, and get back answers.
89
90   Since the confd model works by querying multiple master candidates, and
91   getting back answers, this is an asynchronous library. It can either work
92   through asyncore or with your own handling.
93
94   """
95   def __init__(self, hmac_key, peers, callback, port=None, logger=None):
96     """Constructor for ConfdClient
97
98     @type hmac_key: string
99     @param hmac_key: hmac key to talk to confd
100     @type peers: list
101     @param peers: list of peer nodes
102     @type callback: f(L{ConfdUpcallPayload})
103     @param callback: function to call when getting answers
104     @type port: integer
105     @keyword port: confd port (default: use GetDaemonPort)
106     @type logger: logging.Logger
107     @keyword logger: optional logger for internal conditions
108
109     """
110     if not callable(callback):
111       raise errors.ProgrammerError("callback must be callable")
112
113     self.UpdatePeerList(peers)
114     self._hmac_key = hmac_key
115     self._socket = ConfdAsyncUDPClient(self)
116     self._callback = callback
117     self._confd_port = port
118     self._logger = logger
119     self._requests = {}
120     self._expire_requests = []
121
122     if self._confd_port is None:
123       self._confd_port = utils.GetDaemonPort(constants.CONFD)
124
125   def UpdatePeerList(self, peers):
126     """Update the list of peers
127
128     @type peers: list
129     @param peers: list of peer nodes
130
131     """
132     # we are actually called from init, so:
133     # pylint: disable-msg=W0201
134     if not isinstance(peers, list):
135       raise errors.ProgrammerError("peers must be a list")
136     # make a copy of peers, since we're going to shuffle the list, later
137     self._peers = list(peers)
138
139   def _PackRequest(self, request, now=None):
140     """Prepare a request to be sent on the wire.
141
142     This function puts a proper salt in a confd request, puts the proper salt,
143     and adds the correct magic number.
144
145     """
146     if now is None:
147       now = time.time()
148     tstamp = '%d' % now
149     req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
150     return confd.PackMagic(req)
151
152   def _UnpackReply(self, payload):
153     in_payload = confd.UnpackMagic(payload)
154     (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
155     answer = objects.ConfdReply.FromDict(dict_answer)
156     return answer, salt
157
158   def ExpireRequests(self):
159     """Delete all the expired requests.
160
161     """
162     now = time.time()
163     while self._expire_requests:
164       expire_time, rsalt = self._expire_requests[0]
165       if now >= expire_time:
166         self._expire_requests.pop(0)
167         (request, args) = self._requests[rsalt]
168         del self._requests[rsalt]
169         client_reply = ConfdUpcallPayload(salt=rsalt,
170                                           type=UPCALL_EXPIRE,
171                                           orig_request=request,
172                                           extra_args=args,
173                                           client=self,
174                                           )
175         self._callback(client_reply)
176       else:
177         break
178
179   def SendRequest(self, request, args=None, coverage=None):
180     """Send a confd request to some MCs
181
182     @type request: L{objects.ConfdRequest}
183     @param request: the request to send
184     @type args: tuple
185     @keyword args: additional callback arguments
186     @type coverage: integer
187     @keyword coverage: number of remote nodes to contact
188
189     """
190     if coverage is None:
191       coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
192
193     if coverage > len(self._peers):
194       raise errors.ConfdClientError("Not enough MCs known to provide the"
195                                     " desired coverage")
196
197     if not request.rsalt:
198       raise errors.ConfdClientError("Missing request rsalt")
199
200     self.ExpireRequests()
201     if request.rsalt in self._requests:
202       raise errors.ConfdClientError("Duplicate request rsalt")
203
204     if request.type not in constants.CONFD_REQS:
205       raise errors.ConfdClientError("Invalid request type")
206
207     random.shuffle(self._peers)
208     targets = self._peers[:coverage]
209
210     now = time.time()
211     payload = self._PackRequest(request, now=now)
212
213     for target in targets:
214       try:
215         self._socket.enqueue_send(target, self._confd_port, payload)
216       except errors.UdpDataSizeError:
217         raise errors.ConfdClientError("Request too big")
218
219     self._requests[request.rsalt] = (request, args)
220     expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
221     self._expire_requests.append((expire_time, request.rsalt))
222
223   def HandleResponse(self, payload, ip, port):
224     """Asynchronous handler for a confd reply
225
226     Call the relevant callback associated to the current request.
227
228     """
229     try:
230       try:
231         answer, salt = self._UnpackReply(payload)
232       except (errors.SignatureError, errors.ConfdMagicError), err:
233         if self._logger:
234           self._logger.debug("Discarding broken package: %s" % err)
235         return
236
237       try:
238         (request, args) = self._requests[salt]
239       except KeyError:
240         if self._logger:
241           self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
242         return
243
244       client_reply = ConfdUpcallPayload(salt=salt,
245                                         type=UPCALL_REPLY,
246                                         server_reply=answer,
247                                         orig_request=request,
248                                         server_ip=ip,
249                                         server_port=port,
250                                         extra_args=args,
251                                         client=self,
252                                        )
253       self._callback(client_reply)
254
255     finally:
256       self.ExpireRequests()
257
258
259 # UPCALL_REPLY: server reply upcall
260 # has all ConfdUpcallPayload fields populated
261 UPCALL_REPLY = 1
262 # UPCALL_EXPIRE: internal library request expire
263 # has only salt, type, orig_request and extra_args
264 UPCALL_EXPIRE = 2
265 CONFD_UPCALL_TYPES = frozenset([
266   UPCALL_REPLY,
267   UPCALL_EXPIRE,
268   ])
269
270
271 class ConfdUpcallPayload(objects.ConfigObject):
272   """Callback argument for confd replies
273
274   @type salt: string
275   @ivar salt: salt associated with the query
276   @type type: one of confd.client.CONFD_UPCALL_TYPES
277   @ivar type: upcall type (server reply, expired request, ...)
278   @type orig_request: L{objects.ConfdRequest}
279   @ivar orig_request: original request
280   @type server_reply: L{objects.ConfdReply}
281   @ivar server_reply: server reply
282   @type server_ip: string
283   @ivar server_ip: answering server ip address
284   @type server_port: int
285   @ivar server_port: answering server port
286   @type extra_args: any
287   @ivar extra_args: 'args' argument of the SendRequest function
288   @type client: L{ConfdClient}
289   @ivar client: current confd client instance
290
291   """
292   __slots__ = [
293     "salt",
294     "type",
295     "orig_request",
296     "server_reply",
297     "server_ip",
298     "server_port",
299     "extra_args",
300     "client",
301     ]
302
303
304 class ConfdClientRequest(objects.ConfdRequest):
305   """This is the client-side version of ConfdRequest.
306
307   This version of the class helps creating requests, on the client side, by
308   filling in some default values.
309
310   """
311   def __init__(self, **kwargs):
312     objects.ConfdRequest.__init__(self, **kwargs)
313     if not self.rsalt:
314       self.rsalt = utils.NewUUID()
315     if not self.protocol:
316       self.protocol = constants.CONFD_PROTOCOL_VERSION
317     if self.type not in constants.CONFD_REQS:
318       raise errors.ConfdClientError("Invalid request type")
319
320
321 class ConfdFilterCallback:
322   """Callback that calls another callback, but filters duplicate results.
323
324   """
325   def __init__(self, callback, logger=None):
326     """Constructor for ConfdFilterCallback
327
328     @type callback: f(L{ConfdUpcallPayload})
329     @param callback: function to call when getting answers
330     @type logger: logging.Logger
331     @keyword logger: optional logger for internal conditions
332
333     """
334     if not callable(callback):
335       raise errors.ProgrammerError("callback must be callable")
336
337     self._callback = callback
338     self._logger = logger
339     # answers contains a dict of salt -> answer
340     self._answers = {}
341
342   def _LogFilter(self, salt, new_reply, old_reply):
343     if not self._logger:
344       return
345
346     if new_reply.serial > old_reply.serial:
347       self._logger.debug("Filtering confirming answer, with newer"
348                          " serial for query %s" % salt)
349     elif new_reply.serial == old_reply.serial:
350       if new_reply.answer != old_reply.answer:
351         self._logger.warning("Got incoherent answers for query %s"
352                              " (serial: %s)" % (salt, new_reply.serial))
353       else:
354         self._logger.debug("Filtering confirming answer, with same"
355                            " serial for query %s" % salt)
356     else:
357       self._logger.debug("Filtering outdated answer for query %s"
358                          " serial: (%d < %d)" % (salt, old_reply.serial,
359                                                  new_reply.serial))
360
361   def _HandleExpire(self, up):
362     # if we have no answer we have received none, before the expiration.
363     if up.salt in self._answers:
364       del self._answers[up.salt]
365
366   def _HandleReply(self, up):
367     """Handle a single confd reply, and decide whether to filter it.
368
369     @rtype: boolean
370     @return: True if the reply should be filtered, False if it should be passed
371              on to the up-callback
372
373     """
374     filter_upcall = False
375     salt = up.salt
376     if salt not in self._answers:
377       # first answer for a query (don't filter, and record)
378       self._answers[salt] = up.server_reply
379     elif up.server_reply.serial > self._answers[salt].serial:
380       # newer answer (record, and compare contents)
381       old_answer = self._answers[salt]
382       self._answers[salt] = up.server_reply
383       if up.server_reply.answer == old_answer.answer:
384         # same content (filter) (version upgrade was unrelated)
385         filter_upcall = True
386         self._LogFilter(salt, up.server_reply, old_answer)
387       # else: different content, pass up a second answer
388     else:
389       # older or same-version answer (duplicate or outdated, filter)
390       filter_upcall = True
391       self._LogFilter(salt, up.server_reply, self._answers[salt])
392
393     return filter_upcall
394
395   def __call__(self, up):
396     """Filtering callback
397
398     @type up: L{ConfdUpcallPayload}
399     @param up: upper callback
400
401     """
402     filter_upcall = False
403     if up.type == UPCALL_REPLY:
404       filter_upcall = self._HandleReply(up)
405     elif up.type == UPCALL_EXPIRE:
406       self._HandleExpire(up)
407
408     if not filter_upcall:
409       self._callback(up)