Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ bfbbc223

History | View | Annotate | Download (19.5 kB)

1 e4ccf6cd Guido Trotter
#
2 e4ccf6cd Guido Trotter
#
3 e4ccf6cd Guido Trotter
4 e4ccf6cd Guido Trotter
# Copyright (C) 2009 Google Inc.
5 e4ccf6cd Guido Trotter
#
6 e4ccf6cd Guido Trotter
# This program is free software; you can redistribute it and/or modify
7 e4ccf6cd Guido Trotter
# it under the terms of the GNU General Public License as published by
8 e4ccf6cd Guido Trotter
# the Free Software Foundation; either version 2 of the License, or
9 e4ccf6cd Guido Trotter
# (at your option) any later version.
10 e4ccf6cd Guido Trotter
#
11 e4ccf6cd Guido Trotter
# This program is distributed in the hope that it will be useful, but
12 e4ccf6cd Guido Trotter
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 e4ccf6cd Guido Trotter
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 e4ccf6cd Guido Trotter
# General Public License for more details.
15 e4ccf6cd Guido Trotter
#
16 e4ccf6cd Guido Trotter
# You should have received a copy of the GNU General Public License
17 e4ccf6cd Guido Trotter
# along with this program; if not, write to the Free Software
18 e4ccf6cd Guido Trotter
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 e4ccf6cd Guido Trotter
# 02110-1301, USA.
20 e4ccf6cd Guido Trotter
21 e4ccf6cd Guido Trotter
22 e4ccf6cd Guido Trotter
"""Ganeti confd client
23 e4ccf6cd Guido Trotter

24 cf7b0cc4 Guido Trotter
Clients can use the confd client library to send requests to a group of master
25 cf7b0cc4 Guido Trotter
candidates running confd. The expected usage is through the asyncore framework,
26 cf7b0cc4 Guido Trotter
by sending queries, and asynchronously receiving replies through a callback.
27 cf7b0cc4 Guido Trotter

28 cf7b0cc4 Guido Trotter
This way the client library doesn't ever need to "wait" on a particular answer,
29 cf7b0cc4 Guido Trotter
and can proceed even if some udp packets are lost. It's up to the user to
30 cf7b0cc4 Guido Trotter
reschedule queries if they haven't received responses and they need them.
31 cf7b0cc4 Guido Trotter

32 69b99987 Michael Hanselmann
Example usage::
33 69b99987 Michael Hanselmann

34 cf7b0cc4 Guido Trotter
  client = ConfdClient(...) # includes callback specification
35 cf7b0cc4 Guido Trotter
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36 cf7b0cc4 Guido Trotter
  client.SendRequest(req)
37 cf7b0cc4 Guido Trotter
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38 cf7b0cc4 Guido Trotter
  # ... wait ...
39 cf7b0cc4 Guido Trotter
  # And your callback will be called by asyncore, when your query gets a
40 cf7b0cc4 Guido Trotter
  # response, or when it expires.
41 cf7b0cc4 Guido Trotter

42 392ca296 Guido Trotter
You can use the provided ConfdFilterCallback to act as a filter, only passing
43 392ca296 Guido Trotter
"newer" answer to your callback, and filtering out outdated ones, or ones
44 392ca296 Guido Trotter
confirming what you already got.
45 392ca296 Guido Trotter

46 e4ccf6cd Guido Trotter
"""
47 69b99987 Michael Hanselmann
48 6c881c52 Iustin Pop
# pylint: disable-msg=E0203
49 6c881c52 Iustin Pop
50 6c881c52 Iustin Pop
# E0203: Access to member %r before its definition, since we use
51 6c881c52 Iustin Pop
# objects.py which doesn't explicitely initialise its members
52 6c881c52 Iustin Pop
53 e4ccf6cd Guido Trotter
import time
54 e4ccf6cd Guido Trotter
import random
55 e4ccf6cd Guido Trotter
56 e4ccf6cd Guido Trotter
from ganeti import utils
57 e4ccf6cd Guido Trotter
from ganeti import constants
58 e4ccf6cd Guido Trotter
from ganeti import objects
59 e4ccf6cd Guido Trotter
from ganeti import serializer
60 e4ccf6cd Guido Trotter
from ganeti import daemon # contains AsyncUDPSocket
61 e4ccf6cd Guido Trotter
from ganeti import errors
62 e4ccf6cd Guido Trotter
from ganeti import confd
63 5b349fd1 Iustin Pop
from ganeti import ssconf
64 e4ccf6cd Guido Trotter
65 e4ccf6cd Guido Trotter
66 e4ccf6cd Guido Trotter
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
67 e4ccf6cd Guido Trotter
  """Confd udp asyncore client
68 e4ccf6cd Guido Trotter

69 e4ccf6cd Guido Trotter
  This is kept separate from the main ConfdClient to make sure it's easy to
70 e4ccf6cd Guido Trotter
  implement a non-asyncore based client library.
71 e4ccf6cd Guido Trotter

72 e4ccf6cd Guido Trotter
  """
73 e4ccf6cd Guido Trotter
  def __init__(self, client):
74 e4ccf6cd Guido Trotter
    """Constructor for ConfdAsyncUDPClient
75 e4ccf6cd Guido Trotter

76 e4ccf6cd Guido Trotter
    @type client: L{ConfdClient}
77 e4ccf6cd Guido Trotter
    @param client: client library, to pass the datagrams to
78 e4ccf6cd Guido Trotter

79 e4ccf6cd Guido Trotter
    """
