Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ b939de46

History | View | Annotate | Download (20.8 kB)

1 e4ccf6cd Guido Trotter
#
2 e4ccf6cd Guido Trotter
#
3 e4ccf6cd Guido Trotter
4 e4ccf6cd Guido Trotter
# Copyright (C) 2009 Google Inc.
5 e4ccf6cd Guido Trotter
#
6 e4ccf6cd Guido Trotter
# This program is free software; you can redistribute it and/or modify
7 e4ccf6cd Guido Trotter
# it under the terms of the GNU General Public License as published by
8 e4ccf6cd Guido Trotter
# the Free Software Foundation; either version 2 of the License, or
9 e4ccf6cd Guido Trotter
# (at your option) any later version.
10 e4ccf6cd Guido Trotter
#
11 e4ccf6cd Guido Trotter
# This program is distributed in the hope that it will be useful, but
12 e4ccf6cd Guido Trotter
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 e4ccf6cd Guido Trotter
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 e4ccf6cd Guido Trotter
# General Public License for more details.
15 e4ccf6cd Guido Trotter
#
16 e4ccf6cd Guido Trotter
# You should have received a copy of the GNU General Public License
17 e4ccf6cd Guido Trotter
# along with this program; if not, write to the Free Software
18 e4ccf6cd Guido Trotter
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 e4ccf6cd Guido Trotter
# 02110-1301, USA.
20 e4ccf6cd Guido Trotter
21 e4ccf6cd Guido Trotter
22 e4ccf6cd Guido Trotter
"""Ganeti confd client
23 e4ccf6cd Guido Trotter

24 cf7b0cc4 Guido Trotter
Clients can use the confd client library to send requests to a group of master
25 cf7b0cc4 Guido Trotter
candidates running confd. The expected usage is through the asyncore framework,
26 cf7b0cc4 Guido Trotter
by sending queries, and asynchronously receiving replies through a callback.
27 cf7b0cc4 Guido Trotter

28 cf7b0cc4 Guido Trotter
This way the client library doesn't ever need to "wait" on a particular answer,
29 cf7b0cc4 Guido Trotter
and can proceed even if some udp packets are lost. It's up to the user to
30 cf7b0cc4 Guido Trotter
reschedule queries if they haven't received responses and they need them.
31 cf7b0cc4 Guido Trotter

32 69b99987 Michael Hanselmann
Example usage::
33 69b99987 Michael Hanselmann

34 cf7b0cc4 Guido Trotter
  client = ConfdClient(...) # includes callback specification
35 cf7b0cc4 Guido Trotter
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36 cf7b0cc4 Guido Trotter
  client.SendRequest(req)
37 cf7b0cc4 Guido Trotter
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38 cf7b0cc4 Guido Trotter
  # ... wait ...
39 cf7b0cc4 Guido Trotter
  # And your callback will be called by asyncore, when your query gets a
40 cf7b0cc4 Guido Trotter
  # response, or when it expires.
41 cf7b0cc4 Guido Trotter

42 392ca296 Guido Trotter
You can use the provided ConfdFilterCallback to act as a filter, only passing
43 392ca296 Guido Trotter
"newer" answer to your callback, and filtering out outdated ones, or ones
44 392ca296 Guido Trotter
confirming what you already got.
45 392ca296 Guido Trotter

46 e4ccf6cd Guido Trotter
"""
47 69b99987 Michael Hanselmann
48 6c881c52 Iustin Pop
# pylint: disable-msg=E0203
49 6c881c52 Iustin Pop
50 6c881c52 Iustin Pop
# E0203: Access to member %r before its definition, since we use
51 6c881c52 Iustin Pop
# objects.py which doesn't explicitely initialise its members
52 6c881c52 Iustin Pop
53 e4ccf6cd Guido Trotter
import time
54 e4ccf6cd Guido Trotter
import random
55 e4ccf6cd Guido Trotter
56 e4ccf6cd Guido Trotter
from ganeti import utils
57 e4ccf6cd Guido Trotter
from ganeti import constants
58 e4ccf6cd Guido Trotter
from ganeti import objects
59 e4ccf6cd Guido Trotter
from ganeti import serializer
60 e4ccf6cd Guido Trotter
from ganeti import daemon # contains AsyncUDPSocket
61 e4ccf6cd Guido Trotter
from ganeti import errors
62 e4ccf6cd Guido Trotter
from ganeti import confd
63 5b349fd1 Iustin Pop
from ganeti import ssconf
64 cea881e5 Michael Hanselmann
from ganeti import compat
65 e4ccf6cd Guido Trotter
66 e4ccf6cd Guido Trotter
67 e4ccf6cd Guido Trotter
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
68 e4ccf6cd Guido Trotter
  """Confd udp asyncore client
69 e4ccf6cd Guido Trotter

70 e4ccf6cd Guido Trotter
  This is kept separate from the main ConfdClient to make sure it's easy to
71 e4ccf6cd Guido Trotter
  implement a non-asyncore based client library.
72 e4ccf6cd Guido Trotter

73 e4ccf6cd Guido Trotter
  """
74 e4ccf6cd Guido Trotter
  def __init__(self, client):
75 e4ccf6cd Guido Trotter
    """Constructor for ConfdAsyncUDPClient
76 e4ccf6cd Guido Trotter

77 e4ccf6cd Guido Trotter
    @type client: L{ConfdClient}
78 e4ccf6cd Guido Trotter
    @param client: client library, to pass the datagrams to
79 e4ccf6cd Guido Trotter

80 e4ccf6cd Guido Trotter
    """
81 e4ccf6cd Guido Trotter
    daemon.AsyncUDPSocket.__init__(self)
82 e4ccf6cd Guido Trotter
    self.client = client
