Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ f7f03738

History | View | Annotate | Download (21.4 kB)

1 e4ccf6cd Guido Trotter
#
2 e4ccf6cd Guido Trotter
#
3 e4ccf6cd Guido Trotter
4 5ae4945a Iustin Pop
# Copyright (C) 2009, 2010, 2012 Google Inc.
5 e4ccf6cd Guido Trotter
#
6 e4ccf6cd Guido Trotter
# This program is free software; you can redistribute it and/or modify
7 e4ccf6cd Guido Trotter
# it under the terms of the GNU General Public License as published by
8 e4ccf6cd Guido Trotter
# the Free Software Foundation; either version 2 of the License, or
9 e4ccf6cd Guido Trotter
# (at your option) any later version.
10 e4ccf6cd Guido Trotter
#
11 e4ccf6cd Guido Trotter
# This program is distributed in the hope that it will be useful, but
12 e4ccf6cd Guido Trotter
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 e4ccf6cd Guido Trotter
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 e4ccf6cd Guido Trotter
# General Public License for more details.
15 e4ccf6cd Guido Trotter
#
16 e4ccf6cd Guido Trotter
# You should have received a copy of the GNU General Public License
17 e4ccf6cd Guido Trotter
# along with this program; if not, write to the Free Software
18 e4ccf6cd Guido Trotter
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 e4ccf6cd Guido Trotter
# 02110-1301, USA.
20 e4ccf6cd Guido Trotter
21 e4ccf6cd Guido Trotter
22 e4ccf6cd Guido Trotter
"""Ganeti confd client
23 e4ccf6cd Guido Trotter

24 cf7b0cc4 Guido Trotter
Clients can use the confd client library to send requests to a group of master
25 cf7b0cc4 Guido Trotter
candidates running confd. The expected usage is through the asyncore framework,
26 cf7b0cc4 Guido Trotter
by sending queries, and asynchronously receiving replies through a callback.
27 cf7b0cc4 Guido Trotter

28 cf7b0cc4 Guido Trotter
This way the client library doesn't ever need to "wait" on a particular answer,
29 cf7b0cc4 Guido Trotter
and can proceed even if some udp packets are lost. It's up to the user to
30 cf7b0cc4 Guido Trotter
reschedule queries if they haven't received responses and they need them.
31 cf7b0cc4 Guido Trotter

32 69b99987 Michael Hanselmann
Example usage::
33 69b99987 Michael Hanselmann

34 cf7b0cc4 Guido Trotter
  client = ConfdClient(...) # includes callback specification
35 cf7b0cc4 Guido Trotter
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36 cf7b0cc4 Guido Trotter
  client.SendRequest(req)
37 cf7b0cc4 Guido Trotter
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38 cf7b0cc4 Guido Trotter
  # ... wait ...
39 cf7b0cc4 Guido Trotter
  # And your callback will be called by asyncore, when your query gets a
40 cf7b0cc4 Guido Trotter
  # response, or when it expires.
41 cf7b0cc4 Guido Trotter

42 392ca296 Guido Trotter
You can use the provided ConfdFilterCallback to act as a filter, only passing
43 392ca296 Guido Trotter
"newer" answer to your callback, and filtering out outdated ones, or ones
44 392ca296 Guido Trotter
confirming what you already got.
45 392ca296 Guido Trotter

46 e4ccf6cd Guido Trotter
"""
47 69b99987 Michael Hanselmann
48 b459a848 Andrea Spadaccini
# pylint: disable=E0203
49 6c881c52 Iustin Pop
50 6c881c52 Iustin Pop
# E0203: Access to member %r before its definition, since we use
51 2ed0e208 Iustin Pop
# objects.py which doesn't explicitly initialise its members
52 6c881c52 Iustin Pop
53 e4ccf6cd Guido Trotter
import time
54 e4ccf6cd Guido Trotter
import random
55 e4ccf6cd Guido Trotter
56 e4ccf6cd Guido Trotter
from ganeti import utils
57 e4ccf6cd Guido Trotter
from ganeti import constants
58 e4ccf6cd Guido Trotter
from ganeti import objects
59 e4ccf6cd Guido Trotter
from ganeti import serializer
60 e4ccf6cd Guido Trotter
from ganeti import daemon # contains AsyncUDPSocket
61 e4ccf6cd Guido Trotter
from ganeti import errors
62 e4ccf6cd Guido Trotter
from ganeti import confd
63 5b349fd1 Iustin Pop
from ganeti import ssconf
64 cea881e5 Michael Hanselmann
from ganeti import compat
65 a744b676 Manuel Franceschini
from ganeti import netutils
66 5a76d5f6 Michael Hanselmann
from ganeti import pathutils
67 e4ccf6cd Guido Trotter
68 e4ccf6cd Guido Trotter
69 e4ccf6cd Guido Trotter
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
70 e4ccf6cd Guido Trotter
  """Confd udp asyncore client
71 e4ccf6cd Guido Trotter

72 e4ccf6cd Guido Trotter
  This is kept separate from the main ConfdClient to make sure it's easy to
73 e4ccf6cd Guido Trotter
  implement a non-asyncore based client library.
74 e4ccf6cd Guido Trotter

75 e4ccf6cd Guido Trotter
  """
76 d8bcfe21 Manuel Franceschini
  def __init__(self, client, family):
77 e4ccf6cd Guido Trotter
    """Constructor for ConfdAsyncUDPClient
78 e4ccf6cd Guido Trotter

79 e4ccf6cd Guido Trotter
    @type client: L{ConfdClient}
80 e4ccf6cd Guido Trotter
    @param client: client library, to pass the datagrams to
81 e4ccf6cd Guido Trotter

82 e4ccf6cd Guido Trotter
    """
83 d8bcfe21 Manuel Franceschini
    daemon.AsyncUDPSocket.__init__(self, family)
84 e4ccf6cd Guido Trotter
    self.client = client
85 e4ccf6cd Guido Trotter
86 e4ccf6cd Guido Trotter
  # this method is overriding a daemon.AsyncUDPSocket method