80 e4ccf6cd Guido Trotter
    daemon.AsyncUDPSocket.__init__(self)
81 e4ccf6cd Guido Trotter
    self.client = client
82 e4ccf6cd Guido Trotter
83 e4ccf6cd Guido Trotter
  # this method is overriding a daemon.AsyncUDPSocket method
84 e4ccf6cd Guido Trotter
  def handle_datagram(self, payload, ip, port):
85 e4ccf6cd Guido Trotter
    self.client.HandleResponse(payload, ip, port)
86 e4ccf6cd Guido Trotter
87 e4ccf6cd Guido Trotter
88 71e114da Iustin Pop
class _Request(object):
89 71e114da Iustin Pop
  """Request status structure.
90 71e114da Iustin Pop

91 71e114da Iustin Pop
  @ivar request: the request data
92 71e114da Iustin Pop
  @ivar args: any extra arguments for the callback
93 71e114da Iustin Pop
  @ivar expiry: the expiry timestamp of the request
94 bfbbc223 Iustin Pop
  @ivar sent: the set of contacted peers
95 bfbbc223 Iustin Pop
  @ivar rcvd: the set of peers who replied
96 71e114da Iustin Pop

97 71e114da Iustin Pop
  """
98 bfbbc223 Iustin Pop
  def __init__(self, request, args, expiry, sent):
99 71e114da Iustin Pop
    self.request = request
100 71e114da Iustin Pop
    self.args = args
101 71e114da Iustin Pop
    self.expiry = expiry
102 bfbbc223 Iustin Pop
    self.sent = frozenset(sent)
103 bfbbc223 Iustin Pop
    self.rcvd = set()
104 71e114da Iustin Pop
105 71e114da Iustin Pop
106 e4ccf6cd Guido Trotter
class ConfdClient:
107 e4ccf6cd Guido Trotter
  """Send queries to confd, and get back answers.
108 e4ccf6cd Guido Trotter

109 e4ccf6cd Guido Trotter
  Since the confd model works by querying multiple master candidates, and
110 e4ccf6cd Guido Trotter
  getting back answers, this is an asynchronous library. It can either work
111 e4ccf6cd Guido Trotter
  through asyncore or with your own handling.
112 e4ccf6cd Guido Trotter

113 71e114da Iustin Pop
  @type _requests: dict
114 71e114da Iustin Pop
  @ivar _requests: dictionary indexes by salt, which contains data
115 71e114da Iustin Pop
      about the outstanding requests; the values are objects of type
116 71e114da Iustin Pop
      L{_Request}
117 71e114da Iustin Pop

118 e4ccf6cd Guido Trotter
  """
119 a3db74e4 Guido Trotter
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
120 e4ccf6cd Guido Trotter
    """Constructor for ConfdClient
121 e4ccf6cd Guido Trotter

122 e4ccf6cd Guido Trotter
    @type hmac_key: string
123 e4ccf6cd Guido Trotter
    @param hmac_key: hmac key to talk to confd
124 e4ccf6cd Guido Trotter
    @type peers: list
125 e4ccf6cd Guido Trotter
    @param peers: list of peer nodes
126 96e03b0b Guido Trotter
    @type callback: f(L{ConfdUpcallPayload})
127 96e03b0b Guido Trotter
    @param callback: function to call when getting answers
128 7d20c647 Guido Trotter
    @type port: integer
129 d63997b3 Guido Trotter
    @param port: confd port (default: use GetDaemonPort)
130 69b99987 Michael Hanselmann
    @type logger: logging.Logger
131 d63997b3 Guido Trotter
    @param logger: optional logger for internal conditions
132 e4ccf6cd Guido Trotter

133 e4ccf6cd Guido Trotter
    """
134 96e03b0b Guido Trotter
    if not callable(callback):
135 96e03b0b Guido Trotter
      raise errors.ProgrammerError("callback must be callable")
136 e4ccf6cd Guido Trotter
137 a5229439 Guido Trotter
    self.UpdatePeerList(peers)
138 e4ccf6cd Guido Trotter
    self._hmac_key = hmac_key
139 e4ccf6cd Guido Trotter
    self._socket = ConfdAsyncUDPClient(self)
140 96e03b0b Guido Trotter
    self._callback = callback
141 7d20c647 Guido Trotter
    self._confd_port = port
142 a3db74e4 Guido Trotter
    self._logger = logger
143 96e03b0b Guido Trotter
    self._requests = {}
144 7d20c647 Guido Trotter
145 7d20c647 Guido Trotter
    if self._confd_port is None:
146 7d20c647 Guido Trotter
      self._confd_port = utils.GetDaemonPort(constants.CONFD)
147 e4ccf6cd Guido Trotter
148 a5229439 Guido Trotter
  def UpdatePeerList(self, peers):
149 a5229439 Guido Trotter
    """Update the list of peers
150 a5229439 Guido Trotter

151 a5229439 Guido Trotter
    @type peers: list
152 a5229439 Guido Trotter
    @param peers: list of peer nodes
153 a5229439 Guido Trotter

154 a5229439 Guido Trotter
    """
155 e11ddf13 Iustin Pop
    # we are actually called from init, so:
156 e11ddf13 Iustin Pop
    # pylint: disable-msg=W0201
157 a5229439 Guido Trotter
    if not isinstance(peers, list):
158 a5229439 Guido Trotter
      raise errors.ProgrammerError("peers must be a list")
159 db169865 Guido Trotter
    # make a copy of peers, since we're going to shuffle the list, later
160 db169865 Guido Trotter
    self._peers = list(peers)
161 a5229439 Guido Trotter
162 e4ccf6cd Guido Trotter
  def _PackRequest(self, request, now=None):
163 e4ccf6cd Guido Trotter
    """Prepare a request to be sent on the wire.
164 e4ccf6cd Guido Trotter

165 e4ccf6cd Guido Trotter
    This function puts a proper salt in a confd request, puts the proper salt,
166 e4ccf6cd Guido Trotter
    and adds the correct magic number.
167 e4ccf6cd Guido Trotter

168 e4ccf6cd Guido Trotter
    """
169 e4ccf6cd Guido Trotter
    if now is None:
170 e4ccf6cd Guido Trotter
      now = time.time()
171 e4ccf6cd Guido Trotter
    tstamp = '%d' % now
172 e4ccf6cd Guido Trotter
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
173 e4ccf6cd Guido Trotter
    return confd.PackMagic(req)
174 e4ccf6cd Guido Trotter
175 e4ccf6cd Guido Trotter
  def _UnpackReply(self, payload):
176 e4ccf6cd Guido Trotter
    in_payload = confd.UnpackMagic(payload)
177 c103d7ae Guido Trotter
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
178 c103d7ae Guido Trotter
    answer = objects.ConfdReply.FromDict(dict_answer)
179 e4ccf6cd Guido Trotter
    return answer, salt
180 e4ccf6cd Guido Trotter
181 96e03b0b Guido Trotter
  def ExpireRequests(self):
182 96e03b0b Guido Trotter
    """Delete all the expired requests.
183 e4ccf6cd Guido Trotter

184 e4ccf6cd Guido Trotter
    """
185 e4ccf6cd Guido Trotter
    now = time.time()
186 71e114da Iustin Pop
    for rsalt, rq in self._requests.items():
187 71e114da Iustin Pop
      if now >= rq.expiry:
188 96e03b0b Guido Trotter
        del self._requests[rsalt]
189 96e03b0b Guido Trotter
        client_reply = ConfdUpcallPayload(salt=rsalt,
190 96e03b0b Guido Trotter
                                          type=UPCALL_EXPIRE,
191 71e114da Iustin Pop
                                          orig_request=rq.request,
192 71e114da Iustin Pop
                                          extra_args=rq.args,
193 5f6f260a Guido Trotter
                                          client=self,
194 5f6f260a Guido Trotter
                                          )
195 96e03b0b Guido Trotter
        self._callback(client_reply)
196 e4ccf6cd Guido Trotter
197 d63997b3 Guido Trotter
  def SendRequest(self, request, args=None, coverage=None, async=True):
198 e4ccf6cd Guido Trotter
    """Send a confd request to some MCs
199 e4ccf6cd Guido Trotter

200 e4ccf6cd Guido Trotter
    @type request: L{objects.ConfdRequest}
201 e4ccf6cd Guido Trotter
    @param request: the request to send
202 e4ccf6cd Guido Trotter
    @type args: tuple
203 d63997b3 Guido Trotter
    @param args: additional callback arguments
204 e4ccf6cd Guido Trotter
    @type coverage: integer
205 d63997b3 Guido Trotter
    @param coverage: number of remote nodes to contact
206 8496d93c Guido Trotter
    @type async: boolean
207 8496d93c Guido Trotter
    @param async: handle the write asynchronously
208 e4ccf6cd Guido Trotter

209 e4ccf6cd Guido Trotter
    """
210 e4ccf6cd Guido Trotter
    if coverage is None:
211 e4ccf6cd Guido Trotter
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
212 e4ccf6cd Guido Trotter
213 e4ccf6cd Guido Trotter
    if coverage > len(self._peers):
214 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Not enough MCs known to provide the"
215 e4ccf6cd Guido Trotter
                                    " desired coverage")
216 e4ccf6cd Guido Trotter
217 e4ccf6cd Guido Trotter
    if not request.rsalt:
218 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Missing request rsalt")
219 e4ccf6cd Guido Trotter
220 96e03b0b Guido Trotter
    self.ExpireRequests()
221 96e03b0b Guido Trotter
    if request.rsalt in self._requests:
222 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Duplicate request rsalt")
223 e4ccf6cd Guido Trotter
224 e4ccf6cd Guido Trotter
    if request.type not in constants.CONFD_REQS:
225 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Invalid request type")
226 e4ccf6cd Guido Trotter
227 e4ccf6cd Guido Trotter
    random.shuffle(self._peers)
228 e4ccf6cd Guido Trotter
    targets = self._peers[:coverage]
229 e4ccf6cd Guido Trotter
230 e4ccf6cd Guido Trotter
    now = time.time()
231 e4ccf6cd Guido Trotter
    payload = self._PackRequest(request, now=now)
232 e4ccf6cd Guido Trotter
233 e4ccf6cd Guido Trotter
    for target in targets:
234 e4ccf6cd Guido Trotter
      try:
235 e4ccf6cd Guido Trotter
        self._socket.enqueue_send(target, self._confd_port, payload)
