4 # Copyright (C) 2009 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti confd client
29 from ganeti import utils
30 from ganeti import constants
31 from ganeti import objects
32 from ganeti import serializer
33 from ganeti import daemon # contains AsyncUDPSocket
34 from ganeti import errors
35 from ganeti import confd
38 class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
39 """Confd udp asyncore client
41 This is kept separate from the main ConfdClient to make sure it's easy to
42 implement a non-asyncore based client library.
45 def __init__(self, client):
46 """Constructor for ConfdAsyncUDPClient
48 @type client: L{ConfdClient}
49 @param client: client library, to pass the datagrams to
52 daemon.AsyncUDPSocket.__init__(self)
55 # this method is overriding a daemon.AsyncUDPSocket method
56 def handle_datagram(self, payload, ip, port):
57 self.client.HandleResponse(payload, ip, port)
61 """Send queries to confd, and get back answers.
63 Since the confd model works by querying multiple master candidates, and
64 getting back answers, this is an asynchronous library. It can either work
65 through asyncore or with your own handling.
68 def __init__(self, hmac_key, peers, callback, port=None, logger=None):
69 """Constructor for ConfdClient
71 @type hmac_key: string
72 @param hmac_key: hmac key to talk to confd
74 @param peers: list of peer nodes
75 @type callback: f(L{ConfdUpcallPayload})
76 @param callback: function to call when getting answers
78 @keyword port: confd port (default: use GetDaemonPort)
79 @type logger: L{logging.Logger}
80 @keyword logger: optional logger for internal conditions
83 if not isinstance(peers, list):
84 raise errors.ProgrammerError("peers must be a list")
85 if not callable(callback):
86 raise errors.ProgrammerError("callback must be callable")
89 self._hmac_key = hmac_key
90 self._socket = ConfdAsyncUDPClient(self)
91 self._callback = callback
92 self._confd_port = port
95 self._expire_requests = []
97 if self._confd_port is None:
98 self._confd_port = utils.GetDaemonPort(constants.CONFD)
100 def _PackRequest(self, request, now=None):
101 """Prepare a request to be sent on the wire.
103 This function puts a proper salt in a confd request, puts the proper salt,
104 and adds the correct magic number.
110 req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
111 return confd.PackMagic(req)
113 def _UnpackReply(self, payload):
114 in_payload = confd.UnpackMagic(payload)
115 (answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
118 def ExpireRequests(self):
119 """Delete all the expired requests.
123 while self._expire_requests:
124 expire_time, rsalt = self._expire_requests[0]
125 if now >= expire_time:
126 self._expire_requests.pop(0)
127 (request, args) = self._requests[rsalt]
128 del self._requests[rsalt]
129 client_reply = ConfdUpcallPayload(salt=rsalt,
131 orig_request=request,
133 self._callback(client_reply)
137 def SendRequest(self, request, args=None, coverage=None):
138 """Send a confd request to some MCs
140 @type request: L{objects.ConfdRequest}
141 @param request: the request to send
143 @keyword args: additional callback arguments
144 @type coverage: integer
145 @keyword coverage: number of remote nodes to contact
149 coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
151 if coverage > len(self._peers):
152 raise errors.ConfdClientError("Not enough MCs known to provide the"
155 if not request.rsalt:
156 raise errors.ConfdClientError("Missing request rsalt")
158 self.ExpireRequests()
159 if request.rsalt in self._requests:
160 raise errors.ConfdClientError("Duplicate request rsalt")
162 if request.type not in constants.CONFD_REQS:
163 raise errors.ConfdClientError("Invalid request type")
165 random.shuffle(self._peers)
166 targets = self._peers[:coverage]
169 payload = self._PackRequest(request, now=now)
171 for target in targets:
173 self._socket.enqueue_send(target, self._confd_port, payload)
174 except errors.UdpDataSizeError:
175 raise errors.ConfdClientError("Request too big")
177 self._requests[request.rsalt] = (request, args)
178 expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
179 self._expire_requests.append((expire_time, request.rsalt))
181 def HandleResponse(self, payload, ip, port):
182 """Asynchronous handler for a confd reply
184 Call the relevant callback associated to the current request.
189 answer, salt = self._UnpackReply(payload)
190 except (errors.SignatureError, errors.ConfdMagicError), err:
192 self._logger.debug("Discarding broken package: %s" % err)
196 (request, args) = self._requests[salt]
199 self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
202 client_reply = ConfdUpcallPayload(salt=salt,
205 orig_request=request,
209 self._callback(client_reply)
212 self.ExpireRequests()
215 # UPCALL_REPLY: server reply upcall
216 # has all ConfdUpcallPayload fields populated
218 # UPCALL_EXPIRE: internal library request expire
219 # has only salt, type, orig_request and extra_args
221 CONFD_UPCALL_TYPES = frozenset([
227 class ConfdUpcallPayload(objects.ConfigObject):
228 """Callback argument for confd replies
231 @ivar salt: salt associated with the query
232 @type type: one of confd.client.CONFD_UPCALL_TYPES
233 @ivar type: upcall type (server reply, expired request, ...)
234 @type orig_request: L{objects.ConfdRequest}
235 @ivar orig_request: original request
236 @type server_reply: L{objects.ConfdReply}
237 @ivar server_reply: server reply
238 @type server_ip: string
239 @ivar server_ip: answering server ip address
240 @type server_port: int
241 @ivar server_port: answering server port
242 @type extra_args: any
243 @ivar extra_args: 'args' argument of the SendRequest function
257 class ConfdClientRequest(objects.ConfdRequest):
258 """This is the client-side version of ConfdRequest.
260 This version of the class helps creating requests, on the client side, by
261 filling in some default values.
264 def __init__(self, **kwargs):
265 objects.ConfdRequest.__init__(self, **kwargs)
267 self.rsalt = utils.NewUUID()
268 if not self.protocol:
269 self.protocol = constants.CONFD_PROTOCOL_VERSION
270 if self.type not in constants.CONFD_REQS:
271 raise errors.ConfdClientError("Invalid request type")