87 e4ccf6cd Guido Trotter
  def handle_datagram(self, payload, ip, port):
88 e4ccf6cd Guido Trotter
    self.client.HandleResponse(payload, ip, port)
89 e4ccf6cd Guido Trotter
90 e4ccf6cd Guido Trotter
91 71e114da Iustin Pop
class _Request(object):
92 71e114da Iustin Pop
  """Request status structure.
93 71e114da Iustin Pop

94 71e114da Iustin Pop
  @ivar request: the request data
95 71e114da Iustin Pop
  @ivar args: any extra arguments for the callback
96 71e114da Iustin Pop
  @ivar expiry: the expiry timestamp of the request
97 bfbbc223 Iustin Pop
  @ivar sent: the set of contacted peers
98 bfbbc223 Iustin Pop
  @ivar rcvd: the set of peers who replied
99 71e114da Iustin Pop

100 71e114da Iustin Pop
  """
101 bfbbc223 Iustin Pop
  def __init__(self, request, args, expiry, sent):
102 71e114da Iustin Pop
    self.request = request
103 71e114da Iustin Pop
    self.args = args
104 71e114da Iustin Pop
    self.expiry = expiry
105 bfbbc223 Iustin Pop
    self.sent = frozenset(sent)
106 bfbbc223 Iustin Pop
    self.rcvd = set()
107 71e114da Iustin Pop
108 71e114da Iustin Pop
109 e4ccf6cd Guido Trotter
class ConfdClient:
110 e4ccf6cd Guido Trotter
  """Send queries to confd, and get back answers.
111 e4ccf6cd Guido Trotter

112 e4ccf6cd Guido Trotter
  Since the confd model works by querying multiple master candidates, and
113 e4ccf6cd Guido Trotter
  getting back answers, this is an asynchronous library. It can either work
114 e4ccf6cd Guido Trotter
  through asyncore or with your own handling.
115 e4ccf6cd Guido Trotter

116 71e114da Iustin Pop
  @type _requests: dict
117 71e114da Iustin Pop
  @ivar _requests: dictionary indexes by salt, which contains data
118 71e114da Iustin Pop
      about the outstanding requests; the values are objects of type
119 71e114da Iustin Pop
      L{_Request}
120 71e114da Iustin Pop

121 e4ccf6cd Guido Trotter
  """
122 a3db74e4 Guido Trotter
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
123 e4ccf6cd Guido Trotter
    """Constructor for ConfdClient
124 e4ccf6cd Guido Trotter

125 e4ccf6cd Guido Trotter
    @type hmac_key: string
126 e4ccf6cd Guido Trotter
    @param hmac_key: hmac key to talk to confd
127 e4ccf6cd Guido Trotter
    @type peers: list
128 e4ccf6cd Guido Trotter
    @param peers: list of peer nodes
129 96e03b0b Guido Trotter
    @type callback: f(L{ConfdUpcallPayload})
130 96e03b0b Guido Trotter
    @param callback: function to call when getting answers
131 7d20c647 Guido Trotter
    @type port: integer
132 d63997b3 Guido Trotter
    @param port: confd port (default: use GetDaemonPort)
133 69b99987 Michael Hanselmann
    @type logger: logging.Logger
134 d63997b3 Guido Trotter
    @param logger: optional logger for internal conditions
135 e4ccf6cd Guido Trotter

136 e4ccf6cd Guido Trotter
    """
137 96e03b0b Guido Trotter
    if not callable(callback):
138 96e03b0b Guido Trotter
      raise errors.ProgrammerError("callback must be callable")
139 e4ccf6cd Guido Trotter
140 a5229439 Guido Trotter
    self.UpdatePeerList(peers)
141 d8bcfe21 Manuel Franceschini
    self._SetPeersAddressFamily()
142 e4ccf6cd Guido Trotter
    self._hmac_key = hmac_key
143 d8bcfe21 Manuel Franceschini
    self._socket = ConfdAsyncUDPClient(self, self._family)
144 96e03b0b Guido Trotter
    self._callback = callback
145 7d20c647 Guido Trotter
    self._confd_port = port
146 a3db74e4 Guido Trotter
    self._logger = logger
147 96e03b0b Guido Trotter
    self._requests = {}
148 7d20c647 Guido Trotter
149 7d20c647 Guido Trotter
    if self._confd_port is None:
150 a744b676 Manuel Franceschini
      self._confd_port = netutils.GetDaemonPort(constants.CONFD)
151 e4ccf6cd Guido Trotter
152 a5229439 Guido Trotter
  def UpdatePeerList(self, peers):
153 a5229439 Guido Trotter
    """Update the list of peers
154 a5229439 Guido Trotter

155 a5229439 Guido Trotter
    @type peers: list
156 a5229439 Guido Trotter
    @param peers: list of peer nodes
157 a5229439 Guido Trotter

158 a5229439 Guido Trotter
    """
159 e11ddf13 Iustin Pop
    # we are actually called from init, so:
160 b459a848 Andrea Spadaccini
    # pylint: disable=W0201
161 a5229439 Guido Trotter
    if not isinstance(peers, list):
162 a5229439 Guido Trotter
      raise errors.ProgrammerError("peers must be a list")
163 db169865 Guido Trotter
    # make a copy of peers, since we're going to shuffle the list, later
164 db169865 Guido Trotter
    self._peers = list(peers)
165 a5229439 Guido Trotter
166 e4ccf6cd Guido Trotter
  def _PackRequest(self, request, now=None):
167 e4ccf6cd Guido Trotter
    """Prepare a request to be sent on the wire.
168 e4ccf6cd Guido Trotter

169 e4ccf6cd Guido Trotter
    This function puts a proper salt in a confd request, puts the proper salt,
170 e4ccf6cd Guido Trotter
    and adds the correct magic number.
171 e4ccf6cd Guido Trotter

172 e4ccf6cd Guido Trotter
    """