83 e4ccf6cd Guido Trotter
84 e4ccf6cd Guido Trotter
  # this method is overriding a daemon.AsyncUDPSocket method
85 e4ccf6cd Guido Trotter
  def handle_datagram(self, payload, ip, port):
86 e4ccf6cd Guido Trotter
    self.client.HandleResponse(payload, ip, port)
87 e4ccf6cd Guido Trotter
88 e4ccf6cd Guido Trotter
89 71e114da Iustin Pop
class _Request(object):
90 71e114da Iustin Pop
  """Request status structure.
91 71e114da Iustin Pop

92 71e114da Iustin Pop
  @ivar request: the request data
93 71e114da Iustin Pop
  @ivar args: any extra arguments for the callback
94 71e114da Iustin Pop
  @ivar expiry: the expiry timestamp of the request
95 bfbbc223 Iustin Pop
  @ivar sent: the set of contacted peers
96 bfbbc223 Iustin Pop
  @ivar rcvd: the set of peers who replied
97 71e114da Iustin Pop

98 71e114da Iustin Pop
  """
99 bfbbc223 Iustin Pop
  def __init__(self, request, args, expiry, sent):
100 71e114da Iustin Pop
    self.request = request
101 71e114da Iustin Pop
    self.args = args
102 71e114da Iustin Pop
    self.expiry = expiry
103 bfbbc223 Iustin Pop
    self.sent = frozenset(sent)
104 bfbbc223 Iustin Pop
    self.rcvd = set()
105 71e114da Iustin Pop
106 71e114da Iustin Pop
107 e4ccf6cd Guido Trotter
class ConfdClient:
108 e4ccf6cd Guido Trotter
  """Send queries to confd, and get back answers.
109 e4ccf6cd Guido Trotter

110 e4ccf6cd Guido Trotter
  Since the confd model works by querying multiple master candidates, and
111 e4ccf6cd Guido Trotter
  getting back answers, this is an asynchronous library. It can either work
112 e4ccf6cd Guido Trotter
  through asyncore or with your own handling.
113 e4ccf6cd Guido Trotter

114 71e114da Iustin Pop
  @type _requests: dict
115 71e114da Iustin Pop
  @ivar _requests: dictionary indexes by salt, which contains data
116 71e114da Iustin Pop
      about the outstanding requests; the values are objects of type
117 71e114da Iustin Pop
      L{_Request}
118 71e114da Iustin Pop

119 e4ccf6cd Guido Trotter
  """
120 a3db74e4 Guido Trotter
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
121 e4ccf6cd Guido Trotter
    """Constructor for ConfdClient
122 e4ccf6cd Guido Trotter

123 e4ccf6cd Guido Trotter
    @type hmac_key: string
124 e4ccf6cd Guido Trotter
    @param hmac_key: hmac key to talk to confd
125 e4ccf6cd Guido Trotter
    @type peers: list
126 e4ccf6cd Guido Trotter
    @param peers: list of peer nodes
127 96e03b0b Guido Trotter
    @type callback: f(L{ConfdUpcallPayload})
128 96e03b0b Guido Trotter
    @param callback: function to call when getting answers
129 7d20c647 Guido Trotter
    @type port: integer
130 d63997b3 Guido Trotter
    @param port: confd port (default: use GetDaemonPort)
131 69b99987 Michael Hanselmann
    @type logger: logging.Logger
132 d63997b3 Guido Trotter
    @param logger: optional logger for internal conditions
133 e4ccf6cd Guido Trotter

134 e4ccf6cd Guido Trotter
    """
135 96e03b0b Guido Trotter
    if not callable(callback):
136 96e03b0b Guido Trotter
      raise errors.ProgrammerError("callback must be callable")
137 e4ccf6cd Guido Trotter
138 a5229439 Guido Trotter
    self.UpdatePeerList(peers)
139 e4ccf6cd Guido Trotter
    self._hmac_key = hmac_key
140 e4ccf6cd Guido Trotter
    self._socket = ConfdAsyncUDPClient(self)
141 96e03b0b Guido Trotter
    self._callback = callback
142 7d20c647 Guido Trotter
    self._confd_port = port
143 a3db74e4 Guido Trotter
    self._logger = logger
144 96e03b0b Guido Trotter
    self._requests = {}
145 7d20c647 Guido Trotter
146 7d20c647 Guido Trotter
    if self._confd_port is None:
147 7d20c647 Guido Trotter
      self._confd_port = utils.GetDaemonPort(constants.CONFD)
148 e4ccf6cd Guido Trotter
149 a5229439 Guido Trotter
  def UpdatePeerList(self, peers):
150 a5229439 Guido Trotter
    """Update the list of peers
151 a5229439 Guido Trotter

152 a5229439 Guido Trotter
    @type peers: list
153 a5229439 Guido Trotter
    @param peers: list of peer nodes
154 a5229439 Guido Trotter

155 a5229439 Guido Trotter
    """
156 e11ddf13 Iustin Pop
    # we are actually called from init, so:
157 e11ddf13 Iustin Pop
    # pylint: disable-msg=W0201
158 a5229439 Guido Trotter
    if not isinstance(peers, list):
159 a5229439 Guido Trotter
      raise errors.ProgrammerError("peers must be a list")
160 db169865 Guido Trotter
    # make a copy of peers, since we're going to shuffle the list, later
161 db169865 Guido Trotter
    self._peers = list(peers)
162 a5229439 Guido Trotter
163 e4ccf6cd Guido Trotter
  def _PackRequest(self, request, now=None):
164 e4ccf6cd Guido Trotter
    """Prepare a request to be sent on the wire.
165 e4ccf6cd Guido Trotter

166 e4ccf6cd Guido Trotter
    This function puts a proper salt in a confd request, puts the proper salt,
167 e4ccf6cd Guido Trotter
    and adds the correct magic number.
168 e4ccf6cd Guido Trotter

169 e4ccf6cd Guido Trotter
    """
