Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ 7142485a

History | View | Annotate | Download (21.4 kB)

1 e4ccf6cd Guido Trotter
#
2 e4ccf6cd Guido Trotter
#
3 e4ccf6cd Guido Trotter
4 8b312c1d Manuel Franceschini
# Copyright (C) 2009, 2010 Google Inc.
5 e4ccf6cd Guido Trotter
#
6 e4ccf6cd Guido Trotter
# This program is free software; you can redistribute it and/or modify
7 e4ccf6cd Guido Trotter
# it under the terms of the GNU General Public License as published by
8 e4ccf6cd Guido Trotter
# the Free Software Foundation; either version 2 of the License, or
9 e4ccf6cd Guido Trotter
# (at your option) any later version.
10 e4ccf6cd Guido Trotter
#
11 e4ccf6cd Guido Trotter
# This program is distributed in the hope that it will be useful, but
12 e4ccf6cd Guido Trotter
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 e4ccf6cd Guido Trotter
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 e4ccf6cd Guido Trotter
# General Public License for more details.
15 e4ccf6cd Guido Trotter
#
16 e4ccf6cd Guido Trotter
# You should have received a copy of the GNU General Public License
17 e4ccf6cd Guido Trotter
# along with this program; if not, write to the Free Software
18 e4ccf6cd Guido Trotter
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 e4ccf6cd Guido Trotter
# 02110-1301, USA.
20 e4ccf6cd Guido Trotter
21 e4ccf6cd Guido Trotter
22 e4ccf6cd Guido Trotter
"""Ganeti confd client
23 e4ccf6cd Guido Trotter

24 cf7b0cc4 Guido Trotter
Clients can use the confd client library to send requests to a group of master
25 cf7b0cc4 Guido Trotter
candidates running confd. The expected usage is through the asyncore framework,
26 cf7b0cc4 Guido Trotter
by sending queries, and asynchronously receiving replies through a callback.
27 cf7b0cc4 Guido Trotter

28 cf7b0cc4 Guido Trotter
This way the client library doesn't ever need to "wait" on a particular answer,
29 cf7b0cc4 Guido Trotter
and can proceed even if some udp packets are lost. It's up to the user to
30 cf7b0cc4 Guido Trotter
reschedule queries if they haven't received responses and they need them.
31 cf7b0cc4 Guido Trotter

32 69b99987 Michael Hanselmann
Example usage::
33 69b99987 Michael Hanselmann

34 cf7b0cc4 Guido Trotter
  client = ConfdClient(...) # includes callback specification
35 cf7b0cc4 Guido Trotter
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36 cf7b0cc4 Guido Trotter
  client.SendRequest(req)
37 cf7b0cc4 Guido Trotter
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38 cf7b0cc4 Guido Trotter
  # ... wait ...
39 cf7b0cc4 Guido Trotter
  # And your callback will be called by asyncore, when your query gets a
40 cf7b0cc4 Guido Trotter
  # response, or when it expires.
41 cf7b0cc4 Guido Trotter

42 392ca296 Guido Trotter
You can use the provided ConfdFilterCallback to act as a filter, only passing
43 392ca296 Guido Trotter
"newer" answer to your callback, and filtering out outdated ones, or ones
44 392ca296 Guido Trotter
confirming what you already got.
45 392ca296 Guido Trotter

46 e4ccf6cd Guido Trotter
"""
47 69b99987 Michael Hanselmann
48 b459a848 Andrea Spadaccini
# pylint: disable=E0203
49 6c881c52 Iustin Pop
50 6c881c52 Iustin Pop
# E0203: Access to member %r before its definition, since we use
51 6c881c52 Iustin Pop
# objects.py which doesn't explicitely initialise its members
52 6c881c52 Iustin Pop
53 e4ccf6cd Guido Trotter
import time
54 e4ccf6cd Guido Trotter
import random
55 e4ccf6cd Guido Trotter
56 e4ccf6cd Guido Trotter
from ganeti import utils
57 e4ccf6cd Guido Trotter
from ganeti import constants
58 e4ccf6cd Guido Trotter
from ganeti import objects
59 e4ccf6cd Guido Trotter
from ganeti import serializer
60 e4ccf6cd Guido Trotter
from ganeti import daemon # contains AsyncUDPSocket
61 e4ccf6cd Guido Trotter
from ganeti import errors
62 e4ccf6cd Guido Trotter
from ganeti import confd
63 5b349fd1 Iustin Pop
from ganeti import ssconf
64 cea881e5 Michael Hanselmann
from ganeti import compat
65 a744b676 Manuel Franceschini
from ganeti import netutils
66 e4ccf6cd Guido Trotter
67 e4ccf6cd Guido Trotter
68 e4ccf6cd Guido Trotter
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
69 e4ccf6cd Guido Trotter
  """Confd udp asyncore client
70 e4ccf6cd Guido Trotter

71 e4ccf6cd Guido Trotter
  This is kept separate from the main ConfdClient to make sure it's easy to
72 e4ccf6cd Guido Trotter
  implement a non-asyncore based client library.
73 e4ccf6cd Guido Trotter

74 e4ccf6cd Guido Trotter
  """
75 d8bcfe21 Manuel Franceschini
  def __init__(self, client, family):
76 e4ccf6cd Guido Trotter
    """Constructor for ConfdAsyncUDPClient
77 e4ccf6cd Guido Trotter

78 e4ccf6cd Guido Trotter
    @type client: L{ConfdClient}
79 e4ccf6cd Guido Trotter
    @param client: client library, to pass the datagrams to
80 e4ccf6cd Guido Trotter

81 e4ccf6cd Guido Trotter
    """
82 d8bcfe21 Manuel Franceschini
    daemon.AsyncUDPSocket.__init__(self, family)
83 e4ccf6cd Guido Trotter
    self.client = client
84 e4ccf6cd Guido Trotter
85 e4ccf6cd Guido Trotter
  # this method is overriding a daemon.AsyncUDPSocket method