173 e4ccf6cd Guido Trotter
    if now is None:
174 e4ccf6cd Guido Trotter
      now = time.time()
175 3ccb3a64 Michael Hanselmann
    tstamp = "%d" % now
176 e4ccf6cd Guido Trotter
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
177 e4ccf6cd Guido Trotter
    return confd.PackMagic(req)
178 e4ccf6cd Guido Trotter
179 e4ccf6cd Guido Trotter
  def _UnpackReply(self, payload):
180 e4ccf6cd Guido Trotter
    in_payload = confd.UnpackMagic(payload)
181 c103d7ae Guido Trotter
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
182 c103d7ae Guido Trotter
    answer = objects.ConfdReply.FromDict(dict_answer)
183 e4ccf6cd Guido Trotter
    return answer, salt
184 e4ccf6cd Guido Trotter
185 96e03b0b Guido Trotter
  def ExpireRequests(self):
186 96e03b0b Guido Trotter
    """Delete all the expired requests.
187 e4ccf6cd Guido Trotter

188 e4ccf6cd Guido Trotter
    """
189 e4ccf6cd Guido Trotter
    now = time.time()
190 71e114da Iustin Pop
    for rsalt, rq in self._requests.items():
191 71e114da Iustin Pop
      if now >= rq.expiry:
192 96e03b0b Guido Trotter
        del self._requests[rsalt]
193 96e03b0b Guido Trotter
        client_reply = ConfdUpcallPayload(salt=rsalt,
194 96e03b0b Guido Trotter
                                          type=UPCALL_EXPIRE,
195 71e114da Iustin Pop
                                          orig_request=rq.request,
196 71e114da Iustin Pop
                                          extra_args=rq.args,
197 5f6f260a Guido Trotter
                                          client=self,
198 5f6f260a Guido Trotter
                                          )
199 96e03b0b Guido Trotter
        self._callback(client_reply)
200 e4ccf6cd Guido Trotter
201 cc6484c4 Iustin Pop
  def SendRequest(self, request, args=None, coverage=0, async=True):
202 e4ccf6cd Guido Trotter
    """Send a confd request to some MCs
203 e4ccf6cd Guido Trotter

204 e4ccf6cd Guido Trotter
    @type request: L{objects.ConfdRequest}
205 e4ccf6cd Guido Trotter
    @param request: the request to send
206 e4ccf6cd Guido Trotter
    @type args: tuple
207 d63997b3 Guido Trotter
    @param args: additional callback arguments
208 e4ccf6cd Guido Trotter
    @type coverage: integer
209 cc6484c4 Iustin Pop
    @param coverage: number of remote nodes to contact; if default
210 cc6484c4 Iustin Pop
        (0), it will use a reasonable default
211 cc6484c4 Iustin Pop
        (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is
212 cc6484c4 Iustin Pop
        passed, it will use the maximum number of peers, otherwise the
213 cc6484c4 Iustin Pop
        number passed in will be used
214 8496d93c Guido Trotter
    @type async: boolean
215 8496d93c Guido Trotter
    @param async: handle the write asynchronously
216 e4ccf6cd Guido Trotter

217 e4ccf6cd Guido Trotter
    """
218 cc6484c4 Iustin Pop
    if coverage == 0:
219 e4ccf6cd Guido Trotter
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
220 cc6484c4 Iustin Pop
    elif coverage == -1:
221 cc6484c4 Iustin Pop
      coverage = len(self._peers)
222 e4ccf6cd Guido Trotter
223 e4ccf6cd Guido Trotter
    if coverage > len(self._peers):
224 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Not enough MCs known to provide the"
225 e4ccf6cd Guido Trotter
                                    " desired coverage")
226 e4ccf6cd Guido Trotter
227 e4ccf6cd Guido Trotter
    if not request.rsalt:
228 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Missing request rsalt")
229 e4ccf6cd Guido Trotter
230 96e03b0b Guido Trotter
    self.ExpireRequests()
231 96e03b0b Guido Trotter
    if request.rsalt in self._requests:
232 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Duplicate request rsalt")
233 e4ccf6cd Guido Trotter
234 e4ccf6cd Guido Trotter
    if request.type not in constants.CONFD_REQS:
235 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Invalid request type")
236 e4ccf6cd Guido Trotter
237 e4ccf6cd Guido Trotter
    random.shuffle(self._peers)
238 e4ccf6cd Guido Trotter
    targets = self._peers[:coverage]
239 e4ccf6cd Guido Trotter
240 e4ccf6cd Guido Trotter
    now = time.time()
241 e4ccf6cd Guido Trotter
    payload = self._PackRequest(request, now=now)
242 e4ccf6cd Guido Trotter
243 e4ccf6cd Guido Trotter
    for target in targets:
244 e4ccf6cd Guido Trotter
      try:
245 e4ccf6cd Guido Trotter
        self._socket.enqueue_send(target, self._confd_port, payload)
246 e4ccf6cd Guido Trotter
      except errors.UdpDataSizeError:
247 e4ccf6cd Guido Trotter
        raise errors.ConfdClientError("Request too big")
248 e4ccf6cd Guido Trotter
249 e4ccf6cd Guido Trotter
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
250 bfbbc223 Iustin Pop
    self._requests[request.rsalt] = _Request(request, args, expire_time,
251 bfbbc223 Iustin Pop
                                             targets)
252 e4ccf6cd Guido Trotter
253 8496d93c Guido Trotter
    if not async:
254 8496d93c Guido Trotter
      self.FlushSendQueue()
255 8496d93c Guido Trotter
256 e4ccf6cd Guido Trotter
  def HandleResponse(self, payload, ip, port):
257 e4ccf6cd Guido Trotter
    """Asynchronous handler for a confd reply
258 e4ccf6cd Guido Trotter

259 e4ccf6cd Guido Trotter
    Call the relevant callback associated to the current request.
260 e4ccf6cd Guido Trotter

261 e4ccf6cd Guido Trotter
    """