170 e4ccf6cd Guido Trotter
    if now is None:
171 e4ccf6cd Guido Trotter
      now = time.time()
172 e4ccf6cd Guido Trotter
    tstamp = '%d' % now
173 e4ccf6cd Guido Trotter
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
174 e4ccf6cd Guido Trotter
    return confd.PackMagic(req)
175 e4ccf6cd Guido Trotter
176 e4ccf6cd Guido Trotter
  def _UnpackReply(self, payload):
177 e4ccf6cd Guido Trotter
    in_payload = confd.UnpackMagic(payload)
178 c103d7ae Guido Trotter
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
179 c103d7ae Guido Trotter
    answer = objects.ConfdReply.FromDict(dict_answer)
180 e4ccf6cd Guido Trotter
    return answer, salt
181 e4ccf6cd Guido Trotter
182 96e03b0b Guido Trotter
  def ExpireRequests(self):
183 96e03b0b Guido Trotter
    """Delete all the expired requests.
184 e4ccf6cd Guido Trotter

185 e4ccf6cd Guido Trotter
    """
186 e4ccf6cd Guido Trotter
    now = time.time()
187 71e114da Iustin Pop
    for rsalt, rq in self._requests.items():
188 71e114da Iustin Pop
      if now >= rq.expiry:
189 96e03b0b Guido Trotter
        del self._requests[rsalt]
190 96e03b0b Guido Trotter
        client_reply = ConfdUpcallPayload(salt=rsalt,
191 96e03b0b Guido Trotter
                                          type=UPCALL_EXPIRE,
192 71e114da Iustin Pop
                                          orig_request=rq.request,
193 71e114da Iustin Pop
                                          extra_args=rq.args,
194 5f6f260a Guido Trotter
                                          client=self,
195 5f6f260a Guido Trotter
                                          )
196 96e03b0b Guido Trotter
        self._callback(client_reply)
197 e4ccf6cd Guido Trotter
198 cc6484c4 Iustin Pop
  def SendRequest(self, request, args=None, coverage=0, async=True):
199 e4ccf6cd Guido Trotter
    """Send a confd request to some MCs
200 e4ccf6cd Guido Trotter

201 e4ccf6cd Guido Trotter
    @type request: L{objects.ConfdRequest}
202 e4ccf6cd Guido Trotter
    @param request: the request to send
203 e4ccf6cd Guido Trotter
    @type args: tuple
204 d63997b3 Guido Trotter
    @param args: additional callback arguments
205 e4ccf6cd Guido Trotter
    @type coverage: integer
206 cc6484c4 Iustin Pop
    @param coverage: number of remote nodes to contact; if default
207 cc6484c4 Iustin Pop
        (0), it will use a reasonable default
208 cc6484c4 Iustin Pop
        (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is
209 cc6484c4 Iustin Pop
        passed, it will use the maximum number of peers, otherwise the
210 cc6484c4 Iustin Pop
        number passed in will be used
211 8496d93c Guido Trotter
    @type async: boolean
212 8496d93c Guido Trotter
    @param async: handle the write asynchronously
213 e4ccf6cd Guido Trotter

214 e4ccf6cd Guido Trotter
    """
215 cc6484c4 Iustin Pop
    if coverage == 0:
216 e4ccf6cd Guido Trotter
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
217 cc6484c4 Iustin Pop
    elif coverage == -1:
218 cc6484c4 Iustin Pop
      coverage = len(self._peers)
219 e4ccf6cd Guido Trotter
220 e4ccf6cd Guido Trotter
    if coverage > len(self._peers):
221 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Not enough MCs known to provide the"
222 e4ccf6cd Guido Trotter
                                    " desired coverage")
223 e4ccf6cd Guido Trotter
224 e4ccf6cd Guido Trotter
    if not request.rsalt:
225 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Missing request rsalt")
226 e4ccf6cd Guido Trotter
227 96e03b0b Guido Trotter
    self.ExpireRequests()
228 96e03b0b Guido Trotter
    if request.rsalt in self._requests:
229 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Duplicate request rsalt")
230 e4ccf6cd Guido Trotter
231 e4ccf6cd Guido Trotter
    if request.type not in constants.CONFD_REQS:
232 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Invalid request type")
233 e4ccf6cd Guido Trotter
234 e4ccf6cd Guido Trotter
    random.shuffle(self._peers)
235 e4ccf6cd Guido Trotter
    targets = self._peers[:coverage]
236 e4ccf6cd Guido Trotter
237 e4ccf6cd Guido Trotter
    now = time.time()
238 e4ccf6cd Guido Trotter
    payload = self._PackRequest(request, now=now)
239 e4ccf6cd Guido Trotter
240 e4ccf6cd Guido Trotter
    for target in targets:
241 e4ccf6cd Guido Trotter
      try:
242 e4ccf6cd Guido Trotter
        self._socket.enqueue_send(target, self._confd_port, payload)
243 e4ccf6cd Guido Trotter
      except errors.UdpDataSizeError:
244 e4ccf6cd Guido Trotter
        raise errors.ConfdClientError("Request too big")
245 e4ccf6cd Guido Trotter
246 e4ccf6cd Guido Trotter
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
247 bfbbc223 Iustin Pop
    self._requests[request.rsalt] = _Request(request, args, expire_time,
248 bfbbc223 Iustin Pop
                                             targets)
249 e4ccf6cd Guido Trotter
250 8496d93c Guido Trotter
    if not async:
251 8496d93c Guido Trotter
      self.FlushSendQueue()
252 8496d93c Guido Trotter
253 e4ccf6cd Guido Trotter
  def HandleResponse(self, payload, ip, port):