86 e4ccf6cd Guido Trotter
  def handle_datagram(self, payload, ip, port):
87 e4ccf6cd Guido Trotter
    self.client.HandleResponse(payload, ip, port)
88 e4ccf6cd Guido Trotter
89 e4ccf6cd Guido Trotter
90 71e114da Iustin Pop
class _Request(object):
91 71e114da Iustin Pop
  """Request status structure.
92 71e114da Iustin Pop

93 71e114da Iustin Pop
  @ivar request: the request data
94 71e114da Iustin Pop
  @ivar args: any extra arguments for the callback
95 71e114da Iustin Pop
  @ivar expiry: the expiry timestamp of the request
96 bfbbc223 Iustin Pop
  @ivar sent: the set of contacted peers
97 bfbbc223 Iustin Pop
  @ivar rcvd: the set of peers who replied
98 71e114da Iustin Pop

99 71e114da Iustin Pop
  """
100 bfbbc223 Iustin Pop
  def __init__(self, request, args, expiry, sent):
101 71e114da Iustin Pop
    self.request = request
102 71e114da Iustin Pop
    self.args = args
103 71e114da Iustin Pop
    self.expiry = expiry
104 bfbbc223 Iustin Pop
    self.sent = frozenset(sent)
105 bfbbc223 Iustin Pop
    self.rcvd = set()
106 71e114da Iustin Pop
107 71e114da Iustin Pop
108 e4ccf6cd Guido Trotter
class ConfdClient:
109 e4ccf6cd Guido Trotter
  """Send queries to confd, and get back answers.
110 e4ccf6cd Guido Trotter

111 e4ccf6cd Guido Trotter
  Since the confd model works by querying multiple master candidates, and
112 e4ccf6cd Guido Trotter
  getting back answers, this is an asynchronous library. It can either work
113 e4ccf6cd Guido Trotter
  through asyncore or with your own handling.
114 e4ccf6cd Guido Trotter

115 71e114da Iustin Pop
  @type _requests: dict
116 71e114da Iustin Pop
  @ivar _requests: dictionary indexes by salt, which contains data
117 71e114da Iustin Pop
      about the outstanding requests; the values are objects of type
118 71e114da Iustin Pop
      L{_Request}
119 71e114da Iustin Pop

120 e4ccf6cd Guido Trotter
  """
121 a3db74e4 Guido Trotter
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
122 e4ccf6cd Guido Trotter
    """Constructor for ConfdClient
123 e4ccf6cd Guido Trotter

124 e4ccf6cd Guido Trotter
    @type hmac_key: string
125 e4ccf6cd Guido Trotter
    @param hmac_key: hmac key to talk to confd
126 e4ccf6cd Guido Trotter
    @type peers: list
127 e4ccf6cd Guido Trotter
    @param peers: list of peer nodes
128 96e03b0b Guido Trotter
    @type callback: f(L{ConfdUpcallPayload})
129 96e03b0b Guido Trotter
    @param callback: function to call when getting answers
130 7d20c647 Guido Trotter
    @type port: integer
131 d63997b3 Guido Trotter
    @param port: confd port (default: use GetDaemonPort)
132 69b99987 Michael Hanselmann
    @type logger: logging.Logger
133 d63997b3 Guido Trotter
    @param logger: optional logger for internal conditions
134 e4ccf6cd Guido Trotter

135 e4ccf6cd Guido Trotter
    """
136 96e03b0b Guido Trotter
    if not callable(callback):
137 96e03b0b Guido Trotter
      raise errors.ProgrammerError("callback must be callable")
138 e4ccf6cd Guido Trotter
139 a5229439 Guido Trotter
    self.UpdatePeerList(peers)
140 d8bcfe21 Manuel Franceschini
    self._SetPeersAddressFamily()
141 e4ccf6cd Guido Trotter
    self._hmac_key = hmac_key
142 d8bcfe21 Manuel Franceschini
    self._socket = ConfdAsyncUDPClient(self, self._family)
143 96e03b0b Guido Trotter
    self._callback = callback
144 7d20c647 Guido Trotter
    self._confd_port = port
145 a3db74e4 Guido Trotter
    self._logger = logger
146 96e03b0b Guido Trotter
    self._requests = {}
147 7d20c647 Guido Trotter
148 7d20c647 Guido Trotter
    if self._confd_port is None:
149 a744b676 Manuel Franceschini
      self._confd_port = netutils.GetDaemonPort(constants.CONFD)
150 e4ccf6cd Guido Trotter
151 a5229439 Guido Trotter
  def UpdatePeerList(self, peers):
152 a5229439 Guido Trotter
    """Update the list of peers
153 a5229439 Guido Trotter

154 a5229439 Guido Trotter
    @type peers: list
155 a5229439 Guido Trotter
    @param peers: list of peer nodes
156 a5229439 Guido Trotter

157 a5229439 Guido Trotter
    """
158 e11ddf13 Iustin Pop
    # we are actually called from init, so:
159 b459a848 Andrea Spadaccini
    # pylint: disable=W0201
160 a5229439 Guido Trotter
    if not isinstance(peers, list):
161 a5229439 Guido Trotter
      raise errors.ProgrammerError("peers must be a list")
162 db169865 Guido Trotter
    # make a copy of peers, since we're going to shuffle the list, later
163 db169865 Guido Trotter
    self._peers = list(peers)
164 a5229439 Guido Trotter
165 e4ccf6cd Guido Trotter
  def _PackRequest(self, request, now=None):
166 e4ccf6cd Guido Trotter
    """Prepare a request to be sent on the wire.
167 e4ccf6cd Guido Trotter

168 e4ccf6cd Guido Trotter
    This function puts a proper salt in a confd request, puts the proper salt,
169 e4ccf6cd Guido Trotter
    and adds the correct magic number.
170 e4ccf6cd Guido Trotter

171 e4ccf6cd Guido Trotter
    """
172 e4ccf6cd Guido Trotter
    if now is None:
173 e4ccf6cd Guido Trotter
      now = time.time()
174 3ccb3a64 Michael Hanselmann
    tstamp = "%d" % now
175 e4ccf6cd Guido Trotter
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
176 e4ccf6cd Guido Trotter
    return confd.PackMagic(req)
177 e4ccf6cd Guido Trotter
178 e4ccf6cd Guido Trotter
  def _UnpackReply(self, payload):
179 e4ccf6cd Guido Trotter
    in_payload = confd.UnpackMagic(payload)
180 c103d7ae Guido Trotter
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
181 c103d7ae Guido Trotter
    answer = objects.ConfdReply.FromDict(dict_answer)
182 e4ccf6cd Guido Trotter
    return answer, salt
183 e4ccf6cd Guido Trotter
184 96e03b0b Guido Trotter
  def ExpireRequests(self):
185 96e03b0b Guido Trotter
    """Delete all the expired requests.
186 e4ccf6cd Guido Trotter

187 e4ccf6cd Guido Trotter
    """
188 e4ccf6cd Guido Trotter
    now = time.time()
189 71e114da Iustin Pop
    for rsalt, rq in self._requests.items():
190 71e114da Iustin Pop
      if now >= rq.expiry:
191 96e03b0b Guido Trotter
        del self._requests[rsalt]
192 96e03b0b Guido Trotter
        client_reply = ConfdUpcallPayload(salt=rsalt,
193 96e03b0b Guido Trotter
                                          type=UPCALL_EXPIRE,
194 71e114da Iustin Pop
                                          orig_request=rq.request,
195 71e114da Iustin Pop
                                          extra_args=rq.args,
196 5f6f260a Guido Trotter
                                          client=self,
197 5f6f260a Guido Trotter
                                          )
198 96e03b0b Guido Trotter
        self._callback(client_reply)
199 e4ccf6cd Guido Trotter
200 cc6484c4 Iustin Pop
  def SendRequest(self, request, args=None, coverage=0, async=True):
201 e4ccf6cd Guido Trotter
    """Send a confd request to some MCs
202 e4ccf6cd Guido Trotter

203 e4ccf6cd Guido Trotter
    @type request: L{objects.ConfdRequest}
204 e4ccf6cd Guido Trotter
    @param request: the request to send
205 e4ccf6cd Guido Trotter
    @type args: tuple
206 d63997b3 Guido Trotter
    @param args: additional callback arguments
207 e4ccf6cd Guido Trotter
    @type coverage: integer
208 cc6484c4 Iustin Pop
    @param coverage: number of remote nodes to contact; if default
209 cc6484c4 Iustin Pop
        (0), it will use a reasonable default
210 cc6484c4 Iustin Pop
        (L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is
211 cc6484c4 Iustin Pop
        passed, it will use the maximum number of peers, otherwise the
212 cc6484c4 Iustin Pop
        number passed in will be used
213 8496d93c Guido Trotter
    @type async: boolean
214 8496d93c Guido Trotter
    @param async: handle the write asynchronously
215 e4ccf6cd Guido Trotter

216 e4ccf6cd Guido Trotter
    """
217 cc6484c4 Iustin Pop
    if coverage == 0:
218 e4ccf6cd Guido Trotter
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
219 cc6484c4 Iustin Pop
    elif coverage == -1:
220 cc6484c4 Iustin Pop
      coverage = len(self._peers)
221 e4ccf6cd Guido Trotter
222 e4ccf6cd Guido Trotter
    if coverage > len(self._peers):
223 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Not enough MCs known to provide the"
224 e4ccf6cd Guido Trotter
                                    " desired coverage")
225 e4ccf6cd Guido Trotter
226 e4ccf6cd Guido Trotter
    if not request.rsalt:
227 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Missing request rsalt")
228 e4ccf6cd Guido Trotter
229 96e03b0b Guido Trotter
    self.ExpireRequests()
230 96e03b0b Guido Trotter
    if request.rsalt in self._requests:
231 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Duplicate request rsalt")
232 e4ccf6cd Guido Trotter
233 e4ccf6cd Guido Trotter
    if request.type not in constants.CONFD_REQS:
234 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Invalid request type")
235 e4ccf6cd Guido Trotter
236 e4ccf6cd Guido Trotter
    random.shuffle(self._peers)
237 e4ccf6cd Guido Trotter
    targets = self._peers[:coverage]
238 e4ccf6cd Guido Trotter
239 e4ccf6cd Guido Trotter
    now = time.time()
240 e4ccf6cd Guido Trotter
    payload = self._PackRequest(request, now=now)
241 e4ccf6cd Guido Trotter
242 e4ccf6cd Guido Trotter
    for target in targets:
243 e4ccf6cd Guido Trotter
      try:
244 e4ccf6cd Guido Trotter
        self._socket.enqueue_send(target, self._confd_port, payload)
245 e4ccf6cd Guido Trotter
      except errors.UdpDataSizeError:
246 e4ccf6cd Guido Trotter
        raise errors.ConfdClientError("Request too big")
247 e4ccf6cd Guido Trotter
248 e4ccf6cd Guido Trotter
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
249 bfbbc223 Iustin Pop
    self._requests[request.rsalt] = _Request(request, args, expire_time,
250 bfbbc223 Iustin Pop
                                             targets)
251 e4ccf6cd Guido Trotter
252 8496d93c Guido Trotter
    if not async:
253 8496d93c Guido Trotter
      self.FlushSendQueue()
254 8496d93c Guido Trotter
255 e4ccf6cd Guido Trotter
  def HandleResponse(self, payload, ip, port):
256 e4ccf6cd Guido Trotter
    """Asynchronous handler for a confd reply
257 e4ccf6cd Guido Trotter

258 e4ccf6cd Guido Trotter
    Call the relevant callback associated to the current request.
259 e4ccf6cd Guido Trotter

260 e4ccf6cd Guido Trotter
    """
261 e4ccf6cd Guido Trotter
    try:
262 e4ccf6cd Guido Trotter
      try:
263 e4ccf6cd Guido Trotter
        answer, salt = self._UnpackReply(payload)
264 a3db74e4 Guido Trotter
      except (errors.SignatureError, errors.ConfdMagicError), err:
265 a3db74e4 Guido Trotter
        if self._logger:
266 a3db74e4 Guido Trotter
          self._logger.debug("Discarding broken package: %s" % err)
267 e4ccf6cd Guido Trotter
        return
268 e4ccf6cd Guido Trotter
269 e4ccf6cd Guido Trotter
      try:
270 71e114da Iustin Pop
        rq = self._requests[salt]
271 e4ccf6cd Guido Trotter
      except KeyError:
272 a3db74e4 Guido Trotter
        if self._logger:
273 a3db74e4 Guido Trotter
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
274 96e03b0b Guido Trotter
        return
275 96e03b0b Guido Trotter
276 bfbbc223 Iustin Pop
      rq.rcvd.add(ip)
277 bfbbc223 Iustin Pop
278 96e03b0b Guido Trotter
      client_reply = ConfdUpcallPayload(salt=salt,
279 96e03b0b Guido Trotter
                                        type=UPCALL_REPLY,
280 96e03b0b Guido Trotter
                                        server_reply=answer,
281 71e114da Iustin Pop
                                        orig_request=rq.request,
282 96e03b0b Guido Trotter
                                        server_ip=ip,
283 96e03b0b Guido Trotter
                                        server_port=port,
284 71e114da Iustin Pop
                                        extra_args=rq.args,
285 5f6f260a Guido Trotter
                                        client=self,
286 5f6f260a Guido Trotter
                                       )
287 96e03b0b Guido Trotter
      self._callback(client_reply)
288 e4ccf6cd Guido Trotter
289 e4ccf6cd Guido Trotter
    finally:
290 96e03b0b Guido Trotter
      self.ExpireRequests()
291 96e03b0b Guido Trotter
292 8496d93c Guido Trotter
  def FlushSendQueue(self):
293 8496d93c Guido Trotter
    """Send out all pending requests.
294 8496d93c Guido Trotter

295 8496d93c Guido Trotter
    Can be used for synchronous client use.
296 8496d93c Guido Trotter

297 8496d93c Guido Trotter
    """
298 8496d93c Guido Trotter
    while self._socket.writable():
299 8496d93c Guido Trotter
      self._socket.handle_write()
300 8496d93c Guido Trotter
301 8496d93c Guido Trotter
  def ReceiveReply(self, timeout=1):
302 8496d93c Guido Trotter
    """Receive one reply.
303 8496d93c Guido Trotter

304 8496d93c Guido Trotter
    @type timeout: float
305 8496d93c Guido Trotter
    @param timeout: how long to wait for the reply
306 8496d93c Guido Trotter
    @rtype: boolean
307 8496d93c Guido Trotter
    @return: True if some data has been handled, False otherwise
308 8496d93c Guido Trotter

309 8496d93c Guido Trotter
    """
310 8496d93c Guido Trotter
    return self._socket.process_next_packet(timeout=timeout)
311 8496d93c Guido Trotter
312 bfbbc223 Iustin Pop
  @staticmethod
313 bfbbc223 Iustin Pop
  def _NeededReplies(peer_cnt):
314 bfbbc223 Iustin Pop
    """Compute the minimum safe number of replies for a query.
315 bfbbc223 Iustin Pop

316 bfbbc223 Iustin Pop
    The algorithm is designed to work well for both small and big
317 bfbbc223 Iustin Pop
    number of peers:
318 bfbbc223 Iustin Pop
        - for less than three, we require all responses
319 bfbbc223 Iustin Pop
        - for less than five, we allow one miss
320 bfbbc223 Iustin Pop
        - otherwise, half the number plus one
321 bfbbc223 Iustin Pop

322 bfbbc223 Iustin Pop
    This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
323 bfbbc223 Iustin Pop
    4->2, 5->3, 6->3, 7->4, etc.
324 bfbbc223 Iustin Pop

325 bfbbc223 Iustin Pop
    @type peer_cnt: int
326 bfbbc223 Iustin Pop
    @param peer_cnt: the number of peers contacted
327 bfbbc223 Iustin Pop
    @rtype: int
328 bfbbc223 Iustin Pop
    @return: the number of replies which should give a safe coverage
329 bfbbc223 Iustin Pop

330 bfbbc223 Iustin Pop
    """
331 bfbbc223 Iustin Pop
    if peer_cnt < 3:
332 bfbbc223 Iustin Pop
      return peer_cnt
333 bfbbc223 Iustin Pop
    elif peer_cnt < 5:
334 bfbbc223 Iustin Pop
      return peer_cnt - 1
335 bfbbc223 Iustin Pop
    else:
336 e687ec01 Michael Hanselmann
      return int(peer_cnt / 2) + 1
337 bfbbc223 Iustin Pop
338 bfbbc223 Iustin Pop
  def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
339 bfbbc223 Iustin Pop
    """Wait for replies to a given request.
340 bfbbc223 Iustin Pop

341 bfbbc223 Iustin Pop
    This method will wait until either the timeout expires or a
342 bfbbc223 Iustin Pop
    minimum number (computed using L{_NeededReplies}) of replies are
343 bfbbc223 Iustin Pop
    received for the given salt. It is useful when doing synchronous
344 bfbbc223 Iustin Pop
    calls to this library.
345 bfbbc223 Iustin Pop

346 bfbbc223 Iustin Pop
    @param salt: the salt of the request we want responses for
347 bfbbc223 Iustin Pop
    @param timeout: the maximum timeout (should be less or equal to
348 bfbbc223 Iustin Pop
        L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT}
349 bfbbc223 Iustin Pop
    @rtype: tuple
350 bfbbc223 Iustin Pop
    @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
351 bfbbc223 Iustin Pop
        request is unknown, timed_out will be true and the counters
352 bfbbc223 Iustin Pop
        will be zero
353 bfbbc223 Iustin Pop

354 bfbbc223 Iustin Pop
    """