262 e4ccf6cd Guido Trotter
    try:
263 e4ccf6cd Guido Trotter
      try:
264 e4ccf6cd Guido Trotter
        answer, salt = self._UnpackReply(payload)
265 a3db74e4 Guido Trotter
      except (errors.SignatureError, errors.ConfdMagicError), err:
266 a3db74e4 Guido Trotter
        if self._logger:
267 a3db74e4 Guido Trotter
          self._logger.debug("Discarding broken package: %s" % err)
268 e4ccf6cd Guido Trotter
        return
269 e4ccf6cd Guido Trotter
270 e4ccf6cd Guido Trotter
      try:
271 71e114da Iustin Pop
        rq = self._requests[salt]
272 e4ccf6cd Guido Trotter
      except KeyError:
273 a3db74e4 Guido Trotter
        if self._logger:
274 a3db74e4 Guido Trotter
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
275 96e03b0b Guido Trotter
        return
276 96e03b0b Guido Trotter
277 bfbbc223 Iustin Pop
      rq.rcvd.add(ip)
278 bfbbc223 Iustin Pop
279 96e03b0b Guido Trotter
      client_reply = ConfdUpcallPayload(salt=salt,
280 96e03b0b Guido Trotter
                                        type=UPCALL_REPLY,
281 96e03b0b Guido Trotter
                                        server_reply=answer,
282 71e114da Iustin Pop
                                        orig_request=rq.request,
283 96e03b0b Guido Trotter
                                        server_ip=ip,
284 96e03b0b Guido Trotter
                                        server_port=port,
285 71e114da Iustin Pop
                                        extra_args=rq.args,
286 5f6f260a Guido Trotter
                                        client=self,
287 5ae4945a Iustin Pop
                                        )
288 96e03b0b Guido Trotter
      self._callback(client_reply)
289 e4ccf6cd Guido Trotter
290 e4ccf6cd Guido Trotter
    finally:
291 96e03b0b Guido Trotter
      self.ExpireRequests()
292 96e03b0b Guido Trotter
293 8496d93c Guido Trotter
  def FlushSendQueue(self):
294 8496d93c Guido Trotter
    """Send out all pending requests.
295 8496d93c Guido Trotter

296 8496d93c Guido Trotter
    Can be used for synchronous client use.
297 8496d93c Guido Trotter

298 8496d93c Guido Trotter
    """
299 8496d93c Guido Trotter
    while self._socket.writable():
300 8496d93c Guido Trotter
      self._socket.handle_write()
301 8496d93c Guido Trotter
302 8496d93c Guido Trotter
  def ReceiveReply(self, timeout=1):
303 8496d93c Guido Trotter
    """Receive one reply.
304 8496d93c Guido Trotter

305 8496d93c Guido Trotter
    @type timeout: float
306 8496d93c Guido Trotter
    @param timeout: how long to wait for the reply
307 8496d93c Guido Trotter
    @rtype: boolean
308 8496d93c Guido Trotter
    @return: True if some data has been handled, False otherwise
309 8496d93c Guido Trotter

310 8496d93c Guido Trotter
    """
311 8496d93c Guido Trotter
    return self._socket.process_next_packet(timeout=timeout)
312 8496d93c Guido Trotter
313 bfbbc223 Iustin Pop
  @staticmethod
314 bfbbc223 Iustin Pop
  def _NeededReplies(peer_cnt):
315 bfbbc223 Iustin Pop
    """Compute the minimum safe number of replies for a query.
316 bfbbc223 Iustin Pop

317 bfbbc223 Iustin Pop
    The algorithm is designed to work well for both small and big
318 bfbbc223 Iustin Pop
    number of peers:
319 bfbbc223 Iustin Pop
        - for less than three, we require all responses
320 bfbbc223 Iustin Pop
        - for less than five, we allow one miss
321 bfbbc223 Iustin Pop
        - otherwise, half the number plus one
322 bfbbc223 Iustin Pop

323 bfbbc223 Iustin Pop
    This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
324 bfbbc223 Iustin Pop
    4->2, 5->3, 6->3, 7->4, etc.
325 bfbbc223 Iustin Pop

326 bfbbc223 Iustin Pop
    @type peer_cnt: int
327 bfbbc223 Iustin Pop
    @param peer_cnt: the number of peers contacted
328 bfbbc223 Iustin Pop
    @rtype: int
329 bfbbc223 Iustin Pop
    @return: the number of replies which should give a safe coverage
330 bfbbc223 Iustin Pop

331 bfbbc223 Iustin Pop
    """
332 bfbbc223 Iustin Pop
    if peer_cnt < 3:
333 bfbbc223 Iustin Pop
      return peer_cnt
334 bfbbc223 Iustin Pop
    elif peer_cnt < 5:
335 bfbbc223 Iustin Pop
      return peer_cnt - 1
336 bfbbc223 Iustin Pop
    else:
337 e687ec01 Michael Hanselmann
      return int(peer_cnt / 2) + 1
338 bfbbc223 Iustin Pop
339 bfbbc223 Iustin Pop
  def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
340 bfbbc223 Iustin Pop
    """Wait for replies to a given request.
341 bfbbc223 Iustin Pop

342 bfbbc223 Iustin Pop
    This method will wait until either the timeout expires or a
343 bfbbc223 Iustin Pop
    minimum number (computed using L{_NeededReplies}) of replies are
344 bfbbc223 Iustin Pop
    received for the given salt. It is useful when doing synchronous
345 bfbbc223 Iustin Pop
    calls to this library.
346 bfbbc223 Iustin Pop

347 bfbbc223 Iustin Pop
    @param salt: the salt of the request we want responses for
348 bfbbc223 Iustin Pop
    @param timeout: the maximum timeout (should be less or equal to
349 bfbbc223 Iustin Pop
        L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT}
350 bfbbc223 Iustin Pop
    @rtype: tuple
351 bfbbc223 Iustin Pop
    @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
352 bfbbc223 Iustin Pop
        request is unknown, timed_out will be true and the counters
353 bfbbc223 Iustin Pop
        will be zero
354 bfbbc223 Iustin Pop

355 bfbbc223 Iustin Pop
    """