254 e4ccf6cd Guido Trotter
    """Asynchronous handler for a confd reply
255 e4ccf6cd Guido Trotter

256 e4ccf6cd Guido Trotter
    Call the relevant callback associated to the current request.
257 e4ccf6cd Guido Trotter

258 e4ccf6cd Guido Trotter
    """
259 e4ccf6cd Guido Trotter
    try:
260 e4ccf6cd Guido Trotter
      try:
261 e4ccf6cd Guido Trotter
        answer, salt = self._UnpackReply(payload)
262 a3db74e4 Guido Trotter
      except (errors.SignatureError, errors.ConfdMagicError), err:
263 a3db74e4 Guido Trotter
        if self._logger:
264 a3db74e4 Guido Trotter
          self._logger.debug("Discarding broken package: %s" % err)
265 e4ccf6cd Guido Trotter
        return
266 e4ccf6cd Guido Trotter
267 e4ccf6cd Guido Trotter
      try:
268 71e114da Iustin Pop
        rq = self._requests[salt]
269 e4ccf6cd Guido Trotter
      except KeyError:
270 a3db74e4 Guido Trotter
        if self._logger:
271 a3db74e4 Guido Trotter
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
272 96e03b0b Guido Trotter
        return
273 96e03b0b Guido Trotter
274 bfbbc223 Iustin Pop
      rq.rcvd.add(ip)
275 bfbbc223 Iustin Pop
276 96e03b0b Guido Trotter
      client_reply = ConfdUpcallPayload(salt=salt,
277 96e03b0b Guido Trotter
                                        type=UPCALL_REPLY,
278 96e03b0b Guido Trotter
                                        server_reply=answer,
279 71e114da Iustin Pop
                                        orig_request=rq.request,
280 96e03b0b Guido Trotter
                                        server_ip=ip,
281 96e03b0b Guido Trotter
                                        server_port=port,
282 71e114da Iustin Pop
                                        extra_args=rq.args,
283 5f6f260a Guido Trotter
                                        client=self,
284 5f6f260a Guido Trotter
                                       )
285 96e03b0b Guido Trotter
      self._callback(client_reply)
286 e4ccf6cd Guido Trotter
287 e4ccf6cd Guido Trotter
    finally:
288 96e03b0b Guido Trotter
      self.ExpireRequests()
289 96e03b0b Guido Trotter
290 8496d93c Guido Trotter
  def FlushSendQueue(self):
291 8496d93c Guido Trotter
    """Send out all pending requests.
292 8496d93c Guido Trotter

293 8496d93c Guido Trotter
    Can be used for synchronous client use.
294 8496d93c Guido Trotter

295 8496d93c Guido Trotter
    """
296 8496d93c Guido Trotter
    while self._socket.writable():
297 8496d93c Guido Trotter
      self._socket.handle_write()
298 8496d93c Guido Trotter
299 8496d93c Guido Trotter
  def ReceiveReply(self, timeout=1):
300 8496d93c Guido Trotter
    """Receive one reply.
301 8496d93c Guido Trotter

302 8496d93c Guido Trotter
    @type timeout: float
303 8496d93c Guido Trotter
    @param timeout: how long to wait for the reply
304 8496d93c Guido Trotter
    @rtype: boolean
305 8496d93c Guido Trotter
    @return: True if some data has been handled, False otherwise
306 8496d93c Guido Trotter

307 8496d93c Guido Trotter
    """
308 8496d93c Guido Trotter
    return self._socket.process_next_packet(timeout=timeout)
309 8496d93c Guido Trotter
310 bfbbc223 Iustin Pop
  @staticmethod
311 bfbbc223 Iustin Pop
  def _NeededReplies(peer_cnt):
312 bfbbc223 Iustin Pop
    """Compute the minimum safe number of replies for a query.
313 bfbbc223 Iustin Pop

314 bfbbc223 Iustin Pop
    The algorithm is designed to work well for both small and big
315 bfbbc223 Iustin Pop
    number of peers:
316 bfbbc223 Iustin Pop
        - for less than three, we require all responses
317 bfbbc223 Iustin Pop
        - for less than five, we allow one miss
318 bfbbc223 Iustin Pop
        - otherwise, half the number plus one
319 bfbbc223 Iustin Pop

320 bfbbc223 Iustin Pop
    This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
321 bfbbc223 Iustin Pop
    4->2, 5->3, 6->3, 7->4, etc.
322 bfbbc223 Iustin Pop

323 bfbbc223 Iustin Pop
    @type peer_cnt: int
324 bfbbc223 Iustin Pop
    @param peer_cnt: the number of peers contacted
325 bfbbc223 Iustin Pop
    @rtype: int
326 bfbbc223 Iustin Pop
    @return: the number of replies which should give a safe coverage
327 bfbbc223 Iustin Pop

328 bfbbc223 Iustin Pop
    """
329 bfbbc223 Iustin Pop
    if peer_cnt < 3:
330 bfbbc223 Iustin Pop
      return peer_cnt
331 bfbbc223 Iustin Pop
    elif peer_cnt < 5:
332 bfbbc223 Iustin Pop
      return peer_cnt - 1
333 bfbbc223 Iustin Pop
    else:
334 bfbbc223 Iustin Pop
      return int(peer_cnt/2) + 1
335 bfbbc223 Iustin Pop
336 bfbbc223 Iustin Pop
  def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