236 e4ccf6cd Guido Trotter
      except errors.UdpDataSizeError:
237 e4ccf6cd Guido Trotter
        raise errors.ConfdClientError("Request too big")
238 e4ccf6cd Guido Trotter
239 e4ccf6cd Guido Trotter
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
240 bfbbc223 Iustin Pop
    self._requests[request.rsalt] = _Request(request, args, expire_time,
241 bfbbc223 Iustin Pop
                                             targets)
242 e4ccf6cd Guido Trotter
243 8496d93c Guido Trotter
    if not async:
244 8496d93c Guido Trotter
      self.FlushSendQueue()
245 8496d93c Guido Trotter
246 e4ccf6cd Guido Trotter
  def HandleResponse(self, payload, ip, port):
247 e4ccf6cd Guido Trotter
    """Asynchronous handler for a confd reply
248 e4ccf6cd Guido Trotter

249 e4ccf6cd Guido Trotter
    Call the relevant callback associated to the current request.
250 e4ccf6cd Guido Trotter

251 e4ccf6cd Guido Trotter
    """
252 e4ccf6cd Guido Trotter
    try:
253 e4ccf6cd Guido Trotter
      try:
254 e4ccf6cd Guido Trotter
        answer, salt = self._UnpackReply(payload)
255 a3db74e4 Guido Trotter
      except (errors.SignatureError, errors.ConfdMagicError), err:
256 a3db74e4 Guido Trotter
        if self._logger:
257 a3db74e4 Guido Trotter
          self._logger.debug("Discarding broken package: %s" % err)
258 e4ccf6cd Guido Trotter
        return
259 e4ccf6cd Guido Trotter
260 e4ccf6cd Guido Trotter
      try:
261 71e114da Iustin Pop
        rq = self._requests[salt]
262 e4ccf6cd Guido Trotter
      except KeyError:
263 a3db74e4 Guido Trotter
        if self._logger:
264 a3db74e4 Guido Trotter
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
265 96e03b0b Guido Trotter
        return
266 96e03b0b Guido Trotter
267 bfbbc223 Iustin Pop
      rq.rcvd.add(ip)
268 bfbbc223 Iustin Pop
269 96e03b0b Guido Trotter
      client_reply = ConfdUpcallPayload(salt=salt,
270 96e03b0b Guido Trotter
                                        type=UPCALL_REPLY,
271 96e03b0b Guido Trotter
                                        server_reply=answer,
272 71e114da Iustin Pop
                                        orig_request=rq.request,
273 96e03b0b Guido Trotter
                                        server_ip=ip,
274 96e03b0b Guido Trotter
                                        server_port=port,
275 71e114da Iustin Pop
                                        extra_args=rq.args,
276 5f6f260a Guido Trotter
                                        client=self,
277 5f6f260a Guido Trotter
                                       )
278 96e03b0b Guido Trotter
      self._callback(client_reply)
279 e4ccf6cd Guido Trotter
280 e4ccf6cd Guido Trotter
    finally:
281 96e03b0b Guido Trotter
      self.ExpireRequests()
282 96e03b0b Guido Trotter
283 8496d93c Guido Trotter
  def FlushSendQueue(self):
284 8496d93c Guido Trotter
    """Send out all pending requests.
285 8496d93c Guido Trotter

286 8496d93c Guido Trotter
    Can be used for synchronous client use.
287 8496d93c Guido Trotter

288 8496d93c Guido Trotter
    """
289 8496d93c Guido Trotter
    while self._socket.writable():
290 8496d93c Guido Trotter
      self._socket.handle_write()
291 8496d93c Guido Trotter
292 8496d93c Guido Trotter
  def ReceiveReply(self, timeout=1):
293 8496d93c Guido Trotter
    """Receive one reply.
294 8496d93c Guido Trotter

295 8496d93c Guido Trotter
    @type timeout: float
296 8496d93c Guido Trotter
    @param timeout: how long to wait for the reply
297 8496d93c Guido Trotter
    @rtype: boolean
298 8496d93c Guido Trotter
    @return: True if some data has been handled, False otherwise
299 8496d93c Guido Trotter

300 8496d93c Guido Trotter
    """
301 8496d93c Guido Trotter
    return self._socket.process_next_packet(timeout=timeout)
302 8496d93c Guido Trotter
303 bfbbc223 Iustin Pop
  @staticmethod
304 bfbbc223 Iustin Pop
  def _NeededReplies(peer_cnt):
305 bfbbc223 Iustin Pop
    """Compute the minimum safe number of replies for a query.
306 bfbbc223 Iustin Pop

307 bfbbc223 Iustin Pop
    The algorithm is designed to work well for both small and big
308 bfbbc223 Iustin Pop
    number of peers:
309 bfbbc223 Iustin Pop
        - for less than three, we require all responses
310 bfbbc223 Iustin Pop
        - for less than five, we allow one miss
311 bfbbc223 Iustin Pop
        - otherwise, half the number plus one
312 bfbbc223 Iustin Pop

313 bfbbc223 Iustin Pop
    This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
314 bfbbc223 Iustin Pop
    4->2, 5->3, 6->3, 7->4, etc.
315 bfbbc223 Iustin Pop

316 bfbbc223 Iustin Pop
    @type peer_cnt: int
317 bfbbc223 Iustin Pop
    @param peer_cnt: the number of peers contacted
318 bfbbc223 Iustin Pop
    @rtype: int
319 bfbbc223 Iustin Pop
    @return: the number of replies which should give a safe coverage
320 bfbbc223 Iustin Pop

321 bfbbc223 Iustin Pop
    """
