# Copyright (C) 2009 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Ganeti confd client

"""
import random
import time

from ganeti import utils
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import daemon # contains AsyncUDPSocket
from ganeti import errors
from ganeti import confd
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
  """Confd udp asyncore client

  This is kept separate from the main ConfdClient to make sure it's easy to
  implement a non-asyncore based client library.

  """
  def __init__(self, client):
    """Constructor for ConfdAsyncUDPClient

    @type client: L{ConfdClient}
    @param client: client library, to pass the datagrams to

    """
    daemon.AsyncUDPSocket.__init__(self)
    # Remember the owning client: handle_datagram forwards every received
    # datagram to its HandleResponse method.
    self.client = client

  # this method is overriding a daemon.AsyncUDPSocket method
  def handle_datagram(self, payload, ip, port):
    """Hand a received datagram over to the client library.

    @param payload: datagram payload, passed through unchanged
    @param ip: address the datagram was received from
    @param port: port the datagram was received from

    """
    self.client.HandleResponse(payload, ip, port)
class ConfdClient:
  """Send queries to confd, and get back answers.

  Since the confd model works by querying multiple master candidates, and
  getting back answers, this is an asynchronous library. It can either work
  through asyncore or with your own handling.

  """
  def __init__(self, hmac_key, peers, callback):
    """Constructor for ConfdClient

    @type hmac_key: string
    @param hmac_key: hmac key to talk to confd
    @type peers: list
    @param peers: list of peer nodes
    @type callback: f(L{ConfdUpcallPayload})
    @param callback: function to call when getting answers
    @raise errors.ProgrammerError: if peers is not a list or callback is
        not callable

    """
    if not isinstance(peers, list):
      raise errors.ProgrammerError("peers must be a list")
    if not callable(callback):
      raise errors.ProgrammerError("callback must be callable")

    self._peers = peers
    self._hmac_key = hmac_key
    self._socket = ConfdAsyncUDPClient(self)
    self._callback = callback
    # Pending requests, keyed by request salt: rsalt -> (request, args)
    self._requests = {}
    # (expire_time, rsalt) tuples, appended in sending order so the entry
    # that expires first is always at the head of the list
    self._expire_requests = []
    self._confd_port = utils.GetDaemonPort(constants.CONFD)

  def _PackRequest(self, request, now=None):
    """Prepare a request to be sent on the wire.

    The request is serialized and signed with the hmac key, using a
    timestamp as salt, and the result is passed through confd.PackMagic.

    @type request: L{objects.ConfdRequest}
    @param request: the request to serialize
    @type now: number
    @param now: timestamp to sign the request with; the current time is
        used when it is None
    @return: the payload returned by confd.PackMagic

    """
    if now is None:
      now = time.time()
    # NOTE(review): the salt is sent as whole seconds rendered as a string;
    # confirm this is the format the confd server expects.
    tstamp = "%d" % now
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
    return confd.PackMagic(req)

  def _UnpackReply(self, payload):
    """Strip the magic marker from a reply and verify its signature.

    @param payload: raw datagram payload
    @return: (answer, salt) tuple as returned by serializer.LoadSignedJson

    """
    in_payload = confd.UnpackMagic(payload)
    (answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
    return answer, salt

  def ExpireRequests(self):
    """Delete all the expired requests.

    For every request that is dropped the callback is invoked with an
    UPCALL_EXPIRE payload carrying the salt, the original request and the
    extra arguments given to SendRequest.

    """
    now = time.time()
    while self._expire_requests:
      expire_time, rsalt = self._expire_requests[0]
      if now >= expire_time:
        self._expire_requests.pop(0)
        (request, args) = self._requests[rsalt]
        del self._requests[rsalt]
        client_reply = ConfdUpcallPayload(salt=rsalt,
                                          type=UPCALL_EXPIRE,
                                          orig_request=request,
                                          extra_args=args)
        self._callback(client_reply)
      else:
        # Entries are in expiration order: once the head is still valid
        # nothing behind it can have expired either.
        break

  def SendRequest(self, request, args=None, coverage=None):
    """Send a confd request to some MCs

    @type request: L{objects.ConfdRequest}
    @param request: the request to send
    @type args: tuple
    @keyword args: additional callback arguments
    @type coverage: integer
    @keyword coverage: number of remote nodes to contact
    @raise errors.ConfdClientError: if there are fewer known peers than the
        requested coverage, the request has no rsalt or reuses one that is
        still pending, the request type is invalid, or the request does not
        fit in a datagram

    """
    if coverage is None:
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)

    if coverage > len(self._peers):
      raise errors.ConfdClientError("Not enough MCs known to provide the"
                                    " desired coverage")

    if not request.rsalt:
      raise errors.ConfdClientError("Missing request rsalt")

    # Drop stale entries first, so that an expired salt may be reused
    self.ExpireRequests()
    if request.rsalt in self._requests:
      raise errors.ConfdClientError("Duplicate request rsalt")

    if request.type not in constants.CONFD_REQS:
      raise errors.ConfdClientError("Invalid request type")

    random.shuffle(self._peers)
    targets = self._peers[:coverage]

    # The same timestamp is used for signing and for computing the expiration
    now = time.time()
    payload = self._PackRequest(request, now=now)

    for target in targets:
      try:
        self._socket.enqueue_send(target, self._confd_port, payload)
      except errors.UdpDataSizeError:
        raise errors.ConfdClientError("Request too big")

    self._requests[request.rsalt] = (request, args)
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
    self._expire_requests.append((expire_time, request.rsalt))

  def HandleResponse(self, payload, ip, port):
    """Asynchronous handler for a confd reply

    Call the relevant callback associated to the current request.

    @param payload: datagram payload
    @type ip: string
    @param ip: address the datagram was received from
    @type port: int
    @param port: port the datagram was received from

    """
    try:
      try:
        answer, salt = self._UnpackReply(payload)
      except (errors.SignatureError, errors.ConfdMagicError):
        # Not a valid, correctly signed confd reply: drop it silently
        return

      try:
        (request, args) = self._requests[salt]
      except KeyError:
        # If the salt is unknown the answer is probably a replay of an old
        # expired query. Ignoring it.
        return

      # NOTE(review): the answer is handed over exactly as returned by
      # serializer.LoadSignedJson, while ConfdUpcallPayload documents
      # server_reply as L{objects.ConfdReply}; confirm whether a conversion
      # is expected here.
      client_reply = ConfdUpcallPayload(salt=salt,
                                        type=UPCALL_REPLY,
                                        server_reply=answer,
                                        orig_request=request,
                                        server_ip=ip,
                                        server_port=port,
                                        extra_args=args)
      self._callback(client_reply)

    finally:
      # Expire pending requests on every datagram, including ignored ones
      self.ExpireRequests()
# NOTE(review): the numeric values below are arbitrary distinct tags; confirm
# that nothing outside this module depends on specific values.

# UPCALL_REPLY: server reply upcall
# has all ConfdUpcallPayload fields populated
UPCALL_REPLY = 1
# UPCALL_EXPIRE: internal library request expire
# has only salt, type, orig_request and extra_args
UPCALL_EXPIRE = 2
CONFD_UPCALL_TYPES = frozenset([
  UPCALL_REPLY,
  UPCALL_EXPIRE,
  ])
class ConfdUpcallPayload(objects.ConfigObject):
  """Callback argument for confd replies

  @type salt: string
  @ivar salt: salt associated with the query
  @type type: one of confd.client.CONFD_UPCALL_TYPES
  @ivar type: upcall type (server reply, expired request, ...)
  @type orig_request: L{objects.ConfdRequest}
  @ivar orig_request: original request
  @type server_reply: L{objects.ConfdReply}
  @ivar server_reply: server reply
  @type server_ip: string
  @ivar server_ip: answering server ip address
  @type server_port: int
  @ivar server_port: answering server port
  @type extra_args: any
  @ivar extra_args: 'args' argument of the SendRequest function

  """
  # One slot per documented instance variable.
  # NOTE(review): assumes objects.ConfigObject subclasses declare their
  # fields through __slots__; confirm against ganeti.objects.
  __slots__ = [
    "salt",
    "type",
    "orig_request",
    "server_reply",
    "server_ip",
    "server_port",
    "extra_args",
    ]
class ConfdClientRequest(objects.ConfdRequest):
  """This is the client-side version of ConfdRequest.

  This version of the class helps creating requests, on the client side, by
  filling in some default values.

  """
  def __init__(self, **kwargs):
    """Build a request, filling in rsalt and protocol when not supplied.

    @param kwargs: field values, passed on to objects.ConfdRequest
    @raise errors.ConfdClientError: if the request type is not one of
        constants.CONFD_REQS

    """
    objects.ConfdRequest.__init__(self, **kwargs)
    # Only fill in what the caller left unset, so an explicitly supplied
    # rsalt or protocol is never overwritten.
    if not self.rsalt:
      self.rsalt = utils.NewUUID()
    if not self.protocol:
      self.protocol = constants.CONFD_PROTOCOL_VERSION
    if self.type not in constants.CONFD_REQS:
      raise errors.ConfdClientError("Invalid request type")