337 bfbbc223 Iustin Pop
    """Wait for replies to a given request.
338 bfbbc223 Iustin Pop

339 bfbbc223 Iustin Pop
    This method will wait until either the timeout expires or a
340 bfbbc223 Iustin Pop
    minimum number (computed using L{_NeededReplies}) of replies are
341 bfbbc223 Iustin Pop
    received for the given salt. It is useful when doing synchronous
342 bfbbc223 Iustin Pop
    calls to this library.
343 bfbbc223 Iustin Pop

344 bfbbc223 Iustin Pop
    @param salt: the salt of the request we want responses for
345 bfbbc223 Iustin Pop
    @param timeout: the maximum timeout (should be less or equal to
346 bfbbc223 Iustin Pop
        L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT}
347 bfbbc223 Iustin Pop
    @rtype: tuple
348 bfbbc223 Iustin Pop
    @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
349 bfbbc223 Iustin Pop
        request is unknown, timed_out will be true and the counters
350 bfbbc223 Iustin Pop
        will be zero
351 bfbbc223 Iustin Pop

352 bfbbc223 Iustin Pop
    """
353 bfbbc223 Iustin Pop
    def _CheckResponse():
354 bfbbc223 Iustin Pop
      if salt not in self._requests:
355 bfbbc223 Iustin Pop
        # expired?
356 bfbbc223 Iustin Pop
        if self._logger:
357 bfbbc223 Iustin Pop
          self._logger.debug("Discarding unknown/expired request: %s" % salt)
358 bfbbc223 Iustin Pop
        return MISSING
359 bfbbc223 Iustin Pop
      rq = self._requests[salt]
360 bfbbc223 Iustin Pop
      if len(rq.rcvd) >= expected:
361 bfbbc223 Iustin Pop
        # already got all replies
362 bfbbc223 Iustin Pop
        return (False, len(rq.sent), len(rq.rcvd))
363 bfbbc223 Iustin Pop
      # else wait, using default timeout
364 bfbbc223 Iustin Pop
      self.ReceiveReply()
365 bfbbc223 Iustin Pop
      raise utils.RetryAgain()
366 bfbbc223 Iustin Pop
367 bfbbc223 Iustin Pop
    MISSING = (True, 0, 0)
368 bfbbc223 Iustin Pop
369 bfbbc223 Iustin Pop
    if salt not in self._requests:
370 bfbbc223 Iustin Pop
      return MISSING
371 bfbbc223 Iustin Pop
    # extend the expire time with the current timeout, so that we
372 bfbbc223 Iustin Pop
    # don't get the request expired from under us
373 bfbbc223 Iustin Pop
    rq = self._requests[salt]
374 bfbbc223 Iustin Pop
    rq.expiry += timeout
375 bfbbc223 Iustin Pop
    sent = len(rq.sent)
376 bfbbc223 Iustin Pop
    expected = self._NeededReplies(sent)
377 bfbbc223 Iustin Pop
378 bfbbc223 Iustin Pop
    try:
379 bfbbc223 Iustin Pop
      return utils.Retry(_CheckResponse, 0, timeout)
380 bfbbc223 Iustin Pop
    except utils.RetryTimeout:
381 bfbbc223 Iustin Pop
      if salt in self._requests:
382 bfbbc223 Iustin Pop
        rq = self._requests[salt]
383 bfbbc223 Iustin Pop
        return (True, len(rq.sent), len(rq.rcvd))
384 bfbbc223 Iustin Pop
      else:
385 bfbbc223 Iustin Pop
        return MISSING
386 bfbbc223 Iustin Pop
387 96e03b0b Guido Trotter
388 96e03b0b Guido Trotter
# UPCALL_REPLY: server reply upcall
389 96e03b0b Guido Trotter
# has all ConfdUpcallPayload fields populated
390 96e03b0b Guido Trotter
UPCALL_REPLY = 1
391 96e03b0b Guido Trotter
# UPCALL_EXPIRE: internal library request expire
392 96e03b0b Guido Trotter
# has only salt, type, orig_request and extra_args
393 96e03b0b Guido Trotter
UPCALL_EXPIRE = 2
394 96e03b0b Guido Trotter
CONFD_UPCALL_TYPES = frozenset([
395 96e03b0b Guido Trotter
  UPCALL_REPLY,
396 96e03b0b Guido Trotter
  UPCALL_EXPIRE,
397 96e03b0b Guido Trotter
  ])
398 96e03b0b Guido Trotter
399 96e03b0b Guido Trotter
400 96e03b0b Guido Trotter
class ConfdUpcallPayload(objects.ConfigObject):
401 96e03b0b Guido Trotter
  """Callback argument for confd replies
402 96e03b0b Guido Trotter

403 96e03b0b Guido Trotter
  @type salt: string
404 96e03b0b Guido Trotter
  @ivar salt: salt associated with the query
405 96e03b0b Guido Trotter
  @type type: one of confd.client.CONFD_UPCALL_TYPES
406 96e03b0b Guido Trotter
  @ivar type: upcall type (server reply, expired request, ...)
407 96e03b0b Guido Trotter
  @type orig_request: L{objects.ConfdRequest}
408 96e03b0b Guido Trotter
  @ivar orig_request: original request
409 96e03b0b Guido Trotter
  @type server_reply: L{objects.ConfdReply}
410 96e03b0b Guido Trotter
  @ivar server_reply: server reply
411 96e03b0b Guido Trotter
  @type server_ip: string
412 96e03b0b Guido Trotter
  @ivar server_ip: answering server ip address
413 96e03b0b Guido Trotter
  @type server_port: int
414 96e03b0b Guido Trotter
  @ivar server_port: answering server port
415 96e03b0b Guido Trotter
  @type extra_args: any
416 96e03b0b Guido Trotter
  @ivar extra_args: 'args' argument of the SendRequest function
417 5f6f260a Guido Trotter
  @type client: L{ConfdClient}
418 5f6f260a Guido Trotter
  @ivar client: current confd client instance
419 96e03b0b Guido Trotter

420 96e03b0b Guido Trotter
  """