356 bfbbc223 Iustin Pop
    def _CheckResponse():
357 bfbbc223 Iustin Pop
      if salt not in self._requests:
358 bfbbc223 Iustin Pop
        # expired?
359 bfbbc223 Iustin Pop
        if self._logger:
360 bfbbc223 Iustin Pop
          self._logger.debug("Discarding unknown/expired request: %s" % salt)
361 bfbbc223 Iustin Pop
        return MISSING
362 bfbbc223 Iustin Pop
      rq = self._requests[salt]
363 bfbbc223 Iustin Pop
      if len(rq.rcvd) >= expected:
364 bfbbc223 Iustin Pop
        # already got all replies
365 bfbbc223 Iustin Pop
        return (False, len(rq.sent), len(rq.rcvd))
366 bfbbc223 Iustin Pop
      # else wait, using default timeout
367 bfbbc223 Iustin Pop
      self.ReceiveReply()
368 bfbbc223 Iustin Pop
      raise utils.RetryAgain()
369 bfbbc223 Iustin Pop
370 bfbbc223 Iustin Pop
    MISSING = (True, 0, 0)
371 bfbbc223 Iustin Pop
372 bfbbc223 Iustin Pop
    if salt not in self._requests:
373 bfbbc223 Iustin Pop
      return MISSING
374 bfbbc223 Iustin Pop
    # extend the expire time with the current timeout, so that we
375 bfbbc223 Iustin Pop
    # don't get the request expired from under us
376 bfbbc223 Iustin Pop
    rq = self._requests[salt]
377 bfbbc223 Iustin Pop
    rq.expiry += timeout
378 bfbbc223 Iustin Pop
    sent = len(rq.sent)
379 bfbbc223 Iustin Pop
    expected = self._NeededReplies(sent)
380 bfbbc223 Iustin Pop
381 bfbbc223 Iustin Pop
    try:
382 bfbbc223 Iustin Pop
      return utils.Retry(_CheckResponse, 0, timeout)
383 bfbbc223 Iustin Pop
    except utils.RetryTimeout:
384 bfbbc223 Iustin Pop
      if salt in self._requests:
385 bfbbc223 Iustin Pop
        rq = self._requests[salt]
386 bfbbc223 Iustin Pop
        return (True, len(rq.sent), len(rq.rcvd))
387 bfbbc223 Iustin Pop
      else:
388 bfbbc223 Iustin Pop
        return MISSING
389 bfbbc223 Iustin Pop
390 d8bcfe21 Manuel Franceschini
  def _SetPeersAddressFamily(self):
391 d8bcfe21 Manuel Franceschini
    if not self._peers:
392 d8bcfe21 Manuel Franceschini
      raise errors.ConfdClientError("Peer list empty")
393 d8bcfe21 Manuel Franceschini
    try:
394 d8bcfe21 Manuel Franceschini
      peer = self._peers[0]
395 8b312c1d Manuel Franceschini
      self._family = netutils.IPAddress.GetAddressFamily(peer)
396 d8bcfe21 Manuel Franceschini
      for peer in self._peers[1:]:
397 8b312c1d Manuel Franceschini
        if netutils.IPAddress.GetAddressFamily(peer) != self._family:
398 d8bcfe21 Manuel Franceschini
          raise errors.ConfdClientError("Peers must be of same address family")
399 8b312c1d Manuel Franceschini
    except errors.IPAddressError:
400 d8bcfe21 Manuel Franceschini
      raise errors.ConfdClientError("Peer address %s invalid" % peer)
401 d8bcfe21 Manuel Franceschini
402 96e03b0b Guido Trotter
403 96e03b0b Guido Trotter
# UPCALL_REPLY: server reply upcall
404 96e03b0b Guido Trotter
# has all ConfdUpcallPayload fields populated
405 96e03b0b Guido Trotter
UPCALL_REPLY = 1
406 96e03b0b Guido Trotter
# UPCALL_EXPIRE: internal library request expire
407 96e03b0b Guido Trotter
# has only salt, type, orig_request and extra_args
408 96e03b0b Guido Trotter
UPCALL_EXPIRE = 2
409 b8028dcf Michael Hanselmann
CONFD_UPCALL_TYPES = compat.UniqueFrozenset([
410 96e03b0b Guido Trotter
  UPCALL_REPLY,
411 96e03b0b Guido Trotter
  UPCALL_EXPIRE,
412 96e03b0b Guido Trotter
  ])
413 96e03b0b Guido Trotter
414 96e03b0b Guido Trotter
415 96e03b0b Guido Trotter
class ConfdUpcallPayload(objects.ConfigObject):
416 96e03b0b Guido Trotter
  """Callback argument for confd replies
417 96e03b0b Guido Trotter

418 96e03b0b Guido Trotter
  @type salt: string
419 96e03b0b Guido Trotter
  @ivar salt: salt associated with the query
420 96e03b0b Guido Trotter
  @type type: one of confd.client.CONFD_UPCALL_TYPES
421 96e03b0b Guido Trotter
  @ivar type: upcall type (server reply, expired request, ...)
422 96e03b0b Guido Trotter
  @type orig_request: L{objects.ConfdRequest}
423 96e03b0b Guido Trotter
  @ivar orig_request: original request
424 96e03b0b Guido Trotter
  @type server_reply: L{objects.ConfdReply}
425 96e03b0b Guido Trotter
  @ivar server_reply: server reply
426 96e03b0b Guido Trotter
  @type server_ip: string
427 96e03b0b Guido Trotter
  @ivar server_ip: answering server ip address
428 96e03b0b Guido Trotter
  @type server_port: int
429 96e03b0b Guido Trotter
  @ivar server_port: answering server port
430 96e03b0b Guido Trotter
  @type extra_args: any
431 96e03b0b Guido Trotter
  @ivar extra_args: 'args' argument of the SendRequest function
432 5f6f260a Guido Trotter
  @type client: L{ConfdClient}
433 5f6f260a Guido Trotter
  @ivar client: current confd client instance
434 96e03b0b Guido Trotter

435 96e03b0b Guido Trotter
  """