355 bfbbc223 Iustin Pop
    def _CheckResponse():
356 bfbbc223 Iustin Pop
      if salt not in self._requests:
357 bfbbc223 Iustin Pop
        # expired?
358 bfbbc223 Iustin Pop
        if self._logger:
359 bfbbc223 Iustin Pop
          self._logger.debug("Discarding unknown/expired request: %s" % salt)
360 bfbbc223 Iustin Pop
        return MISSING
361 bfbbc223 Iustin Pop
      rq = self._requests[salt]
362 bfbbc223 Iustin Pop
      if len(rq.rcvd) >= expected:
363 bfbbc223 Iustin Pop
        # already got all replies
364 bfbbc223 Iustin Pop
        return (False, len(rq.sent), len(rq.rcvd))
365 bfbbc223 Iustin Pop
      # else wait, using default timeout
366 bfbbc223 Iustin Pop
      self.ReceiveReply()
367 bfbbc223 Iustin Pop
      raise utils.RetryAgain()
368 bfbbc223 Iustin Pop
369 bfbbc223 Iustin Pop
    MISSING = (True, 0, 0)
370 bfbbc223 Iustin Pop
371 bfbbc223 Iustin Pop
    if salt not in self._requests:
372 bfbbc223 Iustin Pop
      return MISSING
373 bfbbc223 Iustin Pop
    # extend the expire time with the current timeout, so that we
374 bfbbc223 Iustin Pop
    # don't get the request expired from under us
375 bfbbc223 Iustin Pop
    rq = self._requests[salt]
376 bfbbc223 Iustin Pop
    rq.expiry += timeout
377 bfbbc223 Iustin Pop
    sent = len(rq.sent)
378 bfbbc223 Iustin Pop
    expected = self._NeededReplies(sent)
379 bfbbc223 Iustin Pop
380 bfbbc223 Iustin Pop
    try:
381 bfbbc223 Iustin Pop
      return utils.Retry(_CheckResponse, 0, timeout)
382 bfbbc223 Iustin Pop
    except utils.RetryTimeout:
383 bfbbc223 Iustin Pop
      if salt in self._requests:
384 bfbbc223 Iustin Pop
        rq = self._requests[salt]
385 bfbbc223 Iustin Pop
        return (True, len(rq.sent), len(rq.rcvd))
386 bfbbc223 Iustin Pop
      else:
387 bfbbc223 Iustin Pop
        return MISSING
388 bfbbc223 Iustin Pop
389 d8bcfe21 Manuel Franceschini
  def _SetPeersAddressFamily(self):
390 d8bcfe21 Manuel Franceschini
    if not self._peers:
391 d8bcfe21 Manuel Franceschini
      raise errors.ConfdClientError("Peer list empty")
392 d8bcfe21 Manuel Franceschini
    try:
393 d8bcfe21 Manuel Franceschini
      peer = self._peers[0]
394 8b312c1d Manuel Franceschini
      self._family = netutils.IPAddress.GetAddressFamily(peer)
395 d8bcfe21 Manuel Franceschini
      for peer in self._peers[1:]:
396 8b312c1d Manuel Franceschini
        if netutils.IPAddress.GetAddressFamily(peer) != self._family:
397 d8bcfe21 Manuel Franceschini
          raise errors.ConfdClientError("Peers must be of same address family")
398 8b312c1d Manuel Franceschini
    except errors.IPAddressError:
399 d8bcfe21 Manuel Franceschini
      raise errors.ConfdClientError("Peer address %s invalid" % peer)
400 d8bcfe21 Manuel Franceschini
401 96e03b0b Guido Trotter
402 96e03b0b Guido Trotter
# UPCALL_REPLY: server reply upcall
403 96e03b0b Guido Trotter
# has all ConfdUpcallPayload fields populated
404 96e03b0b Guido Trotter
UPCALL_REPLY = 1
405 96e03b0b Guido Trotter
# UPCALL_EXPIRE: internal library request expire
406 96e03b0b Guido Trotter
# has only salt, type, orig_request and extra_args
407 96e03b0b Guido Trotter
UPCALL_EXPIRE = 2
408 96e03b0b Guido Trotter
CONFD_UPCALL_TYPES = frozenset([
409 96e03b0b Guido Trotter
  UPCALL_REPLY,
410 96e03b0b Guido Trotter
  UPCALL_EXPIRE,
411 96e03b0b Guido Trotter
  ])
412 96e03b0b Guido Trotter
413 96e03b0b Guido Trotter
414 96e03b0b Guido Trotter
class ConfdUpcallPayload(objects.ConfigObject):
415 96e03b0b Guido Trotter
  """Callback argument for confd replies
416 96e03b0b Guido Trotter

417 96e03b0b Guido Trotter
  @type salt: string
418 96e03b0b Guido Trotter
  @ivar salt: salt associated with the query
419 96e03b0b Guido Trotter
  @type type: one of confd.client.CONFD_UPCALL_TYPES
420 96e03b0b Guido Trotter
  @ivar type: upcall type (server reply, expired request, ...)
421 96e03b0b Guido Trotter
  @type orig_request: L{objects.ConfdRequest}
422 96e03b0b Guido Trotter
  @ivar orig_request: original request
423 96e03b0b Guido Trotter
  @type server_reply: L{objects.ConfdReply}
424 96e03b0b Guido Trotter
  @ivar server_reply: server reply
425 96e03b0b Guido Trotter
  @type server_ip: string
426 96e03b0b Guido Trotter
  @ivar server_ip: answering server ip address
427 96e03b0b Guido Trotter
  @type server_port: int
428 96e03b0b Guido Trotter
  @ivar server_port: answering server port
429 96e03b0b Guido Trotter
  @type extra_args: any
430 96e03b0b Guido Trotter
  @ivar extra_args: 'args' argument of the SendRequest function
431 5f6f260a Guido Trotter
  @type client: L{ConfdClient}
432 5f6f260a Guido Trotter
  @ivar client: current confd client instance
433 96e03b0b Guido Trotter

434 96e03b0b Guido Trotter
  """