322 bfbbc223 Iustin Pop
    if peer_cnt < 3:
323 bfbbc223 Iustin Pop
      return peer_cnt
324 bfbbc223 Iustin Pop
    elif peer_cnt < 5:
325 bfbbc223 Iustin Pop
      return peer_cnt - 1
326 bfbbc223 Iustin Pop
    else:
327 bfbbc223 Iustin Pop
      return int(peer_cnt/2) + 1
328 bfbbc223 Iustin Pop
329 bfbbc223 Iustin Pop
  def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
330 bfbbc223 Iustin Pop
    """Wait for replies to a given request.
331 bfbbc223 Iustin Pop

332 bfbbc223 Iustin Pop
    This method will wait until either the timeout expires or a
333 bfbbc223 Iustin Pop
    minimum number (computed using L{_NeededReplies}) of replies are
334 bfbbc223 Iustin Pop
    received for the given salt. It is useful when doing synchronous
335 bfbbc223 Iustin Pop
    calls to this library.
336 bfbbc223 Iustin Pop

337 bfbbc223 Iustin Pop
    @param salt: the salt of the request we want responses for
338 bfbbc223 Iustin Pop
    @param timeout: the maximum timeout (should be less or equal to
339 bfbbc223 Iustin Pop
        L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT}
340 bfbbc223 Iustin Pop
    @rtype: tuple
341 bfbbc223 Iustin Pop
    @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
342 bfbbc223 Iustin Pop
        request is unknown, timed_out will be true and the counters
343 bfbbc223 Iustin Pop
        will be zero
344 bfbbc223 Iustin Pop

345 bfbbc223 Iustin Pop
    """
346 bfbbc223 Iustin Pop
    def _CheckResponse():
347 bfbbc223 Iustin Pop
      if salt not in self._requests:
348 bfbbc223 Iustin Pop
        # expired?
349 bfbbc223 Iustin Pop
        if self._logger:
350 bfbbc223 Iustin Pop
          self._logger.debug("Discarding unknown/expired request: %s" % salt)
351 bfbbc223 Iustin Pop
        return MISSING
352 bfbbc223 Iustin Pop
      rq = self._requests[salt]
353 bfbbc223 Iustin Pop
      if len(rq.rcvd) >= expected:
354 bfbbc223 Iustin Pop
        # already got all replies
355 bfbbc223 Iustin Pop
        return (False, len(rq.sent), len(rq.rcvd))
356 bfbbc223 Iustin Pop
      # else wait, using default timeout
357 bfbbc223 Iustin Pop
      self.ReceiveReply()
358 bfbbc223 Iustin Pop
      raise utils.RetryAgain()
359 bfbbc223 Iustin Pop
360 bfbbc223 Iustin Pop
    MISSING = (True, 0, 0)
361 bfbbc223 Iustin Pop
362 bfbbc223 Iustin Pop
    if salt not in self._requests:
363 bfbbc223 Iustin Pop
      return MISSING
364 bfbbc223 Iustin Pop
    # extend the expire time with the current timeout, so that we
365 bfbbc223 Iustin Pop
    # don't get the request expired from under us
366 bfbbc223 Iustin Pop
    rq = self._requests[salt]
367 bfbbc223 Iustin Pop
    rq.expiry += timeout
368 bfbbc223 Iustin Pop
    sent = len(rq.sent)
369 bfbbc223 Iustin Pop
    expected = self._NeededReplies(sent)
370 bfbbc223 Iustin Pop
371 bfbbc223 Iustin Pop
    try:
372 bfbbc223 Iustin Pop
      return utils.Retry(_CheckResponse, 0, timeout)
373 bfbbc223 Iustin Pop
    except utils.RetryTimeout:
374 bfbbc223 Iustin Pop
      if salt in self._requests:
375 bfbbc223 Iustin Pop
        rq = self._requests[salt]
376 bfbbc223 Iustin Pop
        return (True, len(rq.sent), len(rq.rcvd))
377 bfbbc223 Iustin Pop
      else:
378 bfbbc223 Iustin Pop
        return MISSING
379 bfbbc223 Iustin Pop
380 96e03b0b Guido Trotter
381 96e03b0b Guido Trotter
# UPCALL_REPLY: server reply upcall
382 96e03b0b Guido Trotter
# has all ConfdUpcallPayload fields populated
383 96e03b0b Guido Trotter
UPCALL_REPLY = 1
384 96e03b0b Guido Trotter
# UPCALL_EXPIRE: internal library request expire
385 96e03b0b Guido Trotter
# has only salt, type, orig_request and extra_args
386 96e03b0b Guido Trotter
UPCALL_EXPIRE = 2
387 96e03b0b Guido Trotter
CONFD_UPCALL_TYPES = frozenset([
388 96e03b0b Guido Trotter
  UPCALL_REPLY,
389 96e03b0b Guido Trotter
  UPCALL_EXPIRE,
390 96e03b0b Guido Trotter
  ])