421 96e03b0b Guido Trotter
  __slots__ = [
422 96e03b0b Guido Trotter
    "salt",
423 96e03b0b Guido Trotter
    "type",
424 96e03b0b Guido Trotter
    "orig_request",
425 96e03b0b Guido Trotter
    "server_reply",
426 96e03b0b Guido Trotter
    "server_ip",
427 96e03b0b Guido Trotter
    "server_port",
428 96e03b0b Guido Trotter
    "extra_args",
429 5f6f260a Guido Trotter
    "client",
430 96e03b0b Guido Trotter
    ]
431 e4ccf6cd Guido Trotter
432 e4ccf6cd Guido Trotter
433 e4ccf6cd Guido Trotter
class ConfdClientRequest(objects.ConfdRequest):
434 e4ccf6cd Guido Trotter
  """This is the client-side version of ConfdRequest.
435 e4ccf6cd Guido Trotter

436 e4ccf6cd Guido Trotter
  This version of the class helps creating requests, on the client side, by
437 e4ccf6cd Guido Trotter
  filling in some default values.
438 e4ccf6cd Guido Trotter

439 e4ccf6cd Guido Trotter
  """
440 e4ccf6cd Guido Trotter
  def __init__(self, **kwargs):
441 e4ccf6cd Guido Trotter
    objects.ConfdRequest.__init__(self, **kwargs)
442 e4ccf6cd Guido Trotter
    if not self.rsalt:
443 e4ccf6cd Guido Trotter
      self.rsalt = utils.NewUUID()
444 e4ccf6cd Guido Trotter
    if not self.protocol:
445 e4ccf6cd Guido Trotter
      self.protocol = constants.CONFD_PROTOCOL_VERSION
446 e4ccf6cd Guido Trotter
    if self.type not in constants.CONFD_REQS:
447 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Invalid request type")
448 e4ccf6cd Guido Trotter
449 392ca296 Guido Trotter
450 392ca296 Guido Trotter
class ConfdFilterCallback:
451 392ca296 Guido Trotter
  """Callback that calls another callback, but filters duplicate results.
452 392ca296 Guido Trotter

453 49b3fdac Iustin Pop
  @ivar consistent: a dictionary indexed by salt; for each salt, if
454 49b3fdac Iustin Pop
      all responses ware identical, this will be True; this is the
455 49b3fdac Iustin Pop
      expected state on a healthy cluster; on inconsistent or
456 49b3fdac Iustin Pop
      partitioned clusters, this might be False, if we see answers
457 49b3fdac Iustin Pop
      with the same serial but different contents
458 49b3fdac Iustin Pop

459 392ca296 Guido Trotter
  """
460 392ca296 Guido Trotter
  def __init__(self, callback, logger=None):
461 392ca296 Guido Trotter
    """Constructor for ConfdFilterCallback
462 392ca296 Guido Trotter

463 392ca296 Guido Trotter
    @type callback: f(L{ConfdUpcallPayload})
464 392ca296 Guido Trotter
    @param callback: function to call when getting answers
465 69b99987 Michael Hanselmann
    @type logger: logging.Logger
466 d63997b3 Guido Trotter
    @param logger: optional logger for internal conditions
467 392ca296 Guido Trotter

468 392ca296 Guido Trotter
    """
469 392ca296 Guido Trotter
    if not callable(callback):
470 392ca296 Guido Trotter
      raise errors.ProgrammerError("callback must be callable")
471 392ca296 Guido Trotter
472 392ca296 Guido Trotter
    self._callback = callback
473 392ca296 Guido Trotter
    self._logger = logger
474 392ca296 Guido Trotter
    # answers contains a dict of salt -> answer
475 392ca296 Guido Trotter
    self._answers = {}
476 49b3fdac Iustin Pop
    self.consistent = {}
477 392ca296 Guido Trotter
478 392ca296 Guido Trotter
  def _LogFilter(self, salt, new_reply, old_reply):
479 392ca296 Guido Trotter
    if not self._logger:
480 392ca296 Guido Trotter
      return
481 392ca296 Guido Trotter
482 392ca296 Guido Trotter
    if new_reply.serial > old_reply.serial:
483 392ca296 Guido Trotter
      self._logger.debug("Filtering confirming answer, with newer"
484 392ca296 Guido Trotter
                         " serial for query %s" % salt)
485 392ca296 Guido Trotter
    elif new_reply.serial == old_reply.serial:
486 392ca296 Guido Trotter
      if new_reply.answer != old_reply.answer:
487 392ca296 Guido Trotter
        self._logger.warning("Got incoherent answers for query %s"
488 392ca296 Guido Trotter
                             " (serial: %s)" % (salt, new_reply.serial))
489 392ca296 Guido Trotter
      else:
490 392ca296 Guido Trotter
        self._logger.debug("Filtering confirming answer, with same"
491 392ca296 Guido Trotter
                           " serial for query %s" % salt)
492 392ca296 Guido Trotter
    else:
493 392ca296 Guido Trotter
      self._logger.debug("Filtering outdated answer for query %s"
494 392ca296 Guido Trotter
                         " serial: (%d < %d)" % (salt, old_reply.serial,
495 392ca296 Guido Trotter
                                                 new_reply.serial))
496 392ca296 Guido Trotter
497 392ca296 Guido Trotter
  def _HandleExpire(self, up):
498 392ca296 Guido Trotter
    # if we have no answer we have received none, before the expiration.
499 a9613def Guido Trotter
    if up.salt in self._answers:
500 a9613def Guido Trotter
      del self._answers[up.salt]
501 49b3fdac Iustin Pop
    if up.salt in self.consistent:
502 49b3fdac Iustin Pop
      del self.consistent[up.salt]
503 392ca296 Guido Trotter
504 392ca296 Guido Trotter
  def _HandleReply(self, up):