435 96e03b0b Guido Trotter
  __slots__ = [
436 96e03b0b Guido Trotter
    "salt",
437 96e03b0b Guido Trotter
    "type",
438 96e03b0b Guido Trotter
    "orig_request",
439 96e03b0b Guido Trotter
    "server_reply",
440 96e03b0b Guido Trotter
    "server_ip",
441 96e03b0b Guido Trotter
    "server_port",
442 96e03b0b Guido Trotter
    "extra_args",
443 5f6f260a Guido Trotter
    "client",
444 96e03b0b Guido Trotter
    ]
445 e4ccf6cd Guido Trotter
446 e4ccf6cd Guido Trotter
447 e4ccf6cd Guido Trotter
class ConfdClientRequest(objects.ConfdRequest):
448 e4ccf6cd Guido Trotter
  """This is the client-side version of ConfdRequest.
449 e4ccf6cd Guido Trotter

450 e4ccf6cd Guido Trotter
  This version of the class helps creating requests, on the client side, by
451 e4ccf6cd Guido Trotter
  filling in some default values.
452 e4ccf6cd Guido Trotter

453 e4ccf6cd Guido Trotter
  """
454 e4ccf6cd Guido Trotter
  def __init__(self, **kwargs):
455 e4ccf6cd Guido Trotter
    objects.ConfdRequest.__init__(self, **kwargs)
456 e4ccf6cd Guido Trotter
    if not self.rsalt:
457 e4ccf6cd Guido Trotter
      self.rsalt = utils.NewUUID()
458 e4ccf6cd Guido Trotter
    if not self.protocol:
459 e4ccf6cd Guido Trotter
      self.protocol = constants.CONFD_PROTOCOL_VERSION
460 e4ccf6cd Guido Trotter
    if self.type not in constants.CONFD_REQS:
461 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Invalid request type")
462 e4ccf6cd Guido Trotter
463 392ca296 Guido Trotter
464 392ca296 Guido Trotter
class ConfdFilterCallback:
465 392ca296 Guido Trotter
  """Callback that calls another callback, but filters duplicate results.
466 392ca296 Guido Trotter

467 49b3fdac Iustin Pop
  @ivar consistent: a dictionary indexed by salt; for each salt, if
468 49b3fdac Iustin Pop
      all responses ware identical, this will be True; this is the
469 49b3fdac Iustin Pop
      expected state on a healthy cluster; on inconsistent or
470 49b3fdac Iustin Pop
      partitioned clusters, this might be False, if we see answers
471 49b3fdac Iustin Pop
      with the same serial but different contents
472 49b3fdac Iustin Pop

473 392ca296 Guido Trotter
  """
474 392ca296 Guido Trotter
  def __init__(self, callback, logger=None):
475 392ca296 Guido Trotter
    """Constructor for ConfdFilterCallback
476 392ca296 Guido Trotter

477 392ca296 Guido Trotter
    @type callback: f(L{ConfdUpcallPayload})
478 392ca296 Guido Trotter
    @param callback: function to call when getting answers
479 69b99987 Michael Hanselmann
    @type logger: logging.Logger
480 d63997b3 Guido Trotter
    @param logger: optional logger for internal conditions
481 392ca296 Guido Trotter

482 392ca296 Guido Trotter
    """
483 392ca296 Guido Trotter
    if not callable(callback):
484 392ca296 Guido Trotter
      raise errors.ProgrammerError("callback must be callable")
485 392ca296 Guido Trotter
486 392ca296 Guido Trotter
    self._callback = callback
487 392ca296 Guido Trotter
    self._logger = logger
488 392ca296 Guido Trotter
    # answers contains a dict of salt -> answer
489 392ca296 Guido Trotter
    self._answers = {}
490 49b3fdac Iustin Pop
    self.consistent = {}
491 392ca296 Guido Trotter
492 392ca296 Guido Trotter
  def _LogFilter(self, salt, new_reply, old_reply):
493 392ca296 Guido Trotter
    if not self._logger:
494 392ca296 Guido Trotter
      return
495 392ca296 Guido Trotter
496 392ca296 Guido Trotter
    if new_reply.serial > old_reply.serial:
497 392ca296 Guido Trotter
      self._logger.debug("Filtering confirming answer, with newer"
498 392ca296 Guido Trotter
                         " serial for query %s" % salt)
499 392ca296 Guido Trotter
    elif new_reply.serial == old_reply.serial:
500 392ca296 Guido Trotter
      if new_reply.answer != old_reply.answer:
501 392ca296 Guido Trotter
        self._logger.warning("Got incoherent answers for query %s"
502 392ca296 Guido Trotter
                             " (serial: %s)" % (salt, new_reply.serial))
503 392ca296 Guido Trotter
      else:
504 392ca296 Guido Trotter
        self._logger.debug("Filtering confirming answer, with same"
505 392ca296 Guido Trotter
                           " serial for query %s" % salt)
506 392ca296 Guido Trotter
    else:
507 392ca296 Guido Trotter
      self._logger.debug("Filtering outdated answer for query %s"
508 392ca296 Guido Trotter
                         " serial: (%d < %d)" % (salt, old_reply.serial,
509 392ca296 Guido Trotter
                                                 new_reply.serial))
510 392ca296 Guido Trotter
511 392ca296 Guido Trotter
  def _HandleExpire(self, up):
512 392ca296 Guido Trotter
    # if we have no answer we have received none, before the expiration.
513 a9613def Guido Trotter
    if up.salt in self._answers:
514 a9613def Guido Trotter
      del self._answers[up.salt]
515 49b3fdac Iustin Pop
    if up.salt in self.consistent:
516 49b3fdac Iustin Pop
      del self.consistent[up.salt]
517 392ca296 Guido Trotter
518 392ca296 Guido Trotter
  def _HandleReply(self, up):
519 392ca296 Guido Trotter
    """Handle a single confd reply, and decide whether to filter it.
520 392ca296 Guido Trotter

521 392ca296 Guido Trotter
    @rtype: boolean
522 392ca296 Guido Trotter
    @return: True if the reply should be filtered, False if it should be passed
523 392ca296 Guido Trotter
             on to the up-callback
524 392ca296 Guido Trotter

525 392ca296 Guido Trotter
    """
526 392ca296 Guido Trotter
    filter_upcall = False
527 392ca296 Guido Trotter
    salt = up.salt
528 49b3fdac Iustin Pop
    if salt not in self.consistent:
529 49b3fdac Iustin Pop
      self.consistent[salt] = True
530 392ca296 Guido Trotter
    if salt not in self._answers:
531 392ca296 Guido Trotter
      # first answer for a query (don't filter, and record)
532 392ca296 Guido Trotter
      self._answers[salt] = up.server_reply
533 392ca296 Guido Trotter
    elif up.server_reply.serial > self._answers[salt].serial:
534 392ca296 Guido Trotter
      # newer answer (record, and compare contents)
535 392ca296 Guido Trotter
      old_answer = self._answers[salt]
536 392ca296 Guido Trotter
      self._answers[salt] = up.server_reply
537 392ca296 Guido Trotter
      if up.server_reply.answer == old_answer.answer:
538 392ca296 Guido Trotter
        # same content (filter) (version upgrade was unrelated)
539 392ca296 Guido Trotter
        filter_upcall = True
540 392ca296 Guido Trotter
        self._LogFilter(salt, up.server_reply, old_answer)
541 392ca296 Guido Trotter
      # else: different content, pass up a second answer
542 392ca296 Guido Trotter
    else:
543 392ca296 Guido Trotter
      # older or same-version answer (duplicate or outdated, filter)
544 39292d3a Iustin Pop
      if (up.server_reply.serial == self._answers[salt].serial and
545 39292d3a Iustin Pop
          up.server_reply.answer != self._answers[salt].answer):
546 49b3fdac Iustin Pop
        self.consistent[salt] = False
547 392ca296 Guido Trotter
      filter_upcall = True
548 392ca296 Guido Trotter
      self._LogFilter(salt, up.server_reply, self._answers[salt])
549 392ca296 Guido Trotter
550 392ca296 Guido Trotter
    return filter_upcall
551 392ca296 Guido Trotter
552 392ca296 Guido Trotter
  def __call__(self, up):
553 392ca296 Guido Trotter
    """Filtering callback
554 392ca296 Guido Trotter

555 392ca296 Guido Trotter
    @type up: L{ConfdUpcallPayload}
556 392ca296 Guido Trotter
    @param up: upper callback
557 392ca296 Guido Trotter

558 392ca296 Guido Trotter
    """
559 392ca296 Guido Trotter
    filter_upcall = False
560 392ca296 Guido Trotter
    if up.type == UPCALL_REPLY:
561 392ca296 Guido Trotter
      filter_upcall = self._HandleReply(up)
562 392ca296 Guido Trotter
    elif up.type == UPCALL_EXPIRE:
563 392ca296 Guido Trotter
      self._HandleExpire(up)
564 392ca296 Guido Trotter
565 392ca296 Guido Trotter
    if not filter_upcall:
566 392ca296 Guido Trotter
      self._callback(up)
567 04cdf663 Guido Trotter
568 04cdf663 Guido Trotter
569 04cdf663 Guido Trotter
class ConfdCountingCallback:
570 04cdf663 Guido Trotter
  """Callback that calls another callback, and counts the answers
571 04cdf663 Guido Trotter

572 04cdf663 Guido Trotter
  """
573 04cdf663 Guido Trotter
  def __init__(self, callback, logger=None):
574 04cdf663 Guido Trotter
    """Constructor for ConfdCountingCallback
575 04cdf663 Guido Trotter

576 04cdf663 Guido Trotter
    @type callback: f(L{ConfdUpcallPayload})
577 04cdf663 Guido Trotter
    @param callback: function to call when getting answers
578 04cdf663 Guido Trotter
    @type logger: logging.Logger
579 04cdf663 Guido Trotter
    @param logger: optional logger for internal conditions
580 04cdf663 Guido Trotter

581 04cdf663 Guido Trotter
    """
582 04cdf663 Guido Trotter
    if not callable(callback):
583 04cdf663 Guido Trotter
      raise errors.ProgrammerError("callback must be callable")
584 04cdf663 Guido Trotter
585 04cdf663 Guido Trotter
    self._callback = callback
586 04cdf663 Guido Trotter
    self._logger = logger
587 04cdf663 Guido Trotter
    # answers contains a dict of salt -> count
588 04cdf663 Guido Trotter
    self._answers = {}
589 04cdf663 Guido Trotter
590 04cdf663 Guido Trotter
  def RegisterQuery(self, salt):
591 04cdf663 Guido Trotter
    if salt in self._answers:
592 04cdf663 Guido Trotter
      raise errors.ProgrammerError("query already registered")
593 04cdf663 Guido Trotter
    self._answers[salt] = 0
594 04cdf663 Guido Trotter
595 04cdf663 Guido Trotter
  def AllAnswered(self):
596 04cdf663 Guido Trotter
    """Have all the registered queries received at least an answer?
597 04cdf663 Guido Trotter

598 04cdf663 Guido Trotter
    """
599 cea881e5 Michael Hanselmann
    return compat.all(self._answers.values())
600 04cdf663 Guido Trotter
601 04cdf663 Guido Trotter
  def _HandleExpire(self, up):
602 04cdf663 Guido Trotter
    # if we have no answer we have received none, before the expiration.
603 04cdf663 Guido Trotter
    if up.salt in self._answers:
604 04cdf663 Guido Trotter
      del self._answers[up.salt]
605 04cdf663 Guido Trotter
606 04cdf663 Guido Trotter
  def _HandleReply(self, up):
607 04cdf663 Guido Trotter
    """Handle a single confd reply, and decide whether to filter it.
608 04cdf663 Guido Trotter

609 04cdf663 Guido Trotter
    @rtype: boolean
610 04cdf663 Guido Trotter
    @return: True if the reply should be filtered, False if it should be passed
611 04cdf663 Guido Trotter
             on to the up-callback
612 04cdf663 Guido Trotter

613 04cdf663 Guido Trotter
    """
614 04cdf663 Guido Trotter
    if up.salt in self._answers:
615 04cdf663 Guido Trotter
      self._answers[up.salt] += 1
616 04cdf663 Guido Trotter
617 04cdf663 Guido Trotter
  def __call__(self, up):
618 04cdf663 Guido Trotter
    """Filtering callback
619 04cdf663 Guido Trotter

620 04cdf663 Guido Trotter
    @type up: L{ConfdUpcallPayload}
621 04cdf663 Guido Trotter
    @param up: upper callback
622 04cdf663 Guido Trotter

623 04cdf663 Guido Trotter
    """
624 04cdf663 Guido Trotter
    if up.type == UPCALL_REPLY:
625 04cdf663 Guido Trotter
      self._HandleReply(up)
626 04cdf663 Guido Trotter
    elif up.type == UPCALL_EXPIRE:
627 04cdf663 Guido Trotter
      self._HandleExpire(up)
628 04cdf663 Guido Trotter
    self._callback(up)
629 5b349fd1 Iustin Pop
630 71e114da Iustin Pop
631 aa2efc52 Iustin Pop
class StoreResultCallback:
632 aa2efc52 Iustin Pop
  """Callback that simply stores the most recent answer.
633 aa2efc52 Iustin Pop

634 aa2efc52 Iustin Pop
  @ivar _answers: dict of salt to (have_answer, reply)
635 aa2efc52 Iustin Pop

636 aa2efc52 Iustin Pop
  """
637 aa2efc52 Iustin Pop
  _NO_KEY = (False, None)
638 aa2efc52 Iustin Pop
639 aa2efc52 Iustin Pop
  def __init__(self):
640 aa2efc52 Iustin Pop
    """Constructor for StoreResultCallback
641 aa2efc52 Iustin Pop

642 aa2efc52 Iustin Pop
    """
643 aa2efc52 Iustin Pop
    # answers contains a dict of salt -> best result
644 aa2efc52 Iustin Pop
    self._answers = {}
645 aa2efc52 Iustin Pop
646 aa2efc52 Iustin Pop
  def GetResponse(self, salt):
647 aa2efc52 Iustin Pop
    """Return the best match for a salt
648 aa2efc52 Iustin Pop

649 aa2efc52 Iustin Pop
    """
650 aa2efc52 Iustin Pop
    return self._answers.get(salt, self._NO_KEY)
651 aa2efc52 Iustin Pop
652 aa2efc52 Iustin Pop
  def _HandleExpire(self, up):
653 aa2efc52 Iustin Pop
    """Expiration handler.
654 aa2efc52 Iustin Pop

655 aa2efc52 Iustin Pop
    """
656 aa2efc52 Iustin Pop
    if up.salt in self._answers and self._answers[up.salt] == self._NO_KEY:
657 aa2efc52 Iustin Pop
      del self._answers[up.salt]
658 aa2efc52 Iustin Pop
659 aa2efc52 Iustin Pop
  def _HandleReply(self, up):
660 aa2efc52 Iustin Pop
    """Handle a single confd reply, and decide whether to filter it.
661 aa2efc52 Iustin Pop

662 aa2efc52 Iustin Pop
    """
663 aa2efc52 Iustin Pop
    self._answers[up.salt] = (True, up)
664 aa2efc52 Iustin Pop
665 aa2efc52 Iustin Pop
  def __call__(self, up):
666 aa2efc52 Iustin Pop
    """Filtering callback
667 aa2efc52 Iustin Pop

668 aa2efc52 Iustin Pop
    @type up: L{ConfdUpcallPayload}
669 aa2efc52 Iustin Pop
    @param up: upper callback
670 aa2efc52 Iustin Pop

671 aa2efc52 Iustin Pop
    """
672 aa2efc52 Iustin Pop
    if up.type == UPCALL_REPLY:
673 aa2efc52 Iustin Pop
      self._HandleReply(up)
674 aa2efc52 Iustin Pop
    elif up.type == UPCALL_EXPIRE:
675 aa2efc52 Iustin Pop
      self._HandleExpire(up)
676 aa2efc52 Iustin Pop
677 aa2efc52 Iustin Pop
678 5b349fd1 Iustin Pop
def GetConfdClient(callback):
679 5b349fd1 Iustin Pop
  """Return a client configured using the given callback.
680 5b349fd1 Iustin Pop

681 5b349fd1 Iustin Pop
  This is handy to abstract the MC list and HMAC key reading.
682 5b349fd1 Iustin Pop

683 5b349fd1 Iustin Pop
  @attention: This should only be called on nodes which are part of a
684 5b349fd1 Iustin Pop
      cluster, since it depends on a valid (ganeti) data directory;
685 5b349fd1 Iustin Pop
      for code running outside of a cluster, you need to create the
686 5b349fd1 Iustin Pop
      client manually
687 5b349fd1 Iustin Pop

688 5b349fd1 Iustin Pop
  """
689 5b349fd1 Iustin Pop
  ss = ssconf.SimpleStore()
690 5b349fd1 Iustin Pop
  mc_file = ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
691 5b349fd1 Iustin Pop
  mc_list = utils.ReadFile(mc_file).splitlines()
692 5b349fd1 Iustin Pop
  hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY)
693 5b349fd1 Iustin Pop
  return ConfdClient(hmac_key, mc_list, callback)