391 96e03b0b Guido Trotter
392 96e03b0b Guido Trotter
393 96e03b0b Guido Trotter
class ConfdUpcallPayload(objects.ConfigObject):
394 96e03b0b Guido Trotter
  """Callback argument for confd replies
395 96e03b0b Guido Trotter

396 96e03b0b Guido Trotter
  @type salt: string
397 96e03b0b Guido Trotter
  @ivar salt: salt associated with the query
398 96e03b0b Guido Trotter
  @type type: one of confd.client.CONFD_UPCALL_TYPES
399 96e03b0b Guido Trotter
  @ivar type: upcall type (server reply, expired request, ...)
400 96e03b0b Guido Trotter
  @type orig_request: L{objects.ConfdRequest}
401 96e03b0b Guido Trotter
  @ivar orig_request: original request
402 96e03b0b Guido Trotter
  @type server_reply: L{objects.ConfdReply}
403 96e03b0b Guido Trotter
  @ivar server_reply: server reply
404 96e03b0b Guido Trotter
  @type server_ip: string
405 96e03b0b Guido Trotter
  @ivar server_ip: answering server ip address
406 96e03b0b Guido Trotter
  @type server_port: int
407 96e03b0b Guido Trotter
  @ivar server_port: answering server port
408 96e03b0b Guido Trotter
  @type extra_args: any
409 96e03b0b Guido Trotter
  @ivar extra_args: 'args' argument of the SendRequest function
410 5f6f260a Guido Trotter
  @type client: L{ConfdClient}
411 5f6f260a Guido Trotter
  @ivar client: current confd client instance
412 96e03b0b Guido Trotter

413 96e03b0b Guido Trotter
  """
414 96e03b0b Guido Trotter
  __slots__ = [
415 96e03b0b Guido Trotter
    "salt",
416 96e03b0b Guido Trotter
    "type",
417 96e03b0b Guido Trotter
    "orig_request",
418 96e03b0b Guido Trotter
    "server_reply",
419 96e03b0b Guido Trotter
    "server_ip",
420 96e03b0b Guido Trotter
    "server_port",
421 96e03b0b Guido Trotter
    "extra_args",
422 5f6f260a Guido Trotter
    "client",
423 96e03b0b Guido Trotter
    ]
424 e4ccf6cd Guido Trotter
425 e4ccf6cd Guido Trotter
426 e4ccf6cd Guido Trotter
class ConfdClientRequest(objects.ConfdRequest):
427 e4ccf6cd Guido Trotter
  """This is the client-side version of ConfdRequest.
428 e4ccf6cd Guido Trotter

429 e4ccf6cd Guido Trotter
  This version of the class helps creating requests, on the client side, by
430 e4ccf6cd Guido Trotter
  filling in some default values.
431 e4ccf6cd Guido Trotter

432 e4ccf6cd Guido Trotter
  """
433 e4ccf6cd Guido Trotter
  def __init__(self, **kwargs):
434 e4ccf6cd Guido Trotter
    objects.ConfdRequest.__init__(self, **kwargs)
435 e4ccf6cd Guido Trotter
    if not self.rsalt:
436 e4ccf6cd Guido Trotter
      self.rsalt = utils.NewUUID()
437 e4ccf6cd Guido Trotter
    if not self.protocol:
438 e4ccf6cd Guido Trotter
      self.protocol = constants.CONFD_PROTOCOL_VERSION
439 e4ccf6cd Guido Trotter
    if self.type not in constants.CONFD_REQS:
440 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Invalid request type")
441 e4ccf6cd Guido Trotter
442 392ca296 Guido Trotter
443 392ca296 Guido Trotter
class ConfdFilterCallback:
444 392ca296 Guido Trotter
  """Callback that calls another callback, but filters duplicate results.
445 392ca296 Guido Trotter

446 49b3fdac Iustin Pop
  @ivar consistent: a dictionary indexed by salt; for each salt, if
447 49b3fdac Iustin Pop
      all responses ware identical, this will be True; this is the
448 49b3fdac Iustin Pop
      expected state on a healthy cluster; on inconsistent or
449 49b3fdac Iustin Pop
      partitioned clusters, this might be False, if we see answers
450 49b3fdac Iustin Pop
      with the same serial but different contents
451 49b3fdac Iustin Pop

452 392ca296 Guido Trotter
  """
453 392ca296 Guido Trotter
  def __init__(self, callback, logger=None):
454 392ca296 Guido Trotter
    """Constructor for ConfdFilterCallback
455 392ca296 Guido Trotter

456 392ca296 Guido Trotter
    @type callback: f(L{ConfdUpcallPayload})
457 392ca296 Guido Trotter
    @param callback: function to call when getting answers
458 69b99987 Michael Hanselmann
    @type logger: logging.Logger
459 d63997b3 Guido Trotter
    @param logger: optional logger for internal conditions
460 392ca296 Guido Trotter

461 392ca296 Guido Trotter
    """
462 392ca296 Guido Trotter
    if not callable(callback):
463 392ca296 Guido Trotter
      raise errors.ProgrammerError("callback must be callable")
464 392ca296 Guido Trotter
465 392ca296 Guido Trotter
    self._callback = callback
466 392ca296 Guido Trotter
    self._logger = logger
467 392ca296 Guido Trotter
    # answers contains a dict of salt -> answer
468 392ca296 Guido Trotter
    self._answers = {}
469 49b3fdac Iustin Pop
    self.consistent = {}