505 392ca296 Guido Trotter
    """Handle a single confd reply, and decide whether to filter it.
506 392ca296 Guido Trotter

507 392ca296 Guido Trotter
    @rtype: boolean
508 392ca296 Guido Trotter
    @return: True if the reply should be filtered, False if it should be passed
509 392ca296 Guido Trotter
             on to the up-callback
510 392ca296 Guido Trotter

511 392ca296 Guido Trotter
    """
512 392ca296 Guido Trotter
    filter_upcall = False
513 392ca296 Guido Trotter
    salt = up.salt
514 49b3fdac Iustin Pop
    if salt not in self.consistent:
515 49b3fdac Iustin Pop
      self.consistent[salt] = True
516 392ca296 Guido Trotter
    if salt not in self._answers:
517 392ca296 Guido Trotter
      # first answer for a query (don't filter, and record)
518 392ca296 Guido Trotter
      self._answers[salt] = up.server_reply
519 392ca296 Guido Trotter
    elif up.server_reply.serial > self._answers[salt].serial:
520 392ca296 Guido Trotter
      # newer answer (record, and compare contents)
521 392ca296 Guido Trotter
      old_answer = self._answers[salt]
522 392ca296 Guido Trotter
      self._answers[salt] = up.server_reply
523 392ca296 Guido Trotter
      if up.server_reply.answer == old_answer.answer:
524 392ca296 Guido Trotter
        # same content (filter) (version upgrade was unrelated)
525 392ca296 Guido Trotter
        filter_upcall = True
526 392ca296 Guido Trotter
        self._LogFilter(salt, up.server_reply, old_answer)
527 392ca296 Guido Trotter
      # else: different content, pass up a second answer
528 392ca296 Guido Trotter
    else:
529 392ca296 Guido Trotter
      # older or same-version answer (duplicate or outdated, filter)
530 39292d3a Iustin Pop
      if (up.server_reply.serial == self._answers[salt].serial and
531 39292d3a Iustin Pop
          up.server_reply.answer != self._answers[salt].answer):
532 49b3fdac Iustin Pop
        self.consistent[salt] = False
533 392ca296 Guido Trotter
      filter_upcall = True
534 392ca296 Guido Trotter
      self._LogFilter(salt, up.server_reply, self._answers[salt])
535 392ca296 Guido Trotter
536 392ca296 Guido Trotter
    return filter_upcall
537 392ca296 Guido Trotter
538 392ca296 Guido Trotter
  def __call__(self, up):
539 392ca296 Guido Trotter
    """Filtering callback
540 392ca296 Guido Trotter

541 392ca296 Guido Trotter
    @type up: L{ConfdUpcallPayload}
542 392ca296 Guido Trotter
    @param up: upper callback
543 392ca296 Guido Trotter

544 392ca296 Guido Trotter
    """
545 392ca296 Guido Trotter
    filter_upcall = False
546 392ca296 Guido Trotter
    if up.type == UPCALL_REPLY:
547 392ca296 Guido Trotter
      filter_upcall = self._HandleReply(up)
548 392ca296 Guido Trotter
    elif up.type == UPCALL_EXPIRE:
549 392ca296 Guido Trotter
      self._HandleExpire(up)
550 392ca296 Guido Trotter
551 392ca296 Guido Trotter
    if not filter_upcall:
552 392ca296 Guido Trotter
      self._callback(up)
553 04cdf663 Guido Trotter
554 04cdf663 Guido Trotter
555 04cdf663 Guido Trotter
class ConfdCountingCallback:
556 04cdf663 Guido Trotter
  """Callback that calls another callback, and counts the answers
557 04cdf663 Guido Trotter

558 04cdf663 Guido Trotter
  """
559 04cdf663 Guido Trotter
  def __init__(self, callback, logger=None):
560 04cdf663 Guido Trotter
    """Constructor for ConfdCountingCallback
561 04cdf663 Guido Trotter

562 04cdf663 Guido Trotter
    @type callback: f(L{ConfdUpcallPayload})
563 04cdf663 Guido Trotter
    @param callback: function to call when getting answers
564 04cdf663 Guido Trotter
    @type logger: logging.Logger
565 04cdf663 Guido Trotter
    @param logger: optional logger for internal conditions
566 04cdf663 Guido Trotter

567 04cdf663 Guido Trotter
    """
568 04cdf663 Guido Trotter
    if not callable(callback):
569 04cdf663 Guido Trotter
      raise errors.ProgrammerError("callback must be callable")
570 04cdf663 Guido Trotter
571 04cdf663 Guido Trotter
    self._callback = callback
572 04cdf663 Guido Trotter
    self._logger = logger
573 04cdf663 Guido Trotter
    # answers contains a dict of salt -> count
574 04cdf663 Guido Trotter
    self._answers = {}
575 04cdf663 Guido Trotter
576 04cdf663 Guido Trotter
  def RegisterQuery(self, salt):
577 04cdf663 Guido Trotter
    if salt in self._answers:
578 04cdf663 Guido Trotter
      raise errors.ProgrammerError("query already registered")
579 04cdf663 Guido Trotter
    self._answers[salt] = 0
580 04cdf663 Guido Trotter
581 04cdf663 Guido Trotter
  def AllAnswered(self):
582 04cdf663 Guido Trotter
    """Have all the registered queries received at least an answer?
583 04cdf663 Guido Trotter

584 04cdf663 Guido Trotter
    """
585 cea881e5 Michael Hanselmann
    return compat.all(self._answers.values())
586 04cdf663 Guido Trotter
587 04cdf663 Guido Trotter
  def _HandleExpire(self, up):
588 04cdf663 Guido Trotter
    # if we have no answer we have received none, before the expiration.
589 04cdf663 Guido Trotter
    if up.salt in self._answers:
590 04cdf663 Guido Trotter
      del self._answers[up.salt]
591 04cdf663 Guido Trotter
592 04cdf663 Guido Trotter
  def _HandleReply(self, up):
