Confd client: Change callback model
[ganeti-local] / lib / confd / client.py
1 #
2 #
3
4 # Copyright (C) 2009 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti confd client
23
24 """
25 import socket
26 import time
27 import random
28
29 from ganeti import utils
30 from ganeti import constants
31 from ganeti import objects
32 from ganeti import serializer
33 from ganeti import daemon # contains AsyncUDPSocket
34 from ganeti import errors
35 from ganeti import confd
36
37
38 class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
39   """Confd udp asyncore client
40
41   This is kept separate from the main ConfdClient to make sure it's easy to
42   implement a non-asyncore based client library.
43
44   """
45   def __init__(self, client):
46     """Constructor for ConfdAsyncUDPClient
47
48     @type client: L{ConfdClient}
49     @param client: client library, to pass the datagrams to
50
51     """
52     daemon.AsyncUDPSocket.__init__(self)
53     self.client = client
54
55   # this method is overriding a daemon.AsyncUDPSocket method
56   def handle_datagram(self, payload, ip, port):
57     self.client.HandleResponse(payload, ip, port)
58
59
60 class ConfdClient:
61   """Send queries to confd, and get back answers.
62
63   Since the confd model works by querying multiple master candidates, and
64   getting back answers, this is an asynchronous library. It can either work
65   through asyncore or with your own handling.
66
67   """
68   def __init__(self, hmac_key, peers, callback):
69     """Constructor for ConfdClient
70
71     @type hmac_key: string
72     @param hmac_key: hmac key to talk to confd
73     @type peers: list
74     @param peers: list of peer nodes
75     @type callback: f(L{ConfdUpcallPayload})
76     @param callback: function to call when getting answers
77
78     """
79     if not isinstance(peers, list):
80       raise errors.ProgrammerError("peers must be a list")
81     if not callable(callback):
82       raise errors.ProgrammerError("callback must be callable")
83
84     self._peers = peers
85     self._hmac_key = hmac_key
86     self._socket = ConfdAsyncUDPClient(self)
87     self._callback = callback
88     self._requests = {}
89     self._expire_requests = []
90     self._confd_port = utils.GetDaemonPort(constants.CONFD)
91
92   def _PackRequest(self, request, now=None):
93     """Prepare a request to be sent on the wire.
94
95     This function puts a proper salt in a confd request, puts the proper salt,
96     and adds the correct magic number.
97
98     """
99     if now is None:
100       now = time.time()
101     tstamp = '%d' % now
102     req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
103     return confd.PackMagic(req)
104
105   def _UnpackReply(self, payload):
106     in_payload = confd.UnpackMagic(payload)
107     (answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
108     return answer, salt
109
110   def ExpireRequests(self):
111     """Delete all the expired requests.
112
113     """
114     now = time.time()
115     while self._expire_requests:
116       expire_time, rsalt = self._expire_requests[0]
117       if now >= expire_time:
118         self._expire_requests.pop(0)
119         (request, args) = self._requests[rsalt]
120         del self._requests[rsalt]
121         client_reply = ConfdUpcallPayload(salt=rsalt,
122                                           type=UPCALL_EXPIRE,
123                                           orig_request=request,
124                                           extra_args=args)
125         self._callback(client_reply)
126       else:
127         break
128
129   def SendRequest(self, request, args=None, coverage=None):
130     """Send a confd request to some MCs
131
132     @type request: L{objects.ConfdRequest}
133     @param request: the request to send
134     @type args: tuple
135     @keyword args: additional callback arguments
136     @type coverage: integer
137     @keyword coverage: number of remote nodes to contact
138
139     """
140     if coverage is None:
141       coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
142
143     if coverage > len(self._peers):
144       raise errors.ConfdClientError("Not enough MCs known to provide the"
145                                     " desired coverage")
146
147     if not request.rsalt:
148       raise errors.ConfdClientError("Missing request rsalt")
149
150     self.ExpireRequests()
151     if request.rsalt in self._requests:
152       raise errors.ConfdClientError("Duplicate request rsalt")
153
154     if request.type not in constants.CONFD_REQS:
155       raise errors.ConfdClientError("Invalid request type")
156
157     random.shuffle(self._peers)
158     targets = self._peers[:coverage]
159
160     now = time.time()
161     payload = self._PackRequest(request, now=now)
162
163     for target in targets:
164       try:
165         self._socket.enqueue_send(target, self._confd_port, payload)
166       except errors.UdpDataSizeError:
167         raise errors.ConfdClientError("Request too big")
168
169     self._requests[request.rsalt] = (request, args)
170     expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
171     self._expire_requests.append((expire_time, request.rsalt))
172
173   def HandleResponse(self, payload, ip, port):
174     """Asynchronous handler for a confd reply
175
176     Call the relevant callback associated to the current request.
177
178     """
179     try:
180       try:
181         answer, salt = self._UnpackReply(payload)
182       except (errors.SignatureError, errors.ConfdMagicError):
183         return
184
185       try:
186         (request, args) = self._requests[salt]
187       except KeyError:
188         # If the salt is unkown the answer is probably a replay of an old
189         # expired query. Ignoring it.
190         return
191
192       client_reply = ConfdUpcallPayload(salt=salt,
193                                         type=UPCALL_REPLY,
194                                         server_reply=answer,
195                                         orig_request=request,
196                                         server_ip=ip,
197                                         server_port=port,
198                                         extra_args=args)
199       self._callback(client_reply)
200
201     finally:
202       self.ExpireRequests()
203
204
205 # UPCALL_REPLY: server reply upcall
206 # has all ConfdUpcallPayload fields populated
207 UPCALL_REPLY = 1
208 # UPCALL_EXPIRE: internal library request expire
209 # has only salt, type, orig_request and extra_args
210 UPCALL_EXPIRE = 2
211 CONFD_UPCALL_TYPES = frozenset([
212   UPCALL_REPLY,
213   UPCALL_EXPIRE,
214   ])
215
216
217 class ConfdUpcallPayload(objects.ConfigObject):
218   """Callback argument for confd replies
219
220   @type salt: string
221   @ivar salt: salt associated with the query
222   @type type: one of confd.client.CONFD_UPCALL_TYPES
223   @ivar type: upcall type (server reply, expired request, ...)
224   @type orig_request: L{objects.ConfdRequest}
225   @ivar orig_request: original request
226   @type server_reply: L{objects.ConfdReply}
227   @ivar server_reply: server reply
228   @type server_ip: string
229   @ivar server_ip: answering server ip address
230   @type server_port: int
231   @ivar server_port: answering server port
232   @type extra_args: any
233   @ivar extra_args: 'args' argument of the SendRequest function
234
235   """
236   __slots__ = [
237     "salt",
238     "type",
239     "orig_request",
240     "server_reply",
241     "server_ip",
242     "server_port",
243     "extra_args",
244     ]
245
246
247 class ConfdClientRequest(objects.ConfdRequest):
248   """This is the client-side version of ConfdRequest.
249
250   This version of the class helps creating requests, on the client side, by
251   filling in some default values.
252
253   """
254   def __init__(self, **kwargs):
255     objects.ConfdRequest.__init__(self, **kwargs)
256     if not self.rsalt:
257       self.rsalt = utils.NewUUID()
258     if not self.protocol:
259       self.protocol = constants.CONFD_PROTOCOL_VERSION
260     if self.type not in constants.CONFD_REQS:
261       raise errors.ConfdClientError("Invalid request type")
262