4 # Copyright (C) 2009 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti confd client
24 Clients can use the confd client library to send requests to a group of master
25 candidates running confd. The expected usage is through the asyncore framework,
26 by sending queries, and asynchronously receiving replies through a callback.
28 This way the client library doesn't ever need to "wait" on a particular answer,
29 and can proceed even if some udp packets are lost. It's up to the user to
30 reschedule queries if they haven't received responses and they need them.
33 client = ConfdClient(...) # includes callback specification
34 req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
35 client.SendRequest(req)
36 # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38 # And your callback will be called by asyncore, when your query gets a
39 # response, or when it expires.
46 from ganeti import utils
47 from ganeti import constants
48 from ganeti import objects
49 from ganeti import serializer
50 from ganeti import daemon # contains AsyncUDPSocket
51 from ganeti import errors
52 from ganeti import confd
55 class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
56 """Confd udp asyncore client
58 This is kept separate from the main ConfdClient to make sure it's easy to
59 implement a non-asyncore based client library.
62 def __init__(self, client):
63 """Constructor for ConfdAsyncUDPClient
65 @type client: L{ConfdClient}
66 @param client: client library, to pass the datagrams to
69 daemon.AsyncUDPSocket.__init__(self)
72 # this method is overriding a daemon.AsyncUDPSocket method
73 def handle_datagram(self, payload, ip, port):
74 self.client.HandleResponse(payload, ip, port)
78 """Send queries to confd, and get back answers.
80 Since the confd model works by querying multiple master candidates, and
81 getting back answers, this is an asynchronous library. It can either work
82 through asyncore or with your own handling.
85 def __init__(self, hmac_key, peers, callback, port=None, logger=None):
86 """Constructor for ConfdClient
88 @type hmac_key: string
89 @param hmac_key: hmac key to talk to confd
91 @param peers: list of peer nodes
92 @type callback: f(L{ConfdUpcallPayload})
93 @param callback: function to call when getting answers
95 @keyword port: confd port (default: use GetDaemonPort)
96 @type logger: L{logging.Logger}
97 @keyword logger: optional logger for internal conditions
100 if not isinstance(peers, list):
101 raise errors.ProgrammerError("peers must be a list")
102 if not callable(callback):
103 raise errors.ProgrammerError("callback must be callable")
106 self._hmac_key = hmac_key
107 self._socket = ConfdAsyncUDPClient(self)
108 self._callback = callback
109 self._confd_port = port
110 self._logger = logger
112 self._expire_requests = []
114 if self._confd_port is None:
115 self._confd_port = utils.GetDaemonPort(constants.CONFD)
117 def _PackRequest(self, request, now=None):
118 """Prepare a request to be sent on the wire.
120 This function puts a proper salt in a confd request, puts the proper salt,
121 and adds the correct magic number.
127 req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
128 return confd.PackMagic(req)
130 def _UnpackReply(self, payload):
131 in_payload = confd.UnpackMagic(payload)
132 (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
133 answer = objects.ConfdReply.FromDict(dict_answer)
136 def ExpireRequests(self):
137 """Delete all the expired requests.
141 while self._expire_requests:
142 expire_time, rsalt = self._expire_requests[0]
143 if now >= expire_time:
144 self._expire_requests.pop(0)
145 (request, args) = self._requests[rsalt]
146 del self._requests[rsalt]
147 client_reply = ConfdUpcallPayload(salt=rsalt,
149 orig_request=request,
151 self._callback(client_reply)
155 def SendRequest(self, request, args=None, coverage=None):
156 """Send a confd request to some MCs
158 @type request: L{objects.ConfdRequest}
159 @param request: the request to send
161 @keyword args: additional callback arguments
162 @type coverage: integer
163 @keyword coverage: number of remote nodes to contact
167 coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
169 if coverage > len(self._peers):
170 raise errors.ConfdClientError("Not enough MCs known to provide the"
173 if not request.rsalt:
174 raise errors.ConfdClientError("Missing request rsalt")
176 self.ExpireRequests()
177 if request.rsalt in self._requests:
178 raise errors.ConfdClientError("Duplicate request rsalt")
180 if request.type not in constants.CONFD_REQS:
181 raise errors.ConfdClientError("Invalid request type")
183 random.shuffle(self._peers)
184 targets = self._peers[:coverage]
187 payload = self._PackRequest(request, now=now)
189 for target in targets:
191 self._socket.enqueue_send(target, self._confd_port, payload)
192 except errors.UdpDataSizeError:
193 raise errors.ConfdClientError("Request too big")
195 self._requests[request.rsalt] = (request, args)
196 expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
197 self._expire_requests.append((expire_time, request.rsalt))
199 def HandleResponse(self, payload, ip, port):
200 """Asynchronous handler for a confd reply
202 Call the relevant callback associated to the current request.
207 answer, salt = self._UnpackReply(payload)
208 except (errors.SignatureError, errors.ConfdMagicError), err:
210 self._logger.debug("Discarding broken package: %s" % err)
214 (request, args) = self._requests[salt]
217 self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
220 client_reply = ConfdUpcallPayload(salt=salt,
223 orig_request=request,
227 self._callback(client_reply)
230 self.ExpireRequests()
233 # UPCALL_REPLY: server reply upcall
234 # has all ConfdUpcallPayload fields populated
236 # UPCALL_EXPIRE: internal library request expire
237 # has only salt, type, orig_request and extra_args
239 CONFD_UPCALL_TYPES = frozenset([
245 class ConfdUpcallPayload(objects.ConfigObject):
246 """Callback argument for confd replies
249 @ivar salt: salt associated with the query
250 @type type: one of confd.client.CONFD_UPCALL_TYPES
251 @ivar type: upcall type (server reply, expired request, ...)
252 @type orig_request: L{objects.ConfdRequest}
253 @ivar orig_request: original request
254 @type server_reply: L{objects.ConfdReply}
255 @ivar server_reply: server reply
256 @type server_ip: string
257 @ivar server_ip: answering server ip address
258 @type server_port: int
259 @ivar server_port: answering server port
260 @type extra_args: any
261 @ivar extra_args: 'args' argument of the SendRequest function
275 class ConfdClientRequest(objects.ConfdRequest):
276 """This is the client-side version of ConfdRequest.
278 This version of the class helps creating requests, on the client side, by
279 filling in some default values.
282 def __init__(self, **kwargs):
283 objects.ConfdRequest.__init__(self, **kwargs)
285 self.rsalt = utils.NewUUID()
286 if not self.protocol:
287 self.protocol = constants.CONFD_PROTOCOL_VERSION
288 if self.type not in constants.CONFD_REQS:
289 raise errors.ConfdClientError("Invalid request type")