593 04cdf663 Guido Trotter
    """Handle a single confd reply, and decide whether to filter it.
594 04cdf663 Guido Trotter

595 04cdf663 Guido Trotter
    @rtype: boolean
596 04cdf663 Guido Trotter
    @return: True if the reply should be filtered, False if it should be passed
597 04cdf663 Guido Trotter
             on to the up-callback
598 04cdf663 Guido Trotter

599 04cdf663 Guido Trotter
    """
600 04cdf663 Guido Trotter
    if up.salt in self._answers:
601 04cdf663 Guido Trotter
      self._answers[up.salt] += 1
602 04cdf663 Guido Trotter
603 04cdf663 Guido Trotter
  def __call__(self, up):
604 04cdf663 Guido Trotter
    """Filtering callback
605 04cdf663 Guido Trotter

606 04cdf663 Guido Trotter
    @type up: L{ConfdUpcallPayload}
607 04cdf663 Guido Trotter
    @param up: upper callback
608 04cdf663 Guido Trotter

609 04cdf663 Guido Trotter
    """
610 04cdf663 Guido Trotter
    if up.type == UPCALL_REPLY:
611 04cdf663 Guido Trotter
      self._HandleReply(up)
612 04cdf663 Guido Trotter
    elif up.type == UPCALL_EXPIRE:
613 04cdf663 Guido Trotter
      self._HandleExpire(up)
614 04cdf663 Guido Trotter
    self._callback(up)
615 5b349fd1 Iustin Pop
616 71e114da Iustin Pop
617 aa2efc52 Iustin Pop
class StoreResultCallback:
618 aa2efc52 Iustin Pop
  """Callback that simply stores the most recent answer.
619 aa2efc52 Iustin Pop

620 aa2efc52 Iustin Pop
  @ivar _answers: dict of salt to (have_answer, reply)
621 aa2efc52 Iustin Pop

622 aa2efc52 Iustin Pop
  """
623 aa2efc52 Iustin Pop
  _NO_KEY = (False, None)
624 aa2efc52 Iustin Pop
625 aa2efc52 Iustin Pop
  def __init__(self):
626 aa2efc52 Iustin Pop
    """Constructor for StoreResultCallback
627 aa2efc52 Iustin Pop

628 aa2efc52 Iustin Pop
    """
629 aa2efc52 Iustin Pop
    # answers contains a dict of salt -> best result
630 aa2efc52 Iustin Pop
    self._answers = {}
631 aa2efc52 Iustin Pop
632 aa2efc52 Iustin Pop
  def GetResponse(self, salt):
633 aa2efc52 Iustin Pop
    """Return the best match for a salt
634 aa2efc52 Iustin Pop

635 aa2efc52 Iustin Pop
    """
636 aa2efc52 Iustin Pop
    return self._answers.get(salt, self._NO_KEY)
637 aa2efc52 Iustin Pop
638 aa2efc52 Iustin Pop
  def _HandleExpire(self, up):
639 aa2efc52 Iustin Pop
    """Expiration handler.
640 aa2efc52 Iustin Pop

641 aa2efc52 Iustin Pop
    """
642 aa2efc52 Iustin Pop
    if up.salt in self._answers and self._answers[up.salt] == self._NO_KEY:
643 aa2efc52 Iustin Pop
      del self._answers[up.salt]
644 aa2efc52 Iustin Pop
645 aa2efc52 Iustin Pop
  def _HandleReply(self, up):
646 aa2efc52 Iustin Pop
    """Handle a single confd reply, and decide whether to filter it.
647 aa2efc52 Iustin Pop

648 aa2efc52 Iustin Pop
    """
649 aa2efc52 Iustin Pop
    self._answers[up.salt] = (True, up)
650 aa2efc52 Iustin Pop
651 aa2efc52 Iustin Pop
  def __call__(self, up):
652 aa2efc52 Iustin Pop
    """Filtering callback
653 aa2efc52 Iustin Pop

654 aa2efc52 Iustin Pop
    @type up: L{ConfdUpcallPayload}
655 aa2efc52 Iustin Pop
    @param up: upper callback
656 aa2efc52 Iustin Pop

657 aa2efc52 Iustin Pop
    """
658 aa2efc52 Iustin Pop
    if up.type == UPCALL_REPLY:
659 aa2efc52 Iustin Pop
      self._HandleReply(up)
660 aa2efc52 Iustin Pop
    elif up.type == UPCALL_EXPIRE:
661 aa2efc52 Iustin Pop
      self._HandleExpire(up)
662 aa2efc52 Iustin Pop
663 aa2efc52 Iustin Pop
664 5b349fd1 Iustin Pop
def GetConfdClient(callback):
665 5b349fd1 Iustin Pop
  """Return a client configured using the given callback.
666 5b349fd1 Iustin Pop

667 5b349fd1 Iustin Pop
  This is handy to abstract the MC list and HMAC key reading.
668 5b349fd1 Iustin Pop

669 5b349fd1 Iustin Pop
  @attention: This should only be called on nodes which are part of a
670 5b349fd1 Iustin Pop
      cluster, since it depends on a valid (ganeti) data directory;
671 5b349fd1 Iustin Pop
      for code running outside of a cluster, you need to create the
672 5b349fd1 Iustin Pop
      client manually
673 5b349fd1 Iustin Pop

674 5b349fd1 Iustin Pop
  """
675 5b349fd1 Iustin Pop
  ss = ssconf.SimpleStore()
676 5b349fd1 Iustin Pop
  mc_file = ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
677 5b349fd1 Iustin Pop
  mc_list = utils.ReadFile(mc_file).splitlines()
678 5b349fd1 Iustin Pop
  hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY)
679 5b349fd1 Iustin Pop
  return ConfdClient(hmac_key, mc_list, callback)