470 392ca296 Guido Trotter
471 392ca296 Guido Trotter
  def _LogFilter(self, salt, new_reply, old_reply):
472 392ca296 Guido Trotter
    if not self._logger:
473 392ca296 Guido Trotter
      return
474 392ca296 Guido Trotter
475 392ca296 Guido Trotter
    if new_reply.serial > old_reply.serial:
476 392ca296 Guido Trotter
      self._logger.debug("Filtering confirming answer, with newer"
477 392ca296 Guido Trotter
                         " serial for query %s" % salt)
478 392ca296 Guido Trotter
    elif new_reply.serial == old_reply.serial:
479 392ca296 Guido Trotter
      if new_reply.answer != old_reply.answer:
480 392ca296 Guido Trotter
        self._logger.warning("Got incoherent answers for query %s"
481 392ca296 Guido Trotter
                             " (serial: %s)" % (salt, new_reply.serial))
482 392ca296 Guido Trotter
      else:
483 392ca296 Guido Trotter
        self._logger.debug("Filtering confirming answer, with same"
484 392ca296 Guido Trotter
                           " serial for query %s" % salt)
485 392ca296 Guido Trotter
    else:
486 392ca296 Guido Trotter
      self._logger.debug("Filtering outdated answer for query %s"
487 392ca296 Guido Trotter
                         " serial: (%d < %d)" % (salt, old_reply.serial,
488 392ca296 Guido Trotter
                                                 new_reply.serial))
489 392ca296 Guido Trotter
490 392ca296 Guido Trotter
  def _HandleExpire(self, up):
491 392ca296 Guido Trotter
    # if we have no answer we have received none, before the expiration.
492 a9613def Guido Trotter
    if up.salt in self._answers:
493 a9613def Guido Trotter
      del self._answers[up.salt]
494 49b3fdac Iustin Pop
    if up.salt in self.consistent:
495 49b3fdac Iustin Pop
      del self.consistent[up.salt]
496 392ca296 Guido Trotter
497 392ca296 Guido Trotter
  def _HandleReply(self, up):
498 392ca296 Guido Trotter
    """Handle a single confd reply, and decide whether to filter it.
499 392ca296 Guido Trotter

500 392ca296 Guido Trotter
    @rtype: boolean
501 392ca296 Guido Trotter
    @return: True if the reply should be filtered, False if it should be passed
502 392ca296 Guido Trotter
             on to the up-callback
503 392ca296 Guido Trotter

504 392ca296 Guido Trotter
    """
505 392ca296 Guido Trotter
    filter_upcall = False
506 392ca296 Guido Trotter
    salt = up.salt
507 49b3fdac Iustin Pop
    if salt not in self.consistent:
508 49b3fdac Iustin Pop
      self.consistent[salt] = True
509 392ca296 Guido Trotter
    if salt not in self._answers:
510 392ca296 Guido Trotter
      # first answer for a query (don't filter, and record)
511 392ca296 Guido Trotter
      self._answers[salt] = up.server_reply
512 392ca296 Guido Trotter
    elif up.server_reply.serial > self._answers[salt].serial:
513 392ca296 Guido Trotter
      # newer answer (record, and compare contents)
514 392ca296 Guido Trotter
      old_answer = self._answers[salt]
515 392ca296 Guido Trotter
      self._answers[salt] = up.server_reply
516 392ca296 Guido Trotter
      if up.server_reply.answer == old_answer.answer:
517 392ca296 Guido Trotter
        # same content (filter) (version upgrade was unrelated)
518 392ca296 Guido Trotter
        filter_upcall = True
519 392ca296 Guido Trotter
        self._LogFilter(salt, up.server_reply, old_answer)
520 392ca296 Guido Trotter
      # else: different content, pass up a second answer
521 392ca296 Guido Trotter
    else:
522 392ca296 Guido Trotter
      # older or same-version answer (duplicate or outdated, filter)
523 39292d3a Iustin Pop
      if (up.server_reply.serial == self._answers[salt].serial and
524 39292d3a Iustin Pop
          up.server_reply.answer != self._answers[salt].answer):
525 49b3fdac Iustin Pop
        self.consistent[salt] = False
526 392ca296 Guido Trotter
      filter_upcall = True
527 392ca296 Guido Trotter
      self._LogFilter(salt, up.server_reply, self._answers[salt])
528 392ca296 Guido Trotter
529 392ca296 Guido Trotter
    return filter_upcall
530 392ca296 Guido Trotter
531 392ca296 Guido Trotter
  def __call__(self, up):
532 392ca296 Guido Trotter
    """Filtering callback
533 392ca296 Guido Trotter

534 392ca296 Guido Trotter
    @type up: L{ConfdUpcallPayload}
535 392ca296 Guido Trotter
    @param up: upper callback
536 392ca296 Guido Trotter

537 392ca296 Guido Trotter
    """
538 392ca296 Guido Trotter
    filter_upcall = False
539 392ca296 Guido Trotter
    if up.type == UPCALL_REPLY:
540 392ca296 Guido Trotter
      filter_upcall = self._HandleReply(up)
541 392ca296 Guido Trotter
    elif up.type == UPCALL_EXPIRE:
542 392ca296 Guido Trotter
      self._HandleExpire(up)
543 392ca296 Guido Trotter
544 392ca296 Guido Trotter
    if not filter_upcall:
545 392ca296 Guido Trotter
      self._callback(up)
