Confd client: add module level documentation
[ganeti-local] / lib / confd / client.py
1 #
2 #
3
4 # Copyright (C) 2009 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti confd client
23
24 Clients can use the confd client library to send requests to a group of master
25 candidates running confd. The expected usage is through the asyncore framework,
26 by sending queries, and asynchronously receiving replies through a callback.
27
28 This way the client library doesn't ever need to "wait" on a particular answer,
29 and can proceed even if some udp packets are lost. It's up to the user to
30 reschedule queries if they haven't received responses and they need them.
31
32 Example usage:
33   client = ConfdClient(...) # includes callback specification
34   req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
35   client.SendRequest(req)
36   # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
37   # ... wait ...
38   # And your callback will be called by asyncore, when your query gets a
39   # response, or when it expires.
40
41 """
42 import socket
43 import time
44 import random
45
46 from ganeti import utils
47 from ganeti import constants
48 from ganeti import objects
49 from ganeti import serializer
50 from ganeti import daemon # contains AsyncUDPSocket
51 from ganeti import errors
52 from ganeti import confd
53
54
55 class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
56   """Confd udp asyncore client
57
58   This is kept separate from the main ConfdClient to make sure it's easy to
59   implement a non-asyncore based client library.
60
61   """
62   def __init__(self, client):
63     """Constructor for ConfdAsyncUDPClient
64
65     @type client: L{ConfdClient}
66     @param client: client library, to pass the datagrams to
67
68     """
69     daemon.AsyncUDPSocket.__init__(self)
70     self.client = client
71
72   # this method is overriding a daemon.AsyncUDPSocket method
73   def handle_datagram(self, payload, ip, port):
74     self.client.HandleResponse(payload, ip, port)
75
76
77 class ConfdClient:
78   """Send queries to confd, and get back answers.
79
80   Since the confd model works by querying multiple master candidates, and
81   getting back answers, this is an asynchronous library. It can either work
82   through asyncore or with your own handling.
83
84   """
85   def __init__(self, hmac_key, peers, callback, port=None, logger=None):
86     """Constructor for ConfdClient
87
88     @type hmac_key: string
89     @param hmac_key: hmac key to talk to confd
90     @type peers: list
91     @param peers: list of peer nodes
92     @type callback: f(L{ConfdUpcallPayload})
93     @param callback: function to call when getting answers
94     @type port: integer
95     @keyword port: confd port (default: use GetDaemonPort)
96     @type logger: L{logging.Logger}
97     @keyword logger: optional logger for internal conditions
98
99     """
100     if not isinstance(peers, list):
101       raise errors.ProgrammerError("peers must be a list")
102     if not callable(callback):
103       raise errors.ProgrammerError("callback must be callable")
104
105     self._peers = peers
106     self._hmac_key = hmac_key
107     self._socket = ConfdAsyncUDPClient(self)
108     self._callback = callback
109     self._confd_port = port
110     self._logger = logger
111     self._requests = {}
112     self._expire_requests = []
113
114     if self._confd_port is None:
115       self._confd_port = utils.GetDaemonPort(constants.CONFD)
116
117   def _PackRequest(self, request, now=None):
118     """Prepare a request to be sent on the wire.
119
120     This function puts a proper salt in a confd request, puts the proper salt,
121     and adds the correct magic number.
122
123     """
124     if now is None:
125       now = time.time()
126     tstamp = '%d' % now
127     req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
128     return confd.PackMagic(req)
129
130   def _UnpackReply(self, payload):
131     in_payload = confd.UnpackMagic(payload)
132     (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
133     answer = objects.ConfdReply.FromDict(dict_answer)
134     return answer, salt
135
136   def ExpireRequests(self):
137     """Delete all the expired requests.
138
139     """
140     now = time.time()
141     while self._expire_requests:
142       expire_time, rsalt = self._expire_requests[0]
143       if now >= expire_time:
144         self._expire_requests.pop(0)
145         (request, args) = self._requests[rsalt]
146         del self._requests[rsalt]
147         client_reply = ConfdUpcallPayload(salt=rsalt,
148                                           type=UPCALL_EXPIRE,
149                                           orig_request=request,
150                                           extra_args=args)
151         self._callback(client_reply)
152       else:
153         break
154
155   def SendRequest(self, request, args=None, coverage=None):
156     """Send a confd request to some MCs
157
158     @type request: L{objects.ConfdRequest}
159     @param request: the request to send
160     @type args: tuple
161     @keyword args: additional callback arguments
162     @type coverage: integer
163     @keyword coverage: number of remote nodes to contact
164
165     """
166     if coverage is None:
167       coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
168
169     if coverage > len(self._peers):
170       raise errors.ConfdClientError("Not enough MCs known to provide the"
171                                     " desired coverage")
172
173     if not request.rsalt:
174       raise errors.ConfdClientError("Missing request rsalt")
175
176     self.ExpireRequests()
177     if request.rsalt in self._requests:
178       raise errors.ConfdClientError("Duplicate request rsalt")
179
180     if request.type not in constants.CONFD_REQS:
181       raise errors.ConfdClientError("Invalid request type")
182
183     random.shuffle(self._peers)
184     targets = self._peers[:coverage]
185
186     now = time.time()
187     payload = self._PackRequest(request, now=now)
188
189     for target in targets:
190       try:
191         self._socket.enqueue_send(target, self._confd_port, payload)
192       except errors.UdpDataSizeError:
193         raise errors.ConfdClientError("Request too big")
194
195     self._requests[request.rsalt] = (request, args)
196     expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
197     self._expire_requests.append((expire_time, request.rsalt))
198
199   def HandleResponse(self, payload, ip, port):
200     """Asynchronous handler for a confd reply
201
202     Call the relevant callback associated to the current request.
203
204     """
205     try:
206       try:
207         answer, salt = self._UnpackReply(payload)
208       except (errors.SignatureError, errors.ConfdMagicError), err:
209         if self._logger:
210           self._logger.debug("Discarding broken package: %s" % err)
211         return
212
213       try:
214         (request, args) = self._requests[salt]
215       except KeyError:
216         if self._logger:
217           self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
218         return
219
220       client_reply = ConfdUpcallPayload(salt=salt,
221                                         type=UPCALL_REPLY,
222                                         server_reply=answer,
223                                         orig_request=request,
224                                         server_ip=ip,
225                                         server_port=port,
226                                         extra_args=args)
227       self._callback(client_reply)
228
229     finally:
230       self.ExpireRequests()
231
232
233 # UPCALL_REPLY: server reply upcall
234 # has all ConfdUpcallPayload fields populated
235 UPCALL_REPLY = 1
236 # UPCALL_EXPIRE: internal library request expire
237 # has only salt, type, orig_request and extra_args
238 UPCALL_EXPIRE = 2
239 CONFD_UPCALL_TYPES = frozenset([
240   UPCALL_REPLY,
241   UPCALL_EXPIRE,
242   ])
243
244
245 class ConfdUpcallPayload(objects.ConfigObject):
246   """Callback argument for confd replies
247
248   @type salt: string
249   @ivar salt: salt associated with the query
250   @type type: one of confd.client.CONFD_UPCALL_TYPES
251   @ivar type: upcall type (server reply, expired request, ...)
252   @type orig_request: L{objects.ConfdRequest}
253   @ivar orig_request: original request
254   @type server_reply: L{objects.ConfdReply}
255   @ivar server_reply: server reply
256   @type server_ip: string
257   @ivar server_ip: answering server ip address
258   @type server_port: int
259   @ivar server_port: answering server port
260   @type extra_args: any
261   @ivar extra_args: 'args' argument of the SendRequest function
262
263   """
264   __slots__ = [
265     "salt",
266     "type",
267     "orig_request",
268     "server_reply",
269     "server_ip",
270     "server_port",
271     "extra_args",
272     ]
273
274
275 class ConfdClientRequest(objects.ConfdRequest):
276   """This is the client-side version of ConfdRequest.
277
278   This version of the class helps creating requests, on the client side, by
279   filling in some default values.
280
281   """
282   def __init__(self, **kwargs):
283     objects.ConfdRequest.__init__(self, **kwargs)
284     if not self.rsalt:
285       self.rsalt = utils.NewUUID()
286     if not self.protocol:
287       self.protocol = constants.CONFD_PROTOCOL_VERSION
288     if self.type not in constants.CONFD_REQS:
289       raise errors.ConfdClientError("Invalid request type")
290