TryOSFromDisk: s/os_scripts/os_files/
[ganeti-local] / lib / confd / client.py
1 #
2 #
3
4 # Copyright (C) 2009 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti confd client
23
24 Clients can use the confd client library to send requests to a group of master
25 candidates running confd. The expected usage is through the asyncore framework,
26 by sending queries, and asynchronously receiving replies through a callback.
27
28 This way the client library doesn't ever need to "wait" on a particular answer,
29 and can proceed even if some udp packets are lost. It's up to the user to
30 reschedule queries if they haven't received responses and they need them.
31
32 Example usage:
33   client = ConfdClient(...) # includes callback specification
34   req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
35   client.SendRequest(req)
36   # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
37   # ... wait ...
38   # And your callback will be called by asyncore, when your query gets a
39   # response, or when it expires.
40
41 You can use the provided ConfdFilterCallback to act as a filter, only passing
42 "newer" answer to your callback, and filtering out outdated ones, or ones
43 confirming what you already got.
44
45 """
46 import socket
47 import time
48 import random
49
50 from ganeti import utils
51 from ganeti import constants
52 from ganeti import objects
53 from ganeti import serializer
54 from ganeti import daemon # contains AsyncUDPSocket
55 from ganeti import errors
56 from ganeti import confd
57
58
59 class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
60   """Confd udp asyncore client
61
62   This is kept separate from the main ConfdClient to make sure it's easy to
63   implement a non-asyncore based client library.
64
65   """
66   def __init__(self, client):
67     """Constructor for ConfdAsyncUDPClient
68
69     @type client: L{ConfdClient}
70     @param client: client library, to pass the datagrams to
71
72     """
73     daemon.AsyncUDPSocket.__init__(self)
74     self.client = client
75
76   # this method is overriding a daemon.AsyncUDPSocket method
77   def handle_datagram(self, payload, ip, port):
78     self.client.HandleResponse(payload, ip, port)
79
80
81 class ConfdClient:
82   """Send queries to confd, and get back answers.
83
84   Since the confd model works by querying multiple master candidates, and
85   getting back answers, this is an asynchronous library. It can either work
86   through asyncore or with your own handling.
87
88   """
89   def __init__(self, hmac_key, peers, callback, port=None, logger=None):
90     """Constructor for ConfdClient
91
92     @type hmac_key: string
93     @param hmac_key: hmac key to talk to confd
94     @type peers: list
95     @param peers: list of peer nodes
96     @type callback: f(L{ConfdUpcallPayload})
97     @param callback: function to call when getting answers
98     @type port: integer
99     @keyword port: confd port (default: use GetDaemonPort)
100     @type logger: L{logging.Logger}
101     @keyword logger: optional logger for internal conditions
102
103     """
104     if not callable(callback):
105       raise errors.ProgrammerError("callback must be callable")
106
107     self.UpdatePeerList(peers)
108     self._hmac_key = hmac_key
109     self._socket = ConfdAsyncUDPClient(self)
110     self._callback = callback
111     self._confd_port = port
112     self._logger = logger
113     self._requests = {}
114     self._expire_requests = []
115
116     if self._confd_port is None:
117       self._confd_port = utils.GetDaemonPort(constants.CONFD)
118
119   def UpdatePeerList(self, peers):
120     """Update the list of peers
121
122     @type peers: list
123     @param peers: list of peer nodes
124
125     """
126     if not isinstance(peers, list):
127       raise errors.ProgrammerError("peers must be a list")
128     self._peers = peers
129
130   def _PackRequest(self, request, now=None):
131     """Prepare a request to be sent on the wire.
132
133     This function puts a proper salt in a confd request, puts the proper salt,
134     and adds the correct magic number.
135
136     """
137     if now is None:
138       now = time.time()
139     tstamp = '%d' % now
140     req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
141     return confd.PackMagic(req)
142
143   def _UnpackReply(self, payload):
144     in_payload = confd.UnpackMagic(payload)
145     (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
146     answer = objects.ConfdReply.FromDict(dict_answer)
147     return answer, salt
148
149   def ExpireRequests(self):
150     """Delete all the expired requests.
151
152     """
153     now = time.time()
154     while self._expire_requests:
155       expire_time, rsalt = self._expire_requests[0]
156       if now >= expire_time:
157         self._expire_requests.pop(0)
158         (request, args) = self._requests[rsalt]
159         del self._requests[rsalt]
160         client_reply = ConfdUpcallPayload(salt=rsalt,
161                                           type=UPCALL_EXPIRE,
162                                           orig_request=request,
163                                           extra_args=args,
164                                           client=self,
165                                           )
166         self._callback(client_reply)
167       else:
168         break
169
170   def SendRequest(self, request, args=None, coverage=None):
171     """Send a confd request to some MCs
172
173     @type request: L{objects.ConfdRequest}
174     @param request: the request to send
175     @type args: tuple
176     @keyword args: additional callback arguments
177     @type coverage: integer
178     @keyword coverage: number of remote nodes to contact
179
180     """
181     if coverage is None:
182       coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
183
184     if coverage > len(self._peers):
185       raise errors.ConfdClientError("Not enough MCs known to provide the"
186                                     " desired coverage")
187
188     if not request.rsalt:
189       raise errors.ConfdClientError("Missing request rsalt")
190
191     self.ExpireRequests()
192     if request.rsalt in self._requests:
193       raise errors.ConfdClientError("Duplicate request rsalt")
194
195     if request.type not in constants.CONFD_REQS:
196       raise errors.ConfdClientError("Invalid request type")
197
198     random.shuffle(self._peers)
199     targets = self._peers[:coverage]
200
201     now = time.time()
202     payload = self._PackRequest(request, now=now)
203
204     for target in targets:
205       try:
206         self._socket.enqueue_send(target, self._confd_port, payload)
207       except errors.UdpDataSizeError:
208         raise errors.ConfdClientError("Request too big")
209
210     self._requests[request.rsalt] = (request, args)
211     expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
212     self._expire_requests.append((expire_time, request.rsalt))
213
214   def HandleResponse(self, payload, ip, port):
215     """Asynchronous handler for a confd reply
216
217     Call the relevant callback associated to the current request.
218
219     """
220     try:
221       try:
222         answer, salt = self._UnpackReply(payload)
223       except (errors.SignatureError, errors.ConfdMagicError), err:
224         if self._logger:
225           self._logger.debug("Discarding broken package: %s" % err)
226         return
227
228       try:
229         (request, args) = self._requests[salt]
230       except KeyError:
231         if self._logger:
232           self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
233         return
234
235       client_reply = ConfdUpcallPayload(salt=salt,
236                                         type=UPCALL_REPLY,
237                                         server_reply=answer,
238                                         orig_request=request,
239                                         server_ip=ip,
240                                         server_port=port,
241                                         extra_args=args,
242                                         client=self,
243                                        )
244       self._callback(client_reply)
245
246     finally:
247       self.ExpireRequests()
248
249
250 # UPCALL_REPLY: server reply upcall
251 # has all ConfdUpcallPayload fields populated
252 UPCALL_REPLY = 1
253 # UPCALL_EXPIRE: internal library request expire
254 # has only salt, type, orig_request and extra_args
255 UPCALL_EXPIRE = 2
256 CONFD_UPCALL_TYPES = frozenset([
257   UPCALL_REPLY,
258   UPCALL_EXPIRE,
259   ])
260
261
262 class ConfdUpcallPayload(objects.ConfigObject):
263   """Callback argument for confd replies
264
265   @type salt: string
266   @ivar salt: salt associated with the query
267   @type type: one of confd.client.CONFD_UPCALL_TYPES
268   @ivar type: upcall type (server reply, expired request, ...)
269   @type orig_request: L{objects.ConfdRequest}
270   @ivar orig_request: original request
271   @type server_reply: L{objects.ConfdReply}
272   @ivar server_reply: server reply
273   @type server_ip: string
274   @ivar server_ip: answering server ip address
275   @type server_port: int
276   @ivar server_port: answering server port
277   @type extra_args: any
278   @ivar extra_args: 'args' argument of the SendRequest function
279   @type client: L{ConfdClient}
280   @ivar client: current confd client instance
281
282   """
283   __slots__ = [
284     "salt",
285     "type",
286     "orig_request",
287     "server_reply",
288     "server_ip",
289     "server_port",
290     "extra_args",
291     "client",
292     ]
293
294
295 class ConfdClientRequest(objects.ConfdRequest):
296   """This is the client-side version of ConfdRequest.
297
298   This version of the class helps creating requests, on the client side, by
299   filling in some default values.
300
301   """
302   def __init__(self, **kwargs):
303     objects.ConfdRequest.__init__(self, **kwargs)
304     if not self.rsalt:
305       self.rsalt = utils.NewUUID()
306     if not self.protocol:
307       self.protocol = constants.CONFD_PROTOCOL_VERSION
308     if self.type not in constants.CONFD_REQS:
309       raise errors.ConfdClientError("Invalid request type")
310
311
312 class ConfdFilterCallback:
313   """Callback that calls another callback, but filters duplicate results.
314
315   """
316   def __init__(self, callback, logger=None):
317     """Constructor for ConfdFilterCallback
318
319     @type callback: f(L{ConfdUpcallPayload})
320     @param callback: function to call when getting answers
321     @type logger: L{logging.Logger}
322     @keyword logger: optional logger for internal conditions
323
324     """
325     if not callable(callback):
326       raise errors.ProgrammerError("callback must be callable")
327
328     self._callback = callback
329     self._logger = logger
330     # answers contains a dict of salt -> answer
331     self._answers = {}
332
333   def _LogFilter(self, salt, new_reply, old_reply):
334     if not self._logger:
335       return
336
337     if new_reply.serial > old_reply.serial:
338       self._logger.debug("Filtering confirming answer, with newer"
339                          " serial for query %s" % salt)
340     elif new_reply.serial == old_reply.serial:
341       if new_reply.answer != old_reply.answer:
342         self._logger.warning("Got incoherent answers for query %s"
343                              " (serial: %s)" % (salt, new_reply.serial))
344       else:
345         self._logger.debug("Filtering confirming answer, with same"
346                            " serial for query %s" % salt)
347     else:
348       self._logger.debug("Filtering outdated answer for query %s"
349                          " serial: (%d < %d)" % (salt, old_reply.serial,
350                                                  new_reply.serial))
351
352   def _HandleExpire(self, up):
353     # if we have no answer we have received none, before the expiration.
354     if up.salt in self._answers:
355       del self._answers[up.salt]
356
357   def _HandleReply(self, up):
358     """Handle a single confd reply, and decide whether to filter it.
359
360     @rtype: boolean
361     @return: True if the reply should be filtered, False if it should be passed
362              on to the up-callback
363
364     """
365     filter_upcall = False
366     salt = up.salt
367     if salt not in self._answers:
368       # first answer for a query (don't filter, and record)
369       self._answers[salt] = up.server_reply
370     elif up.server_reply.serial > self._answers[salt].serial:
371       # newer answer (record, and compare contents)
372       old_answer = self._answers[salt]
373       self._answers[salt] = up.server_reply
374       if up.server_reply.answer == old_answer.answer:
375         # same content (filter) (version upgrade was unrelated)
376         filter_upcall = True
377         self._LogFilter(salt, up.server_reply, old_answer)
378       # else: different content, pass up a second answer
379     else:
380       # older or same-version answer (duplicate or outdated, filter)
381       filter_upcall = True
382       self._LogFilter(salt, up.server_reply, self._answers[salt])
383
384     return filter_upcall
385
386   def __call__(self, up):
387     """Filtering callback
388
389     @type up: L{ConfdUpcallPayload}
390     @param up: upper callback
391
392     """
393     filter_upcall = False
394     if up.type == UPCALL_REPLY:
395       filter_upcall = self._HandleReply(up)
396     elif up.type == UPCALL_EXPIRE:
397       self._HandleExpire(up)
398
399     if not filter_upcall:
400       self._callback(up)
401