4 # Copyright (C) 2009 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti confd client
29 from ganeti import utils
30 from ganeti import constants
31 from ganeti import objects
32 from ganeti import serializer
33 from ganeti import daemon # contains AsyncUDPSocket
34 from ganeti import errors
35 from ganeti import confd
38 class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
39 """Confd udp asyncore client
41 This is kept separate from the main ConfdClient to make sure it's easy to
42 implement a non-asyncore based client library.
45 def __init__(self, client):
46 """Constructor for ConfdAsyncUDPClient
48 @type client: L{ConfdClient}
49 @param client: client library, to pass the datagrams to
52 daemon.AsyncUDPSocket.__init__(self)
55 # this method is overriding a daemon.AsyncUDPSocket method
56 def handle_datagram(self, payload, ip, port):
57 self.client.HandleResponse(payload, ip, port)
61 """Send queries to confd, and get back answers.
63 Since the confd model works by querying multiple master candidates, and
64 getting back answers, this is an asynchronous library. It can either work
65 through asyncore or with your own handling.
68 def __init__(self, hmac_key, peers):
69 """Constructor for ConfdClient
71 @type hmac_key: string
72 @param hmac_key: hmac key to talk to confd
74 @param peers: list of peer nodes
77 if not isinstance(peers, list):
78 raise errors.ProgrammerError("peers must be a list")
81 self._hmac_key = hmac_key
82 self._socket = ConfdAsyncUDPClient(self)
84 self._expire_callbacks = []
85 self._confd_port = utils.GetDaemonPort(constants.CONFD)
87 def _PackRequest(self, request, now=None):
88 """Prepare a request to be sent on the wire.
90 This function puts a proper salt in a confd request, puts the proper salt,
91 and adds the correct magic number.
97 req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
98 return confd.PackMagic(req)
100 def _UnpackReply(self, payload):
101 in_payload = confd.UnpackMagic(payload)
102 (answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
105 def _ExpireCallbacks(self):
106 """Delete all the expired callbacks.
110 while self._expire_callbacks:
111 expire_time, rsalt = self._expire_callbacks[0]
112 if now >= expire_time:
113 self._expire_callbacks.pop()
114 del self._callbacks[rsalt]
118 def SendRequest(self, request, callback, args, coverage=None):
119 """Send a confd request to some MCs
121 @type request: L{objects.ConfdRequest}
122 @param request: the request to send
123 @type callback: f(answer, req_type, req_query, salt, ip, port, args)
124 @param callback: answer callback
126 @param args: additional callback arguments
127 @type coverage: integer
128 @keyword coverage: number of remote nodes to contact
132 coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
134 if not callable(callback):
135 raise errors.ConfdClientError("callback must be callable")
137 if coverage > len(self._peers):
138 raise errors.ConfdClientError("Not enough MCs known to provide the"
141 if not request.rsalt:
142 raise errors.ConfdClientError("Missing request rsalt")
144 self._ExpireCallbacks()
145 if request.rsalt in self._callbacks:
146 raise errors.ConfdClientError("Duplicate request rsalt")
148 if request.type not in constants.CONFD_REQS:
149 raise errors.ConfdClientError("Invalid request type")
151 random.shuffle(self._peers)
152 targets = self._peers[:coverage]
155 payload = self._PackRequest(request, now=now)
157 for target in targets:
159 self._socket.enqueue_send(target, self._confd_port, payload)
160 except errors.UdpDataSizeError:
161 raise errors.ConfdClientError("Request too big")
163 self._callbacks[request.rsalt] = (callback, request.type,
165 expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
166 self._expire_callbacks.append((expire_time, request.rsalt))
168 def HandleResponse(self, payload, ip, port):
169 """Asynchronous handler for a confd reply
171 Call the relevant callback associated to the current request.
176 answer, salt = self._UnpackReply(payload)
177 except (errors.SignatureError, errors.ConfdMagicError):
181 (callback, type, query, args) = self._callbacks[salt]
183 # If the salt is unkown the answer is probably a replay of an old
184 # expired query. Ignoring it.
187 callback(answer, type, query, salt, ip, port, args)
190 self._ExpireCallbacks()
193 class ConfdClientRequest(objects.ConfdRequest):
194 """This is the client-side version of ConfdRequest.
196 This version of the class helps creating requests, on the client side, by
197 filling in some default values.
200 def __init__(self, **kwargs):
201 objects.ConfdRequest.__init__(self, **kwargs)
203 self.rsalt = utils.NewUUID()
204 if not self.protocol:
205 self.protocol = constants.CONFD_PROTOCOL_VERSION
206 if self.type not in constants.CONFD_REQS:
207 raise errors.ConfdClientError("Invalid request type")