436 96e03b0b Guido Trotter
  __slots__ = [
437 96e03b0b Guido Trotter
    "salt",
438 96e03b0b Guido Trotter
    "type",
439 96e03b0b Guido Trotter
    "orig_request",
440 96e03b0b Guido Trotter
    "server_reply",
441 96e03b0b Guido Trotter
    "server_ip",
442 96e03b0b Guido Trotter
    "server_port",
443 96e03b0b Guido Trotter
    "extra_args",
444 5f6f260a Guido Trotter
    "client",
445 96e03b0b Guido Trotter
    ]
446 e4ccf6cd Guido Trotter
447 e4ccf6cd Guido Trotter
448 e4ccf6cd Guido Trotter
class ConfdClientRequest(objects.ConfdRequest):
449 e4ccf6cd Guido Trotter
  """This is the client-side version of ConfdRequest.
450 e4ccf6cd Guido Trotter

451 e4ccf6cd Guido Trotter
  This version of the class helps creating requests, on the client side, by
452 e4ccf6cd Guido Trotter
  filling in some default values.
453 e4ccf6cd Guido Trotter

454 e4ccf6cd Guido Trotter
  """
455 e4ccf6cd Guido Trotter
  def __init__(self, **kwargs):
456 e4ccf6cd Guido Trotter
    objects.ConfdRequest.__init__(self, **kwargs)
457 e4ccf6cd Guido Trotter
    if not self.rsalt:
458 e4ccf6cd Guido Trotter
      self.rsalt = utils.NewUUID()
459 e4ccf6cd Guido Trotter
    if not self.protocol:
460 e4ccf6cd Guido Trotter
      self.protocol = constants.CONFD_PROTOCOL_VERSION
461 e4ccf6cd Guido Trotter
    if self.type not in constants.CONFD_REQS:
462 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Invalid request type")
463 e4ccf6cd Guido Trotter
464 392ca296 Guido Trotter
465 392ca296 Guido Trotter
class ConfdFilterCallback:
466 392ca296 Guido Trotter
  """Callback that calls another callback, but filters duplicate results.
467 392ca296 Guido Trotter

468 49b3fdac Iustin Pop
  @ivar consistent: a dictionary indexed by salt; for each salt, if
469 49b3fdac Iustin Pop
      all responses ware identical, this will be True; this is the
470 49b3fdac Iustin Pop
      expected state on a healthy cluster; on inconsistent or
471 49b3fdac Iustin Pop
      partitioned clusters, this might be False, if we see answers
472 49b3fdac Iustin Pop
      with the same serial but different contents
473 49b3fdac Iustin Pop

474 392ca296 Guido Trotter
  """
475 392ca296 Guido Trotter
  def __init__(self, callback, logger=None):
476 392ca296 Guido Trotter
    """Constructor for ConfdFilterCallback
477 392ca296 Guido Trotter

478 392ca296 Guido Trotter
    @type callback: f(L{ConfdUpcallPayload})
479 392ca296 Guido Trotter
    @param callback: function to call when getting answers
480 69b99987 Michael Hanselmann
    @type logger: logging.Logger
481 d63997b3 Guido Trotter
    @param logger: optional logger for internal conditions
482 392ca296 Guido Trotter

483 392ca296 Guido Trotter
    """
484 392ca296 Guido Trotter
    if not callable(callback):
485 392ca296 Guido Trotter
      raise errors.ProgrammerError("callback must be callable")
486 392ca296 Guido Trotter
487 392ca296 Guido Trotter
    self._callback = callback
488 392ca296 Guido Trotter
    self._logger = logger
489 392ca296 Guido Trotter
    # answers contains a dict of salt -> answer
490 392ca296 Guido Trotter
    self._answers = {}
491 49b3fdac Iustin Pop
    self.consistent = {}
492 392ca296 Guido Trotter
493 392ca296 Guido Trotter
  def _LogFilter(self, salt, new_reply, old_reply):
494 392ca296 Guido Trotter
    if not self._logger:
495 392ca296 Guido Trotter
      return
496 392ca296 Guido Trotter
497 392ca296 Guido Trotter
    if new_reply.serial > old_reply.serial:
498 392ca296 Guido Trotter
      self._logger.debug("Filtering confirming answer, with newer"
499 392ca296 Guido Trotter
                         " serial for query %s" % salt)
500 392ca296 Guido Trotter
    elif new_reply.serial == old_reply.serial:
501 392ca296 Guido Trotter
      if new_reply.answer != old_reply.answer:
502 392ca296 Guido Trotter
        self._logger.warning("Got incoherent answers for query %s"
503 392ca296 Guido Trotter
                             " (serial: %s)" % (salt, new_reply.serial))
504 392ca296 Guido Trotter
      else:
505 392ca296 Guido Trotter
        self._logger.debug("Filtering confirming answer, with same"
506 392ca296 Guido Trotter
                           " serial for query %s" % salt)
507 392ca296 Guido Trotter
    else:
508 392ca296 Guido Trotter
      self._logger.debug("Filtering outdated answer for query %s"
509 392ca296 Guido Trotter
                         " serial: (%d < %d)" % (salt, old_reply.serial,
510 392ca296 Guido Trotter
                                                 new_reply.serial))
511 392ca296 Guido Trotter
512 392ca296 Guido Trotter
  def _HandleExpire(self, up):
513 392ca296 Guido Trotter
    # if we have no answer we have received none, before the expiration.
514 a9613def Guido Trotter
    if up.salt in self._answers:
515 a9613def Guido Trotter
      del self._answers[up.salt]
516 49b3fdac Iustin Pop
    if up.salt in self.consistent:
517 49b3fdac Iustin Pop
      del self.consistent[up.salt]