546 04cdf663 Guido Trotter
547 04cdf663 Guido Trotter
548 04cdf663 Guido Trotter
class ConfdCountingCallback:
549 04cdf663 Guido Trotter
  """Callback that calls another callback, and counts the answers
550 04cdf663 Guido Trotter

551 04cdf663 Guido Trotter
  """
552 04cdf663 Guido Trotter
  def __init__(self, callback, logger=None):
553 04cdf663 Guido Trotter
    """Constructor for ConfdCountingCallback
554 04cdf663 Guido Trotter

555 04cdf663 Guido Trotter
    @type callback: f(L{ConfdUpcallPayload})
556 04cdf663 Guido Trotter
    @param callback: function to call when getting answers
557 04cdf663 Guido Trotter
    @type logger: logging.Logger
558 04cdf663 Guido Trotter
    @param logger: optional logger for internal conditions
559 04cdf663 Guido Trotter

560 04cdf663 Guido Trotter
    """
561 04cdf663 Guido Trotter
    if not callable(callback):
562 04cdf663 Guido Trotter
      raise errors.ProgrammerError("callback must be callable")
563 04cdf663 Guido Trotter
564 04cdf663 Guido Trotter
    self._callback = callback
565 04cdf663 Guido Trotter
    self._logger = logger
566 04cdf663 Guido Trotter
    # answers contains a dict of salt -> count
567 04cdf663 Guido Trotter
    self._answers = {}
568 04cdf663 Guido Trotter
569 04cdf663 Guido Trotter
  def RegisterQuery(self, salt):
570 04cdf663 Guido Trotter
    if salt in self._answers:
571 04cdf663 Guido Trotter
      raise errors.ProgrammerError("query already registered")
572 04cdf663 Guido Trotter
    self._answers[salt] = 0
573 04cdf663 Guido Trotter
574 04cdf663 Guido Trotter
  def AllAnswered(self):
575 04cdf663 Guido Trotter
    """Have all the registered queries received at least an answer?
576 04cdf663 Guido Trotter

577 04cdf663 Guido Trotter
    """
578 04cdf663 Guido Trotter
    return utils.all(self._answers.values())
579 04cdf663 Guido Trotter
580 04cdf663 Guido Trotter
  def _HandleExpire(self, up):
581 04cdf663 Guido Trotter
    # if we have no answer we have received none, before the expiration.
582 04cdf663 Guido Trotter
    if up.salt in self._answers:
583 04cdf663 Guido Trotter
      del self._answers[up.salt]
584 04cdf663 Guido Trotter
585 04cdf663 Guido Trotter
  def _HandleReply(self, up):
586 04cdf663 Guido Trotter
    """Handle a single confd reply, and decide whether to filter it.
587 04cdf663 Guido Trotter

588 04cdf663 Guido Trotter
    @rtype: boolean
589 04cdf663 Guido Trotter
    @return: True if the reply should be filtered, False if it should be passed
590 04cdf663 Guido Trotter
             on to the up-callback
591 04cdf663 Guido Trotter

592 04cdf663 Guido Trotter
    """
593 04cdf663 Guido Trotter
    if up.salt in self._answers:
594 04cdf663 Guido Trotter
      self._answers[up.salt] += 1
595 04cdf663 Guido Trotter
596 04cdf663 Guido Trotter
  def __call__(self, up):
597 04cdf663 Guido Trotter
    """Filtering callback
598 04cdf663 Guido Trotter

599 04cdf663 Guido Trotter
    @type up: L{ConfdUpcallPayload}
600 04cdf663 Guido Trotter
    @param up: upper callback
601 04cdf663 Guido Trotter

602 04cdf663 Guido Trotter
    """
603 04cdf663 Guido Trotter
    if up.type == UPCALL_REPLY:
604 04cdf663 Guido Trotter
      self._HandleReply(up)
605 04cdf663 Guido Trotter
    elif up.type == UPCALL_EXPIRE:
606 04cdf663 Guido Trotter
      self._HandleExpire(up)
607 04cdf663 Guido Trotter
    self._callback(up)
608 5b349fd1 Iustin Pop
609 71e114da Iustin Pop
610 5b349fd1 Iustin Pop
def GetConfdClient(callback):
611 5b349fd1 Iustin Pop
  """Return a client configured using the given callback.
612 5b349fd1 Iustin Pop

613 5b349fd1 Iustin Pop
  This is handy to abstract the MC list and HMAC key reading.
614 5b349fd1 Iustin Pop

615 5b349fd1 Iustin Pop
  @attention: This should only be called on nodes which are part of a
616 5b349fd1 Iustin Pop
      cluster, since it depends on a valid (ganeti) data directory;
617 5b349fd1 Iustin Pop
      for code running outside of a cluster, you need to create the
618 5b349fd1 Iustin Pop
      client manually
619 5b349fd1 Iustin Pop

620 5b349fd1 Iustin Pop
  """
621 5b349fd1 Iustin Pop
  ss = ssconf.SimpleStore()
622 5b349fd1 Iustin Pop
  mc_file = ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
623 5b349fd1 Iustin Pop
  mc_list = utils.ReadFile(mc_file).splitlines()
624 5b349fd1 Iustin Pop
  hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY)
625 5b349fd1 Iustin Pop
  return ConfdClient(hmac_key, mc_list, callback)