518 392ca296 Guido Trotter
519 392ca296 Guido Trotter
  def _HandleReply(self, up):
520 392ca296 Guido Trotter
    """Handle a single confd reply, and decide whether to filter it.
521 392ca296 Guido Trotter

522 392ca296 Guido Trotter
    @rtype: boolean
523 392ca296 Guido Trotter
    @return: True if the reply should be filtered, False if it should be passed
524 392ca296 Guido Trotter
             on to the up-callback
525 392ca296 Guido Trotter

526 392ca296 Guido Trotter
    """
527 392ca296 Guido Trotter
    filter_upcall = False
528 392ca296 Guido Trotter
    salt = up.salt
529 49b3fdac Iustin Pop
    if salt not in self.consistent:
530 49b3fdac Iustin Pop
      self.consistent[salt] = True
531 392ca296 Guido Trotter
    if salt not in self._answers:
532 392ca296 Guido Trotter
      # first answer for a query (don't filter, and record)
533 392ca296 Guido Trotter
      self._answers[salt] = up.server_reply
534 392ca296 Guido Trotter
    elif up.server_reply.serial > self._answers[salt].serial:
535 392ca296 Guido Trotter
      # newer answer (record, and compare contents)
536 392ca296 Guido Trotter
      old_answer = self._answers[salt]
537 392ca296 Guido Trotter
      self._answers[salt] = up.server_reply
538 392ca296 Guido Trotter
      if up.server_reply.answer == old_answer.answer:
539 392ca296 Guido Trotter
        # same content (filter) (version upgrade was unrelated)
540 392ca296 Guido Trotter
        filter_upcall = True
541 392ca296 Guido Trotter
        self._LogFilter(salt, up.server_reply, old_answer)
542 392ca296 Guido Trotter
      # else: different content, pass up a second answer
543 392ca296 Guido Trotter
    else:
544 392ca296 Guido Trotter
      # older or same-version answer (duplicate or outdated, filter)
545 39292d3a Iustin Pop
      if (up.server_reply.serial == self._answers[salt].serial and
546 39292d3a Iustin Pop
          up.server_reply.answer != self._answers[salt].answer):
547 49b3fdac Iustin Pop
        self.consistent[salt] = False
548 392ca296 Guido Trotter
      filter_upcall = True
549 392ca296 Guido Trotter
      self._LogFilter(salt, up.server_reply, self._answers[salt])
550 392ca296 Guido Trotter
551 392ca296 Guido Trotter
    return filter_upcall
552 392ca296 Guido Trotter
553 392ca296 Guido Trotter
  def __call__(self, up):
554 392ca296 Guido Trotter
    """Filtering callback
555 392ca296 Guido Trotter

556 392ca296 Guido Trotter
    @type up: L{ConfdUpcallPayload}
557 392ca296 Guido Trotter
    @param up: upper callback
558 392ca296 Guido Trotter

559 392ca296 Guido Trotter
    """
560 392ca296 Guido Trotter
    filter_upcall = False
561 392ca296 Guido Trotter
    if up.type == UPCALL_REPLY:
562 392ca296 Guido Trotter
      filter_upcall = self._HandleReply(up)
563 392ca296 Guido Trotter
    elif up.type == UPCALL_EXPIRE:
564 392ca296 Guido Trotter
      self._HandleExpire(up)
565 392ca296 Guido Trotter
566 392ca296 Guido Trotter
    if not filter_upcall:
567 392ca296 Guido Trotter
      self._callback(up)
568 04cdf663 Guido Trotter
569 04cdf663 Guido Trotter
570 04cdf663 Guido Trotter
class ConfdCountingCallback:
571 04cdf663 Guido Trotter
  """Callback that calls another callback, and counts the answers
572 04cdf663 Guido Trotter

573 04cdf663 Guido Trotter
  """
574 04cdf663 Guido Trotter
  def __init__(self, callback, logger=None):
575 04cdf663 Guido Trotter
    """Constructor for ConfdCountingCallback
576 04cdf663 Guido Trotter

577 04cdf663 Guido Trotter
    @type callback: f(L{ConfdUpcallPayload})
578 04cdf663 Guido Trotter
    @param callback: function to call when getting answers
579 04cdf663 Guido Trotter
    @type logger: logging.Logger
580 04cdf663 Guido Trotter
    @param logger: optional logger for internal conditions
581 04cdf663 Guido Trotter

582 04cdf663 Guido Trotter
    """
583 04cdf663 Guido Trotter
    if not callable(callback):
584 04cdf663 Guido Trotter
      raise errors.ProgrammerError("callback must be callable")
585 04cdf663 Guido Trotter
586 04cdf663 Guido Trotter
    self._callback = callback
587 04cdf663 Guido Trotter
    self._logger = logger
588 04cdf663 Guido Trotter
    # answers contains a dict of salt -> count
589 04cdf663 Guido Trotter
    self._answers = {}
590 04cdf663 Guido Trotter
591 04cdf663 Guido Trotter
  def RegisterQuery(self, salt):
592 04cdf663 Guido Trotter
    if salt in self._answers:
593 04cdf663 Guido Trotter
      raise errors.ProgrammerError("query already registered")
594 04cdf663 Guido Trotter
    self._answers[salt] = 0
595 04cdf663 Guido Trotter
596 04cdf663 Guido Trotter
  def AllAnswered(self):
597 04cdf663 Guido Trotter
    """Have all the registered queries received at least an answer?
598 04cdf663 Guido Trotter

599 04cdf663 Guido Trotter
    """
600 cea881e5 Michael Hanselmann
    return compat.all(self._answers.values())
601 04cdf663 Guido Trotter
602 04cdf663 Guido Trotter
  def _HandleExpire(self, up):
603 04cdf663 Guido Trotter
    # if we have no answer we have received none, before the expiration.
604 04cdf663 Guido Trotter
    if up.salt in self._answers:
605 04cdf663 Guido Trotter
      del self._answers[up.salt]
606 04cdf663 Guido Trotter
607 04cdf663 Guido Trotter
  def _HandleReply(self, up):
608 04cdf663 Guido Trotter
    """Handle a single confd reply, and decide whether to filter it.
609 04cdf663 Guido Trotter

610 04cdf663 Guido Trotter
    @rtype: boolean
611 04cdf663 Guido Trotter
    @return: True if the reply should be filtered, False if it should be passed
612 04cdf663 Guido Trotter
             on to the up-callback
613 04cdf663 Guido Trotter

614 04cdf663 Guido Trotter
    """
615 04cdf663 Guido Trotter
    if up.salt in self._answers:
616 04cdf663 Guido Trotter
      self._answers[up.salt] += 1
617 04cdf663 Guido Trotter
618 04cdf663 Guido Trotter
  def __call__(self, up):
619 04cdf663 Guido Trotter
    """Filtering callback
620 04cdf663 Guido Trotter

621 04cdf663 Guido Trotter
    @type up: L{ConfdUpcallPayload}
622 04cdf663 Guido Trotter
    @param up: upper callback
623 04cdf663 Guido Trotter

624 04cdf663 Guido Trotter
    """
625 04cdf663 Guido Trotter
    if up.type == UPCALL_REPLY:
626 04cdf663 Guido Trotter
      self._HandleReply(up)
627 04cdf663 Guido Trotter
    elif up.type == UPCALL_EXPIRE:
628 04cdf663 Guido Trotter
      self._HandleExpire(up)
629 04cdf663 Guido Trotter
    self._callback(up)
630 5b349fd1 Iustin Pop
631 71e114da Iustin Pop
632 aa2efc52 Iustin Pop
class StoreResultCallback:
633 aa2efc52 Iustin Pop
  """Callback that simply stores the most recent answer.
634 aa2efc52 Iustin Pop

635 aa2efc52 Iustin Pop
  @ivar _answers: dict of salt to (have_answer, reply)
636 aa2efc52 Iustin Pop

637 aa2efc52 Iustin Pop
  """
638 aa2efc52 Iustin Pop
  _NO_KEY = (False, None)
639 aa2efc52 Iustin Pop
640 aa2efc52 Iustin Pop
  def __init__(self):
641 aa2efc52 Iustin Pop
    """Constructor for StoreResultCallback
642 aa2efc52 Iustin Pop

643 aa2efc52 Iustin Pop
    """
644 aa2efc52 Iustin Pop
    # answers contains a dict of salt -> best result
645 aa2efc52 Iustin Pop
    self._answers = {}
646 aa2efc52 Iustin Pop
647 aa2efc52 Iustin Pop
  def GetResponse(self, salt):
648 aa2efc52 Iustin Pop
    """Return the best match for a salt
649 aa2efc52 Iustin Pop

650 aa2efc52 Iustin Pop
    """
651 aa2efc52 Iustin Pop
    return self._answers.get(salt, self._NO_KEY)
652 aa2efc52 Iustin Pop
653 aa2efc52 Iustin Pop
  def _HandleExpire(self, up):
654 aa2efc52 Iustin Pop
    """Expiration handler.
655 aa2efc52 Iustin Pop

656 aa2efc52 Iustin Pop
    """
657 aa2efc52 Iustin Pop
    if up.salt in self._answers and self._answers[up.salt] == self._NO_KEY:
658 aa2efc52 Iustin Pop
      del self._answers[up.salt]
659 aa2efc52 Iustin Pop
660 aa2efc52 Iustin Pop
  def _HandleReply(self, up):
661 aa2efc52 Iustin Pop
    """Handle a single confd reply, and decide whether to filter it.
662 aa2efc52 Iustin Pop

663 aa2efc52 Iustin Pop
    """
664 aa2efc52 Iustin Pop
    self._answers[up.salt] = (True, up)
665 aa2efc52 Iustin Pop
666 aa2efc52 Iustin Pop
  def __call__(self, up):
667 aa2efc52 Iustin Pop
    """Filtering callback
668 aa2efc52 Iustin Pop

669 aa2efc52 Iustin Pop
    @type up: L{ConfdUpcallPayload}
670 aa2efc52 Iustin Pop
    @param up: upper callback
671 aa2efc52 Iustin Pop

672 aa2efc52 Iustin Pop
    """
673 aa2efc52 Iustin Pop
    if up.type == UPCALL_REPLY:
674 aa2efc52 Iustin Pop
      self._HandleReply(up)
675 aa2efc52 Iustin Pop
    elif up.type == UPCALL_EXPIRE:
676 aa2efc52 Iustin Pop
      self._HandleExpire(up)
677 aa2efc52 Iustin Pop
678 aa2efc52 Iustin Pop
679 5b349fd1 Iustin Pop
def GetConfdClient(callback):
680 5b349fd1 Iustin Pop
  """Return a client configured using the given callback.
681 5b349fd1 Iustin Pop

682 5b349fd1 Iustin Pop
  This is handy to abstract the MC list and HMAC key reading.
683 5b349fd1 Iustin Pop

684 5b349fd1 Iustin Pop
  @attention: This should only be called on nodes which are part of a
685 5b349fd1 Iustin Pop
      cluster, since it depends on a valid (ganeti) data directory;
686 5b349fd1 Iustin Pop
      for code running outside of a cluster, you need to create the
687 5b349fd1 Iustin Pop
      client manually
688 5b349fd1 Iustin Pop

689 5b349fd1 Iustin Pop
  """
690 5b349fd1 Iustin Pop
  ss = ssconf.SimpleStore()
691 5b349fd1 Iustin Pop
  mc_file = ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
692 5b349fd1 Iustin Pop
  mc_list = utils.ReadFile(mc_file).splitlines()
693 5a76d5f6 Michael Hanselmann
  hmac_key = utils.ReadFile(pathutils.CONFD_HMAC_KEY)
694 5b349fd1 Iustin Pop
  return ConfdClient(hmac_key, mc_list, callback)